From c3ec89dd8d3f66faf1b460d9e6bea71d06b69d90 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Wed, 14 Dec 2022 18:57:39 +0300 Subject: [PATCH 01/10] Force collection of garbage cycles after a Workflow destroying; remove unused parameter from the \Temporal\Internal\Transport\Client constructor --- src/Internal/Transport/Client.php | 5 +---- src/Internal/Transport/Router/DestroyWorkflow.php | 2 ++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Internal/Transport/Client.php b/src/Internal/Transport/Client.php index a02748f7..cd2f1dd7 100644 --- a/src/Internal/Transport/Client.php +++ b/src/Internal/Transport/Client.php @@ -37,17 +37,14 @@ final class Client implements ClientInterface 'a request with that identifier was not sent'; private QueueInterface $queue; - private LoopInterface $loop; private array $requests = []; /** * @param QueueInterface $queue - * @param LoopInterface $loop */ - public function __construct(QueueInterface $queue, LoopInterface $loop) + public function __construct(QueueInterface $queue) { $this->queue = $queue; - $this->loop = $loop; } /** diff --git a/src/Internal/Transport/Router/DestroyWorkflow.php b/src/Internal/Transport/Router/DestroyWorkflow.php index 7f92a035..ec257969 100644 --- a/src/Internal/Transport/Router/DestroyWorkflow.php +++ b/src/Internal/Transport/Router/DestroyWorkflow.php @@ -34,6 +34,8 @@ public function handle(RequestInterface $request, array $headers, Deferred $reso $this->kill($runId); $resolver->resolve(EncodedValues::fromValues([null])); + + \gc_collect_cycles(); } /** From 72b3db9fbdcf1558a2c3c3f5ad3e0b648dab97e7 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 15 Dec 2022 11:52:39 +0300 Subject: [PATCH 02/10] Update AsyncAwaits --- src/Internal/Workflow/WorkflowContext.php | 73 ++++++++++++----------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 454c69cf..b300cfe7 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -449,10 +449,10 @@ public function await(...$conditions): PromiseInterface $result[] = $this->addCondition($conditionGroupId, $condition); } - if ($condition instanceof PromiseInterface) - { - $result[] = $this->addAsyncCondition($conditionGroupId, $condition); - } + // if ($condition instanceof PromiseInterface) + // { + // $result[] = $this->addAsyncCondition($conditionGroupId, $condition); + // } } if (\count($result) === 1) { @@ -472,6 +472,7 @@ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface $conditions[] = $timer; return $this->await(...$conditions) + // is internal timer is not complete then cancel it ->then(static fn (): bool => !$timer->isComplete()); } @@ -505,19 +506,19 @@ protected function addCondition(string $conditionGroupId, callable $condition): return $deferred->promise(); } - protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface - { - $this->asyncAwaits[$conditionGroupId][] = $condition; - return $condition->then( - function ($result) use ($conditionGroupId) { - $this->resolveConditionGroup($conditionGroupId); - return $result; - }, - function () use ($conditionGroupId) { - $this->rejectConditionGroup($conditionGroupId); - } - ); - } + // protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface + // { + // $this->asyncAwaits[$conditionGroupId][] = $condition; + // return $condition->then( + // function ($result) use ($conditionGroupId) { + // $this->resolveConditionGroup($conditionGroupId); + // return $result; + // }, + // function () use ($conditionGroupId) { + // $this->rejectConditionGroup($conditionGroupId); + // } + // ); + // } /** * Record last stack trace of the call. @@ -560,24 +561,24 @@ public function rejectConditionGroup(string $conditionGroupId): void private function clearAsyncAwaits(string $conditionGroupId): void { - // Check pending timers in this group - if (!isset($this->asyncAwaits[$conditionGroupId])) { - return; - } - - // Then cancel any pending timers if exist - foreach ($this->asyncAwaits[$conditionGroupId] as $index => $awaitCondition) { - if (!$awaitCondition->isComplete()) { - /** @var NewTimer $timer */ - $timer = $this->timers->offsetGet($awaitCondition); - if ($timer !== null) { - $request = new Cancel($timer->getID()); - $this->request($request); - $this->timers->offsetUnset($awaitCondition); - } - } - unset($this->asyncAwaits[$conditionGroupId][$index]); - } - unset($this->asyncAwaits[$conditionGroupId]); + // // Check pending timers in this group + // if (!isset($this->asyncAwaits[$conditionGroupId])) { + // return; + // } + // + // // Then cancel any pending timers if exist + // foreach ($this->asyncAwaits[$conditionGroupId] as $index => $awaitCondition) { + // if (!$awaitCondition->isComplete()) { + // /** @var NewTimer $timer */ + // $timer = $this->timers->offsetGet($awaitCondition); + // if ($timer !== null) { + // $request = new Cancel($timer->getID()); + // $this->request($request); + // $this->timers->offsetUnset($awaitCondition); + // } + // } + // unset($this->asyncAwaits[$conditionGroupId][$index]); + // } + // unset($this->asyncAwaits[$conditionGroupId]); } } From d47ae6fac21c15ff9c92b243e05da5c14e36450c Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 15 Dec 2022 15:59:37 +0300 Subject: [PATCH 03/10] Fix tests; add optimization for internal timer in the `awaitWithTimeout()` method; cleanup; --- src/Internal/Workflow/Process/Scope.php | 2 +- src/Internal/Workflow/ScopeContext.php | 29 ------- src/Internal/Workflow/WorkflowContext.php | 82 +++++-------------- .../AwaitWithTimeoutTestCase.php | 27 +----- 4 files changed, 25 insertions(+), 115 deletions(-) diff --git a/src/Internal/Workflow/Process/Scope.php b/src/Internal/Workflow/Process/Scope.php index 8fdf9ff8..cdcffdf4 100644 --- a/src/Internal/Workflow/Process/Scope.php +++ b/src/Internal/Workflow/Process/Scope.php @@ -273,7 +273,7 @@ public function then( */ public function onAwait(Deferred $deferred): void { - $this->onCancel[++$this->cancelID] = function (\Throwable $e = null) use ($deferred): void { + $this->onCancel[++$this->cancelID] = static function (\Throwable $e = null) use ($deferred): void { $deferred->reject($e ?? new CanceledFailure('')); }; diff --git a/src/Internal/Workflow/ScopeContext.php b/src/Internal/Workflow/ScopeContext.php index fde43285..a075d395 100644 --- a/src/Internal/Workflow/ScopeContext.php +++ b/src/Internal/Workflow/ScopeContext.php @@ -14,9 +14,7 @@ use React\Promise\Deferred; use React\Promise\PromiseInterface; use Temporal\Exception\Failure\CanceledFailure; -use Temporal\Internal\Support\DateInterval; use Temporal\Internal\Transport\CompletableResult; -use Temporal\Internal\Transport\Request\NewTimer; use Temporal\Internal\Workflow\Process\Scope; use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Workflow\CancellationScopeInterface; @@ -120,21 +118,6 @@ protected function addCondition(string $conditionGroupId, callable $condition): ); } - protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface - { - $this->parent->asyncAwaits[$conditionGroupId][] = $condition; - - return $condition->then( - function ($result) use ($conditionGroupId) { - $this->resolveConditionGroup($conditionGroupId); - return $result; - }, - function () use ($conditionGroupId) { - $this->rejectConditionGroup($conditionGroupId); - } - ); - } - /** * Calculate unblocked conditions. */ @@ -153,18 +136,6 @@ public function rejectConditionGroup(string $conditionGroupId): void $this->parent->rejectConditionGroup($conditionGroupId); } - /** - * {@inheritDoc} - */ - public function timer($interval): PromiseInterface - { - $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); - $result = $this->request($request); - $this->parent->timers->attach($result, $request); - - return $result; - } - /** * {@inheritDoc} */ diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index b300cfe7..b918231c 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -35,7 +35,6 @@ use Temporal\Internal\Transport\Request\Panic; use Temporal\Internal\Transport\Request\SideEffect; use Temporal\Promise; -use Temporal\Worker\Transport\Command\CommandInterface; use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Workflow\ActivityStubInterface; use Temporal\Workflow\ChildWorkflowOptions; @@ -59,12 +58,11 @@ class WorkflowContext implements WorkflowContextInterface protected WorkflowInstanceInterface $workflowInstance; protected ?ValuesInterface $lastCompletionResult = null; - protected array $awaits = []; - protected array $asyncAwaits = []; /** - * @var + * Contains conditional groups + * @var array */ - protected \SplObjectStorage $timers; + protected array $awaits = []; private array $trace = []; private bool $continueAsNew = false; @@ -89,7 +87,6 @@ public function __construct( $this->workflowInstance = $workflowInstance; $this->input = $input; $this->lastCompletionResult = $lastCompletionResult; - $this->timers = new \SplObjectStorage(); } /** @@ -396,10 +393,7 @@ public function newActivityStub(string $class, ActivityOptionsInterface $options public function timer($interval): PromiseInterface { $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); - $result = $this->request($request); - $this->timers->attach($result, $request); - - return $result; + return $this->request($request); } /** @@ -447,12 +441,12 @@ public function await(...$conditions): PromiseInterface return resolve(true); } $result[] = $this->addCondition($conditionGroupId, $condition); + continue; } - // if ($condition instanceof PromiseInterface) - // { - // $result[] = $this->addAsyncCondition($conditionGroupId, $condition); - // } + if ($condition instanceof PromiseInterface) { + $result[] = $condition; + } } if (\count($result) === 1) { @@ -467,13 +461,23 @@ public function await(...$conditions): PromiseInterface */ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface { - $timer = $this->timer($interval); + $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); + $requestId = $request->getID(); + $timer = $this->request($request); + \assert($timer instanceof CompletableResultInterface); $conditions[] = $timer; return $this->await(...$conditions) - // is internal timer is not complete then cancel it - ->then(static fn (): bool => !$timer->isComplete()); + ->then(function () use ($timer, $requestId): bool { + // If internal timer was not completed then cancel it + $isCompleted = $timer->isComplete(); + if (!$isCompleted) { + $request = new Cancel($requestId); + $this->request($request); + } + return !$isCompleted; + }); } /** @@ -482,8 +486,7 @@ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface public function resolveConditions(): void { foreach ($this->awaits as $awaitsGroupId => $awaitsGroup) { - foreach ($awaitsGroup as $i => $cond) { - [$condition, $deferred] = $cond; + foreach ($awaitsGroup as $i => [$condition, $deferred]) { if ($condition()) { $deferred->resolve(); unset($this->awaits[$awaitsGroupId][$i]); @@ -506,20 +509,6 @@ protected function addCondition(string $conditionGroupId, callable $condition): return $deferred->promise(); } - // protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface - // { - // $this->asyncAwaits[$conditionGroupId][] = $condition; - // return $condition->then( - // function ($result) use ($conditionGroupId) { - // $this->resolveConditionGroup($conditionGroupId); - // return $result; - // }, - // function () use ($conditionGroupId) { - // $this->rejectConditionGroup($conditionGroupId); - // } - // ); - // } - /** * Record last stack trace of the call. * @@ -541,8 +530,6 @@ public function resolveConditionGroup(string $conditionGroupId): void } unset($this->awaits[$conditionGroupId]); } - - $this->clearAsyncAwaits($conditionGroupId); } public function rejectConditionGroup(string $conditionGroupId): void @@ -555,30 +542,5 @@ public function rejectConditionGroup(string $conditionGroupId): void } unset($this->awaits[$conditionGroupId]); } - - $this->clearAsyncAwaits($conditionGroupId); - } - - private function clearAsyncAwaits(string $conditionGroupId): void - { - // // Check pending timers in this group - // if (!isset($this->asyncAwaits[$conditionGroupId])) { - // return; - // } - // - // // Then cancel any pending timers if exist - // foreach ($this->asyncAwaits[$conditionGroupId] as $index => $awaitCondition) { - // if (!$awaitCondition->isComplete()) { - // /** @var NewTimer $timer */ - // $timer = $this->timers->offsetGet($awaitCondition); - // if ($timer !== null) { - // $request = new Cancel($timer->getID()); - // $this->request($request); - // $this->timers->offsetUnset($awaitCondition); - // } - // } - // unset($this->asyncAwaits[$conditionGroupId][$index]); - // } - // unset($this->asyncAwaits[$conditionGroupId]); } } diff --git a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php index 48a61948..dc4751f9 100644 --- a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php +++ b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php @@ -94,16 +94,8 @@ public function testAwaitWithTimeoutReturnsTrueWithMetCondition(): void { $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { @@ -124,26 +116,15 @@ public function testTimerIsCanceledOnceConditionIsMet(): void $this->addToAssertionCount(1); $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ + private bool $doCancel = false; #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { - $this->doCancel = false; - yield Workflow::awaitWithTimeout( 50, - function() { - return $this->doCancel; - } + fn () => $this->doCancel, ); if ($this->doCancel) { @@ -153,10 +134,6 @@ function() { return 'OK'; } - /** - * Support for PHP7.4 - * @Workflow\SignalMethod() - */ #[Workflow\SignalMethod] public function cancel(): void { From 48fcaffcbfcce0838ba91de9ce7b8c5d63319a9e Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 15 Dec 2022 18:55:46 +0300 Subject: [PATCH 04/10] Optimize WorkflowContext internal promises resolving --- src/Internal/Workflow/ScopeContext.php | 1 + src/Internal/Workflow/WorkflowContext.php | 43 ++++++++----------- .../AwaitWithTimeoutTestCase.php | 16 ------- 3 files changed, 18 insertions(+), 42 deletions(-) diff --git a/src/Internal/Workflow/ScopeContext.php b/src/Internal/Workflow/ScopeContext.php index a075d395..89a276a4 100644 --- a/src/Internal/Workflow/ScopeContext.php +++ b/src/Internal/Workflow/ScopeContext.php @@ -26,6 +26,7 @@ class ScopeContext extends WorkflowContext implements ScopedContextInterface { private WorkflowContext $parent; private Scope $scope; + /** @var callable */ private $onRequest; /** diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index b918231c..9a7d8cbe 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -60,7 +60,7 @@ class WorkflowContext implements WorkflowContextInterface /** * Contains conditional groups - * @var array + * @var array, array{callable, Deferred}>> */ protected array $awaits = []; @@ -445,7 +445,15 @@ public function await(...$conditions): PromiseInterface } if ($condition instanceof PromiseInterface) { - $result[] = $condition; + $result[] = $condition->then( + function ($result) use ($conditionGroupId) { + $this->resolveConditionGroup($conditionGroupId); + return $result; + }, + function () use ($conditionGroupId) { + $this->rejectConditionGroup($conditionGroupId); + }, + ); } } @@ -461,20 +469,18 @@ public function await(...$conditions): PromiseInterface */ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface { + /** Bypassing {@see timer()} to acquire a timer request ID */ $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); $requestId = $request->getID(); $timer = $this->request($request); \assert($timer instanceof CompletableResultInterface); - $conditions[] = $timer; - - return $this->await(...$conditions) + return $this->await($timer, ...$conditions) ->then(function () use ($timer, $requestId): bool { - // If internal timer was not completed then cancel it $isCompleted = $timer->isComplete(); if (!$isCompleted) { - $request = new Cancel($requestId); - $this->request($request); + // If internal timer was not completed then cancel it + $this->request(new Cancel($requestId)); } return !$isCompleted; }); @@ -497,7 +503,7 @@ public function resolveConditions(): void } /** - * @param string $conditionGroupId + * @param non-empty-string $conditionGroupId * @param callable $condition * @return PromiseInterface */ @@ -521,26 +527,11 @@ protected function recordTrace(): void public function resolveConditionGroup(string $conditionGroupId): void { - // First resolve pending promises - if (isset($this->awaits[$conditionGroupId])) { - foreach ($this->awaits[$conditionGroupId] as $i => $cond) { - [$_, $deferred] = $cond; - unset($this->awaits[$conditionGroupId][$i]); - $deferred->resolve(); - } - unset($this->awaits[$conditionGroupId]); - } + unset($this->awaits[$conditionGroupId]); } public function rejectConditionGroup(string $conditionGroupId): void { - if (isset($this->awaits[$conditionGroupId])) { - foreach ($this->awaits[$conditionGroupId] as $i => $cond) { - [$_, $deferred] = $cond; - unset($this->awaits[$conditionGroupId][$i]); - $deferred->reject(); - } - unset($this->awaits[$conditionGroupId]); - } + unset($this->awaits[$conditionGroupId]); } } diff --git a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php index dc4751f9..9282c4a7 100644 --- a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php +++ b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php @@ -33,16 +33,8 @@ public function testAwaitWithTimeoutReturnsFalseIfTimeoutWasOff(): void { $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { @@ -65,16 +57,8 @@ public function testAwaitWithTimeoutStartsTimerWithConditionIsNotMet(): void $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { From 496c08f2af6830ebde0c0f5872586ce9ef4c6dd5 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 15 Dec 2022 19:42:03 +0300 Subject: [PATCH 05/10] Return resolving and rejecting of deferred objects --- src/Internal/Workflow/WorkflowContext.php | 28 +++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 9a7d8cbe..3f078bff 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -500,6 +500,15 @@ public function resolveConditions(): void } } } + // foreach ($this->awaits as $awaitsGroupId => $awaitsGroup) { + // foreach ($awaitsGroup as $i => [$condition, $_]) { + // if ($condition()) { + // unset($this->awaits[$awaitsGroupId][$i]); + // $this->resolveConditionGroup($awaitsGroupId); + // break; + // } + // } + // } } /** @@ -527,11 +536,26 @@ protected function recordTrace(): void public function resolveConditionGroup(string $conditionGroupId): void { - unset($this->awaits[$conditionGroupId]); + // First resolve pending promises + if (isset($this->awaits[$conditionGroupId])) { + foreach ($this->awaits[$conditionGroupId] as $i => $cond) { + [$_, $deferred] = $cond; + unset($this->awaits[$conditionGroupId][$i]); + $deferred->resolve(); + } + unset($this->awaits[$conditionGroupId]); + } } public function rejectConditionGroup(string $conditionGroupId): void { - unset($this->awaits[$conditionGroupId]); + if (isset($this->awaits[$conditionGroupId])) { + foreach ($this->awaits[$conditionGroupId] as $i => $cond) { + [$_, $deferred] = $cond; + unset($this->awaits[$conditionGroupId][$i]); + $deferred->reject(); + } + unset($this->awaits[$conditionGroupId]); + } } } From c3027088dd14295a58475584d32a5fad0a425ba8 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 15 Dec 2022 21:02:32 +0300 Subject: [PATCH 06/10] Update --- src/Internal/Workflow/WorkflowContext.php | 41 ++++++------------- .../AwaitWithTimeoutTestCase.php | 3 +- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 3f078bff..ca355dfe 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -59,7 +59,7 @@ class WorkflowContext implements WorkflowContextInterface protected ?ValuesInterface $lastCompletionResult = null; /** - * Contains conditional groups + * Contains conditional groups that contains tuple of a condition callable and its promise * @var array, array{callable, Deferred}>> */ protected array $awaits = []; @@ -445,15 +445,7 @@ public function await(...$conditions): PromiseInterface } if ($condition instanceof PromiseInterface) { - $result[] = $condition->then( - function ($result) use ($conditionGroupId) { - $this->resolveConditionGroup($conditionGroupId); - return $result; - }, - function () use ($conditionGroupId) { - $this->rejectConditionGroup($conditionGroupId); - }, - ); + $result[] = $condition; } } @@ -461,7 +453,15 @@ function () use ($conditionGroupId) { return $result[0]; } - return Promise::any($result); + return Promise::any($result)->then( + function ($result) use ($conditionGroupId) { + $this->resolveConditionGroup($conditionGroupId); + return $result; + }, + function () use ($conditionGroupId) { + $this->rejectConditionGroup($conditionGroupId); + }, + ); } /** @@ -536,26 +536,11 @@ protected function recordTrace(): void public function resolveConditionGroup(string $conditionGroupId): void { - // First resolve pending promises - if (isset($this->awaits[$conditionGroupId])) { - foreach ($this->awaits[$conditionGroupId] as $i => $cond) { - [$_, $deferred] = $cond; - unset($this->awaits[$conditionGroupId][$i]); - $deferred->resolve(); - } - unset($this->awaits[$conditionGroupId]); - } + unset($this->awaits[$conditionGroupId]); } public function rejectConditionGroup(string $conditionGroupId): void { - if (isset($this->awaits[$conditionGroupId])) { - foreach ($this->awaits[$conditionGroupId] as $i => $cond) { - [$_, $deferred] = $cond; - unset($this->awaits[$conditionGroupId][$i]); - $deferred->reject(); - } - unset($this->awaits[$conditionGroupId]); - } + unset($this->awaits[$conditionGroupId]); } } diff --git a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php index 9282c4a7..15ab8be1 100644 --- a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php +++ b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php @@ -106,10 +106,11 @@ class { #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { - yield Workflow::awaitWithTimeout( + $result = yield Workflow::awaitWithTimeout( 50, fn () => $this->doCancel, ); + assertTrue($result); if ($this->doCancel) { return 'CANCEL'; From 85ebb5c35756368f7eea8c10e71357e5d35c4865 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 16 Dec 2022 16:09:04 +0300 Subject: [PATCH 07/10] WorkflowContext patch: throw an exception when the promise::any is rejected --- src/Internal/Workflow/WorkflowContext.php | 26 +++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index ca355dfe..847051f4 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -449,17 +449,26 @@ public function await(...$conditions): PromiseInterface } } - if (\count($result) === 1) { - return $result[0]; - } + // if (\count($result) === 1) { + // return $result[0]; + // } return Promise::any($result)->then( function ($result) use ($conditionGroupId) { $this->resolveConditionGroup($conditionGroupId); return $result; }, - function () use ($conditionGroupId) { + function ($reason) use ($conditionGroupId) { $this->rejectConditionGroup($conditionGroupId); + // Throw the first reason + // It need to avoid memory leak when the related workflow is destroyed + if (\is_iterable($reason)) { + foreach ($reason as $exception) { + if ($exception instanceof \Throwable) { + throw $exception; + } + } + } }, ); } @@ -500,15 +509,6 @@ public function resolveConditions(): void } } } - // foreach ($this->awaits as $awaitsGroupId => $awaitsGroup) { - // foreach ($awaitsGroup as $i => [$condition, $_]) { - // if ($condition()) { - // unset($this->awaits[$awaitsGroupId][$i]); - // $this->resolveConditionGroup($awaitsGroupId); - // break; - // } - // } - // } } /** From 02dfb53d10cf9f23daadb3b3c4fb887461eda16f Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 16 Dec 2022 17:21:18 +0300 Subject: [PATCH 08/10] Add test case with AwaitWithTimeout + memory leak tests --- tests/Fixtures/Splitter.php | 30 ++++--- .../src/Workflow/AwaitWithTimeoutWorkflow.php | 36 ++++++++ tests/Functional/WorkflowTestCase.php | 86 +++++++++++++++++++ 3 files changed, 142 insertions(+), 10 deletions(-) create mode 100644 tests/Fixtures/src/Workflow/AwaitWithTimeoutWorkflow.php diff --git a/tests/Fixtures/Splitter.php b/tests/Fixtures/Splitter.php index edf0e5f2..fe10cb4d 100644 --- a/tests/Fixtures/Splitter.php +++ b/tests/Fixtures/Splitter.php @@ -16,8 +16,8 @@ */ class Splitter { - /** @var string */ - private string $filename; + /** @var string[] */ + private array $lines; /** @var array */ private array $in = []; @@ -28,9 +28,9 @@ class Splitter /** * @param string $filename */ - public function __construct(string $filename) + public function __construct(array $lines) { - $this->filename = $filename; + $this->lines = $lines; $this->parse(); } @@ -49,12 +49,10 @@ public function getQueue(): array */ private function parse() { - $lines = file($this->filename); - // skip get worker info $offset = 0; - while (isset($lines[$offset])) { - $line = $lines[$offset]; + while (isset($this->lines[$offset])) { + $line = $this->lines[$offset]; if (preg_match('/(?:\[0m\t)(\[.*\])\s*({.*})(?:[\r\n]*)$/', $line, $matches)) { $ctx = json_decode($matches[2], true); @@ -72,10 +70,22 @@ private function parse() } /** + * Create from file + * + * @return Splitter + */ + public static function create(string $name): self + { + return new self(file(__DIR__ . '/data/' . $name)); + } + + /** + * Create from text block + * * @return Splitter */ - public static function create(string $name) + public static function createFromString(string $text): self { - return new self(__DIR__ . '/data/' . $name); + return new self(\explode("\n", $text)); } } diff --git a/tests/Fixtures/src/Workflow/AwaitWithTimeoutWorkflow.php b/tests/Fixtures/src/Workflow/AwaitWithTimeoutWorkflow.php new file mode 100644 index 00000000..3cb3cb10 --- /dev/null +++ b/tests/Fixtures/src/Workflow/AwaitWithTimeoutWorkflow.php @@ -0,0 +1,36 @@ + false, + ); + + yield Workflow::awaitWithTimeout( + 20, + Workflow::awaitWithTimeout(500, fn() => false), + Workflow::awaitWithTimeout(120, fn() => false), + ); + + return 'ok'; + } +} diff --git a/tests/Functional/WorkflowTestCase.php b/tests/Functional/WorkflowTestCase.php index de78b702..45f37ba5 100644 --- a/tests/Functional/WorkflowTestCase.php +++ b/tests/Functional/WorkflowTestCase.php @@ -11,6 +11,8 @@ namespace Temporal\Tests\Functional; +use Temporal\Common\Uuid; +use Temporal\Tests\Fixtures\CommandResetter; use Temporal\Tests\Fixtures\Splitter; use Temporal\Tests\Fixtures\WorkerMock; @@ -223,4 +225,88 @@ public function testBatchedSignal_Combined() $worker->run($this, Splitter::create('Test_BatchedSignal_01.log')->getQueue()); } + + /** + * Destroy workflow with a started awaitWithTimeout promise inside. + */ + public function testAwaitWithTimeout() + { + $worker = WorkerMock::createMock(); + + $id = 9001; + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + } + + /** + * Destroy 100 workflows with a started awaitWithTimeout promise inside. + * The promise will be annihilated on the workflow destroy. + * There mustn't be any leaks. + */ + public function testAwaitWithTimeout_Leaks() + { + $worker = WorkerMock::createMock(); + + // Run the workflow $i times + for ($id = 9001, $i = 0; $i < 100; ++$i, ++$id) { + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + $before ??= \memory_get_usage(); + } + $after = \memory_get_usage(); + + $this->assertSame(0, $after - $before); + } + + /** + * Destroy 100 workflows with started few awaitWithTimeout promises inside. + * The promises will be annihilated on the workflow destroy. + * There mustn't be any leaks. + */ + public function testAwaitWithFewParallelTimeouts_Leaks() + { + $worker = WorkerMock::createMock(); + + // Run the workflow $i times + for ($id = 9001, $i = 0; $i < 100; ++$i, ++$id) { + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $id1 = $id; + $id2 = ++$id; + $id3 = ++$id; + $id4 = ++$id; + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + $before ??= \memory_get_usage(); + } + $after = \memory_get_usage(); + + $this->assertSame(0, $after - $before); + } } From cb78de28be68c7bc2e928445dbcd9a2932b3a97f Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 16 Dec 2022 19:07:36 +0300 Subject: [PATCH 09/10] Add test case with `wf::timer` and `wf::await` to confirm that `WorkflowContext::$timer` isn't necessary now --- .../src/Workflow/TimerWayWorkflow.php | 34 +++++++++++++++++++ tests/Fixtures/src/Workflow/TimerWorkflow.php | 3 ++ tests/Functional/Client/AwaitTestCase.php | 10 ++++++ 3 files changed, 47 insertions(+) create mode 100644 tests/Fixtures/src/Workflow/TimerWayWorkflow.php diff --git a/tests/Fixtures/src/Workflow/TimerWayWorkflow.php b/tests/Fixtures/src/Workflow/TimerWayWorkflow.php new file mode 100644 index 00000000..983e43ea --- /dev/null +++ b/tests/Fixtures/src/Workflow/TimerWayWorkflow.php @@ -0,0 +1,34 @@ +then(function () use (&$timerResolved) { + $timerResolved = true; + }); + + yield Workflow::await($timer, fn() => true); + + return $timerResolved; + } +} diff --git a/tests/Fixtures/src/Workflow/TimerWorkflow.php b/tests/Fixtures/src/Workflow/TimerWorkflow.php index 4260a3f2..b82ac67f 100644 --- a/tests/Fixtures/src/Workflow/TimerWorkflow.php +++ b/tests/Fixtures/src/Workflow/TimerWorkflow.php @@ -16,6 +16,9 @@ use Temporal\Workflow\WorkflowMethod; use Temporal\Tests\Activity\SimpleActivity; +/** + * @see \Temporal\Tests\Functional\WorkflowTestCase::testTimer() + */ #[Workflow\WorkflowInterface] class TimerWorkflow { diff --git a/tests/Functional/Client/AwaitTestCase.php b/tests/Functional/Client/AwaitTestCase.php index 0b4f1a5a..2b4614a7 100644 --- a/tests/Functional/Client/AwaitTestCase.php +++ b/tests/Functional/Client/AwaitTestCase.php @@ -20,6 +20,7 @@ use Temporal\Tests\Workflow\AggregatedWorkflow; use Temporal\Tests\Workflow\LoopWithSignalCoroutinesWorkflow; use Temporal\Tests\Workflow\LoopWorkflow; +use Temporal\Tests\Workflow\TimerWayWorkflow; use Temporal\Tests\Workflow\WaitWorkflow; use Temporal\Workflow\WorkflowStub; @@ -189,4 +190,13 @@ public function testCancelAwait() $this->assertInstanceOf(CanceledFailure::class, $e->getPrevious()); } } + + public function testTimer(): void + { + $client = $this->createClient(); + $wait = $client->newWorkflowStub(TimerWayWorkflow::class); + $run = $client->start($wait); + + $this->assertFalse($run->getResult()); + } } From 9ee9beba77f447757d2c060926e88d97cb1db751 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 16 Dec 2022 19:27:26 +0300 Subject: [PATCH 10/10] Add test with single promise inside Workflow::await --- src/Internal/Workflow/WorkflowContext.php | 6 +-- .../AwaitWithSingleTimeoutWorkflow.php | 27 +++++++++++ tests/Functional/WorkflowTestCase.php | 48 ++++++++++++++++--- 3 files changed, 72 insertions(+), 9 deletions(-) create mode 100644 tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 847051f4..2cc3ab8c 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -449,9 +449,9 @@ public function await(...$conditions): PromiseInterface } } - // if (\count($result) === 1) { - // return $result[0]; - // } + if (\count($result) === 1) { + return $result[0]; + } return Promise::any($result)->then( function ($result) use ($conditionGroupId) { diff --git a/tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php b/tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php new file mode 100644 index 00000000..921cc81e --- /dev/null +++ b/tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php @@ -0,0 +1,27 @@ +run($this, Splitter::createFromString($log)->getQueue()); + $before ??= \memory_get_usage(); + } + $after = \memory_get_usage(); + + $this->assertSame(0, $after - $before); + } + + /** + * Destroy 100 workflows with single promise inside Workflow::await. + * That case mustn't leak. + * @see \Temporal\Tests\Workflow\AwaitWithSingleTimeoutWorkflow + */ + public function testAwaitWithOneTimer_Leaks() + { + $worker = WorkerMock::createMock(); + + // Run the workflow $i times + for ($id = 9000, $i = 0; $i < 100; ++$i) { + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $id1 = ++$id; + $log = <<