Skip to content

Commit

Permalink
Action methods can also send a JsonRpcNotification with method log to…
Browse files Browse the repository at this point in the history
… the scheduler to have it log on the action's behalf
  • Loading branch information
lucasnetau committed Sep 16, 2021
1 parent d7ac745 commit bf11a4d
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ class Scheduler implements LoggerAwareInterface {
/** @var string RPC method name to get an action handler to run a request */
const ACTION_RUN_METHOD = 'run';

/** @var string RPC method from an Action to request the scheduler to log a message*/
const ACTION_LOG_METHOD = 'log';

/** @var int */
protected int $memoryLimit = 0;

Expand Down Expand Up @@ -390,30 +393,36 @@ public function start_action(string $actionName): Process
$process_decoded_stdout = new JsonRpcDecoder( $process->stdout );

/** Handler for the Json RPC response */
$process_decoded_stdout->on('data', function (JsonRpcResponse $response) {
if ($response->isSuccess()) {
/** Once the action has been processed successfully we can discard of our copy of it */
unset($this->inflightActionCommands[$response->getId()]);
} else {
/** Transfer the action from the running queue to the errored queue
* @TODO We need to watch this queue and handle any run-away errors (eg a database been unavailable to ingest events)
* @TODO This should be put into a function as we call it both here and when an action terminates unexpectedly
*/
$error = [
'error' => $response->getError(),
'action' => $this->inflightActionCommands[$response->getId()]['action'],
];
$this->erroredActionCommands[] = $error;
unset($this->inflightActionCommands[$response->getId()]);

$this->logger->error($response->getError()->getMessage() . " : " . json_encode($response->getError()->getData()));
}
/** Release memory used by the inflight action table */
if (count($this->inflightActionCommands) === 0)
{
$this->inflightActionCommands = [];
$process_decoded_stdout->on('data', function ($rpc) {
if ($rpc instanceof JsonRpcResponse) {
if ($rpc->isSuccess()) {
/** Once the action has been processed successfully we can discard of our copy of it */
unset($this->inflightActionCommands[$rpc->getId()]);
} else {
/** Transfer the action from the running queue to the errored queue
* @TODO We need to watch this queue and handle any run-away errors (eg a database been unavailable to ingest events)
* @TODO This should be put into a function as we call it both here and when an action terminates unexpectedly
*/
$error = [
'error' => $rpc->getError(),
'action' => $this->inflightActionCommands[$rpc->getId()]['action'],
];
$this->erroredActionCommands[] = $error;
unset($this->inflightActionCommands[$rpc->getId()]);

$this->logger->error($rpc->getError()->getMessage() . " : " . json_encode($rpc->getError()->getData()));
}
/** Release memory used by the inflight action table */
if (count($this->inflightActionCommands) === 0) {
$this->inflightActionCommands = [];
}
$this->dirty = true;
} elseif ($rpc instanceof JsonRpcNotification) {
if ($rpc->getMethod() === self::ACTION_LOG_METHOD) {
//Log action expects logLevel to match \Psr\Log\LogLevel
$this->logger->log($rpc->getParam('logLevel'), $rpc->getParam('message'));
}
}
$this->dirty = true;
});

$process->on('exit', function ($code, $term) use ($actionName, $process) {
Expand Down

0 comments on commit bf11a4d

Please sign in to comment.