From bf11a4d35c83a4518f32c72c277eb81cc661e81b Mon Sep 17 00:00:00 2001 From: James Lucas Date: Thu, 16 Sep 2021 12:37:05 +1000 Subject: [PATCH] Action methods can also send a JsonRpcNotification with method log to the scheduler to have it log on the action's behalf --- src/Scheduler.php | 55 +++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/src/Scheduler.php b/src/Scheduler.php index 811166a..4bdb820 100644 --- a/src/Scheduler.php +++ b/src/Scheduler.php @@ -202,6 +202,9 @@ class Scheduler implements LoggerAwareInterface { /** @var string RPC method name to get an action handler to run a request */ const ACTION_RUN_METHOD = 'run'; + /** @var string RPC method from an Action to request the scheduler to log a message*/ + const ACTION_LOG_METHOD = 'log'; + /** @var int */ protected int $memoryLimit = 0; @@ -390,30 +393,36 @@ public function start_action(string $actionName): Process $process_decoded_stdout = new JsonRpcDecoder( $process->stdout ); /** Handler for the Json RPC response */ - $process_decoded_stdout->on('data', function (JsonRpcResponse $response) { - if ($response->isSuccess()) { - /** Once the action has been processed successfully we can discard of our copy of it */ - unset($this->inflightActionCommands[$response->getId()]); - } else { - /** Transfer the action from the running queue to the errored queue - * @TODO We need to watch this queue and handle any run-away errors (eg a database been unavailable to ingest events) - * @TODO This should be put into a function as we call it both here and when an action terminates unexpectedly - */ - $error = [ - 'error' => $response->getError(), - 'action' => $this->inflightActionCommands[$response->getId()]['action'], - ]; - $this->erroredActionCommands[] = $error; - unset($this->inflightActionCommands[$response->getId()]); - - $this->logger->error($response->getError()->getMessage() . " : " . json_encode($response->getError()->getData())); - } - /** Release memory used by the inflight action table */ - if (count($this->inflightActionCommands) === 0) - { - $this->inflightActionCommands = []; + $process_decoded_stdout->on('data', function ($rpc) { + if ($rpc instanceof JsonRpcResponse) { + if ($rpc->isSuccess()) { + /** Once the action has been processed successfully we can discard of our copy of it */ + unset($this->inflightActionCommands[$rpc->getId()]); + } else { + /** Transfer the action from the running queue to the errored queue + * @TODO We need to watch this queue and handle any run-away errors (eg a database been unavailable to ingest events) + * @TODO This should be put into a function as we call it both here and when an action terminates unexpectedly + */ + $error = [ + 'error' => $rpc->getError(), + 'action' => $this->inflightActionCommands[$rpc->getId()]['action'], + ]; + $this->erroredActionCommands[] = $error; + unset($this->inflightActionCommands[$rpc->getId()]); + + $this->logger->error($rpc->getError()->getMessage() . " : " . json_encode($rpc->getError()->getData())); + } + /** Release memory used by the inflight action table */ + if (count($this->inflightActionCommands) === 0) { + $this->inflightActionCommands = []; + } + $this->dirty = true; + } elseif ($rpc instanceof JsonRpcNotification) { + if ($rpc->getMethod() === self::ACTION_LOG_METHOD) { + //Log action expects logLevel to match \Psr\Log\LogLevel + $this->logger->log($rpc->getParam('logLevel'), $rpc->getParam('message')); + } } - $this->dirty = true; }); $process->on('exit', function ($code, $term) use ($actionName, $process) {