diff --git a/src/Internal/Transport/Client.php b/src/Internal/Transport/Client.php index a02748f7..cd2f1dd7 100644 --- a/src/Internal/Transport/Client.php +++ b/src/Internal/Transport/Client.php @@ -37,17 +37,14 @@ final class Client implements ClientInterface 'a request with that identifier was not sent'; private QueueInterface $queue; - private LoopInterface $loop; private array $requests = []; /** * @param QueueInterface $queue - * @param LoopInterface $loop */ - public function __construct(QueueInterface $queue, LoopInterface $loop) + public function __construct(QueueInterface $queue) { $this->queue = $queue; - $this->loop = $loop; } /** diff --git a/src/Internal/Transport/Router/DestroyWorkflow.php b/src/Internal/Transport/Router/DestroyWorkflow.php index 7f92a035..ec257969 100644 --- a/src/Internal/Transport/Router/DestroyWorkflow.php +++ b/src/Internal/Transport/Router/DestroyWorkflow.php @@ -34,6 +34,8 @@ public function handle(RequestInterface $request, array $headers, Deferred $reso $this->kill($runId); $resolver->resolve(EncodedValues::fromValues([null])); + + \gc_collect_cycles(); } /** diff --git a/src/Internal/Workflow/Process/Scope.php b/src/Internal/Workflow/Process/Scope.php index 8fdf9ff8..cdcffdf4 100644 --- a/src/Internal/Workflow/Process/Scope.php +++ b/src/Internal/Workflow/Process/Scope.php @@ -273,7 +273,7 @@ public function then( */ public function onAwait(Deferred $deferred): void { - $this->onCancel[++$this->cancelID] = function (\Throwable $e = null) use ($deferred): void { + $this->onCancel[++$this->cancelID] = static function (\Throwable $e = null) use ($deferred): void { $deferred->reject($e ?? new CanceledFailure('')); }; diff --git a/src/Internal/Workflow/ScopeContext.php b/src/Internal/Workflow/ScopeContext.php index fde43285..89a276a4 100644 --- a/src/Internal/Workflow/ScopeContext.php +++ b/src/Internal/Workflow/ScopeContext.php @@ -14,9 +14,7 @@ use React\Promise\Deferred; use React\Promise\PromiseInterface; use Temporal\Exception\Failure\CanceledFailure; -use Temporal\Internal\Support\DateInterval; use Temporal\Internal\Transport\CompletableResult; -use Temporal\Internal\Transport\Request\NewTimer; use Temporal\Internal\Workflow\Process\Scope; use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Workflow\CancellationScopeInterface; @@ -28,6 +26,7 @@ class ScopeContext extends WorkflowContext implements ScopedContextInterface { private WorkflowContext $parent; private Scope $scope; + /** @var callable */ private $onRequest; /** @@ -120,21 +119,6 @@ protected function addCondition(string $conditionGroupId, callable $condition): ); } - protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface - { - $this->parent->asyncAwaits[$conditionGroupId][] = $condition; - - return $condition->then( - function ($result) use ($conditionGroupId) { - $this->resolveConditionGroup($conditionGroupId); - return $result; - }, - function () use ($conditionGroupId) { - $this->rejectConditionGroup($conditionGroupId); - } - ); - } - /** * Calculate unblocked conditions. */ @@ -153,18 +137,6 @@ public function rejectConditionGroup(string $conditionGroupId): void $this->parent->rejectConditionGroup($conditionGroupId); } - /** - * {@inheritDoc} - */ - public function timer($interval): PromiseInterface - { - $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); - $result = $this->request($request); - $this->parent->timers->attach($result, $request); - - return $result; - } - /** * {@inheritDoc} */ diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 454c69cf..2cc3ab8c 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -35,7 +35,6 @@ use Temporal\Internal\Transport\Request\Panic; use Temporal\Internal\Transport\Request\SideEffect; use Temporal\Promise; -use Temporal\Worker\Transport\Command\CommandInterface; use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Workflow\ActivityStubInterface; use Temporal\Workflow\ChildWorkflowOptions; @@ -59,12 +58,11 @@ class WorkflowContext implements WorkflowContextInterface protected WorkflowInstanceInterface $workflowInstance; protected ?ValuesInterface $lastCompletionResult = null; - protected array $awaits = []; - protected array $asyncAwaits = []; /** - * @var + * Contains conditional groups that contains tuple of a condition callable and its promise + * @var array, array{callable, Deferred}>> */ - protected \SplObjectStorage $timers; + protected array $awaits = []; private array $trace = []; private bool $continueAsNew = false; @@ -89,7 +87,6 @@ public function __construct( $this->workflowInstance = $workflowInstance; $this->input = $input; $this->lastCompletionResult = $lastCompletionResult; - $this->timers = new \SplObjectStorage(); } /** @@ -396,10 +393,7 @@ public function newActivityStub(string $class, ActivityOptionsInterface $options public function timer($interval): PromiseInterface { $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); - $result = $this->request($request); - $this->timers->attach($result, $request); - - return $result; + return $this->request($request); } /** @@ -447,11 +441,11 @@ public function await(...$conditions): PromiseInterface return resolve(true); } $result[] = $this->addCondition($conditionGroupId, $condition); + continue; } - if ($condition instanceof PromiseInterface) - { - $result[] = $this->addAsyncCondition($conditionGroupId, $condition); + if ($condition instanceof PromiseInterface) { + $result[] = $condition; } } @@ -459,7 +453,24 @@ public function await(...$conditions): PromiseInterface return $result[0]; } - return Promise::any($result); + return Promise::any($result)->then( + function ($result) use ($conditionGroupId) { + $this->resolveConditionGroup($conditionGroupId); + return $result; + }, + function ($reason) use ($conditionGroupId) { + $this->rejectConditionGroup($conditionGroupId); + // Throw the first reason + // It need to avoid memory leak when the related workflow is destroyed + if (\is_iterable($reason)) { + foreach ($reason as $exception) { + if ($exception instanceof \Throwable) { + throw $exception; + } + } + } + }, + ); } /** @@ -467,12 +478,21 @@ public function await(...$conditions): PromiseInterface */ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface { - $timer = $this->timer($interval); - - $conditions[] = $timer; - - return $this->await(...$conditions) - ->then(static fn (): bool => !$timer->isComplete()); + /** Bypassing {@see timer()} to acquire a timer request ID */ + $request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS)); + $requestId = $request->getID(); + $timer = $this->request($request); + \assert($timer instanceof CompletableResultInterface); + + return $this->await($timer, ...$conditions) + ->then(function () use ($timer, $requestId): bool { + $isCompleted = $timer->isComplete(); + if (!$isCompleted) { + // If internal timer was not completed then cancel it + $this->request(new Cancel($requestId)); + } + return !$isCompleted; + }); } /** @@ -481,8 +501,7 @@ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface public function resolveConditions(): void { foreach ($this->awaits as $awaitsGroupId => $awaitsGroup) { - foreach ($awaitsGroup as $i => $cond) { - [$condition, $deferred] = $cond; + foreach ($awaitsGroup as $i => [$condition, $deferred]) { if ($condition()) { $deferred->resolve(); unset($this->awaits[$awaitsGroupId][$i]); @@ -493,7 +512,7 @@ public function resolveConditions(): void } /** - * @param string $conditionGroupId + * @param non-empty-string $conditionGroupId * @param callable $condition * @return PromiseInterface */ @@ -505,20 +524,6 @@ protected function addCondition(string $conditionGroupId, callable $condition): return $deferred->promise(); } - protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface - { - $this->asyncAwaits[$conditionGroupId][] = $condition; - return $condition->then( - function ($result) use ($conditionGroupId) { - $this->resolveConditionGroup($conditionGroupId); - return $result; - }, - function () use ($conditionGroupId) { - $this->rejectConditionGroup($conditionGroupId); - } - ); - } - /** * Record last stack trace of the call. * @@ -531,53 +536,11 @@ protected function recordTrace(): void public function resolveConditionGroup(string $conditionGroupId): void { - // First resolve pending promises - if (isset($this->awaits[$conditionGroupId])) { - foreach ($this->awaits[$conditionGroupId] as $i => $cond) { - [$_, $deferred] = $cond; - unset($this->awaits[$conditionGroupId][$i]); - $deferred->resolve(); - } - unset($this->awaits[$conditionGroupId]); - } - - $this->clearAsyncAwaits($conditionGroupId); + unset($this->awaits[$conditionGroupId]); } public function rejectConditionGroup(string $conditionGroupId): void { - if (isset($this->awaits[$conditionGroupId])) { - foreach ($this->awaits[$conditionGroupId] as $i => $cond) { - [$_, $deferred] = $cond; - unset($this->awaits[$conditionGroupId][$i]); - $deferred->reject(); - } - unset($this->awaits[$conditionGroupId]); - } - - $this->clearAsyncAwaits($conditionGroupId); - } - - private function clearAsyncAwaits(string $conditionGroupId): void - { - // Check pending timers in this group - if (!isset($this->asyncAwaits[$conditionGroupId])) { - return; - } - - // Then cancel any pending timers if exist - foreach ($this->asyncAwaits[$conditionGroupId] as $index => $awaitCondition) { - if (!$awaitCondition->isComplete()) { - /** @var NewTimer $timer */ - $timer = $this->timers->offsetGet($awaitCondition); - if ($timer !== null) { - $request = new Cancel($timer->getID()); - $this->request($request); - $this->timers->offsetUnset($awaitCondition); - } - } - unset($this->asyncAwaits[$conditionGroupId][$index]); - } - unset($this->asyncAwaits[$conditionGroupId]); + unset($this->awaits[$conditionGroupId]); } } diff --git a/tests/Fixtures/Splitter.php b/tests/Fixtures/Splitter.php index edf0e5f2..fe10cb4d 100644 --- a/tests/Fixtures/Splitter.php +++ b/tests/Fixtures/Splitter.php @@ -16,8 +16,8 @@ */ class Splitter { - /** @var string */ - private string $filename; + /** @var string[] */ + private array $lines; /** @var array */ private array $in = []; @@ -28,9 +28,9 @@ class Splitter /** * @param string $filename */ - public function __construct(string $filename) + public function __construct(array $lines) { - $this->filename = $filename; + $this->lines = $lines; $this->parse(); } @@ -49,12 +49,10 @@ public function getQueue(): array */ private function parse() { - $lines = file($this->filename); - // skip get worker info $offset = 0; - while (isset($lines[$offset])) { - $line = $lines[$offset]; + while (isset($this->lines[$offset])) { + $line = $this->lines[$offset]; if (preg_match('/(?:\[0m\t)(\[.*\])\s*({.*})(?:[\r\n]*)$/', $line, $matches)) { $ctx = json_decode($matches[2], true); @@ -72,10 +70,22 @@ private function parse() } /** + * Create from file + * + * @return Splitter + */ + public static function create(string $name): self + { + return new self(file(__DIR__ . '/data/' . $name)); + } + + /** + * Create from text block + * * @return Splitter */ - public static function create(string $name) + public static function createFromString(string $text): self { - return new self(__DIR__ . '/data/' . $name); + return new self(\explode("\n", $text)); } } diff --git a/tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php b/tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php new file mode 100644 index 00000000..921cc81e --- /dev/null +++ b/tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php @@ -0,0 +1,27 @@ + false, + ); + + yield Workflow::awaitWithTimeout( + 20, + Workflow::awaitWithTimeout(500, fn() => false), + Workflow::awaitWithTimeout(120, fn() => false), + ); + + return 'ok'; + } +} diff --git a/tests/Fixtures/src/Workflow/TimerWayWorkflow.php b/tests/Fixtures/src/Workflow/TimerWayWorkflow.php new file mode 100644 index 00000000..983e43ea --- /dev/null +++ b/tests/Fixtures/src/Workflow/TimerWayWorkflow.php @@ -0,0 +1,34 @@ +then(function () use (&$timerResolved) { + $timerResolved = true; + }); + + yield Workflow::await($timer, fn() => true); + + return $timerResolved; + } +} diff --git a/tests/Fixtures/src/Workflow/TimerWorkflow.php b/tests/Fixtures/src/Workflow/TimerWorkflow.php index 4260a3f2..b82ac67f 100644 --- a/tests/Fixtures/src/Workflow/TimerWorkflow.php +++ b/tests/Fixtures/src/Workflow/TimerWorkflow.php @@ -16,6 +16,9 @@ use Temporal\Workflow\WorkflowMethod; use Temporal\Tests\Activity\SimpleActivity; +/** + * @see \Temporal\Tests\Functional\WorkflowTestCase::testTimer() + */ #[Workflow\WorkflowInterface] class TimerWorkflow { diff --git a/tests/Functional/Client/AwaitTestCase.php b/tests/Functional/Client/AwaitTestCase.php index 0b4f1a5a..2b4614a7 100644 --- a/tests/Functional/Client/AwaitTestCase.php +++ b/tests/Functional/Client/AwaitTestCase.php @@ -20,6 +20,7 @@ use Temporal\Tests\Workflow\AggregatedWorkflow; use Temporal\Tests\Workflow\LoopWithSignalCoroutinesWorkflow; use Temporal\Tests\Workflow\LoopWorkflow; +use Temporal\Tests\Workflow\TimerWayWorkflow; use Temporal\Tests\Workflow\WaitWorkflow; use Temporal\Workflow\WorkflowStub; @@ -189,4 +190,13 @@ public function testCancelAwait() $this->assertInstanceOf(CanceledFailure::class, $e->getPrevious()); } } + + public function testTimer(): void + { + $client = $this->createClient(); + $wait = $client->newWorkflowStub(TimerWayWorkflow::class); + $run = $client->start($wait); + + $this->assertFalse($run->getResult()); + } } diff --git a/tests/Functional/WorkflowTestCase.php b/tests/Functional/WorkflowTestCase.php index de78b702..bbe99b58 100644 --- a/tests/Functional/WorkflowTestCase.php +++ b/tests/Functional/WorkflowTestCase.php @@ -11,6 +11,8 @@ namespace Temporal\Tests\Functional; +use Temporal\Common\Uuid; +use Temporal\Tests\Fixtures\CommandResetter; use Temporal\Tests\Fixtures\Splitter; use Temporal\Tests\Fixtures\WorkerMock; @@ -223,4 +225,124 @@ public function testBatchedSignal_Combined() $worker->run($this, Splitter::create('Test_BatchedSignal_01.log')->getQueue()); } + + /** + * Destroy workflow with a started awaitWithTimeout promise inside. + * @see \Temporal\Tests\Workflow\AwaitWithTimeoutWorkflow + */ + public function testAwaitWithTimeout(): void + { + $worker = WorkerMock::createMock(); + + $id = 9001; + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + } + + /** + * Destroy 100 workflows with a started awaitWithTimeout promise inside. + * The promise will be annihilated on the workflow destroy. + * There mustn't be any leaks. + * @see \Temporal\Tests\Workflow\AwaitWithTimeoutWorkflow + */ + public function testAwaitWithTimeout_Leaks(): void + { + $worker = WorkerMock::createMock(); + + // Run the workflow $i times + for ($id = 9001, $i = 0; $i < 100; ++$i, ++$id) { + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + $before ??= \memory_get_usage(); + } + $after = \memory_get_usage(); + + $this->assertSame(0, $after - $before); + } + + /** + * Destroy 100 workflows with started few awaitWithTimeout promises inside. + * The promises will be annihilated on the workflow destroy. + * There mustn't be any leaks. + * @see \Temporal\Tests\Workflow\AwaitWithTimeoutWorkflow + */ + public function testAwaitWithFewParallelTimeouts_Leaks(): void + { + $worker = WorkerMock::createMock(); + + // Run the workflow $i times + for ($id = 9000, $i = 0; $i < 100; ++$i) { + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $id1 = ++$id; + $id2 = ++$id; + $id3 = ++$id; + $id4 = ++$id; + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + $before ??= \memory_get_usage(); + } + $after = \memory_get_usage(); + + $this->assertSame(0, $after - $before); + } + + /** + * Destroy 100 workflows with single promise inside Workflow::await. + * That case mustn't leak. + * @see \Temporal\Tests\Workflow\AwaitWithSingleTimeoutWorkflow + */ + public function testAwaitWithOneTimer_Leaks() + { + $worker = WorkerMock::createMock(); + + // Run the workflow $i times + for ($id = 9000, $i = 0; $i < 100; ++$i) { + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $id1 = ++$id; + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + $before ??= \memory_get_usage(); + } + $after = \memory_get_usage(); + + $this->assertSame(0, $after - $before); + } } diff --git a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php index 48a61948..15ab8be1 100644 --- a/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php +++ b/tests/Unit/WorkflowContext/AwaitWithTimeoutTestCase.php @@ -33,16 +33,8 @@ public function testAwaitWithTimeoutReturnsFalseIfTimeoutWasOff(): void { $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { @@ -65,16 +57,8 @@ public function testAwaitWithTimeoutStartsTimerWithConditionIsNotMet(): void $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { @@ -94,16 +78,8 @@ public function testAwaitWithTimeoutReturnsTrueWithMetCondition(): void { $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { @@ -124,27 +100,17 @@ public function testTimerIsCanceledOnceConditionIsMet(): void $this->addToAssertionCount(1); $this->worker->registerWorkflowObject( new - /** - * Support for PHP7.4 - * @Workflow\WorkflowInterface - */ #[Workflow\WorkflowInterface] class { - /** - * Support for PHP7.4 - * @Workflow\WorkflowMethod(name="AwaitWorkflow") - */ + private bool $doCancel = false; #[WorkflowMethod(name: 'AwaitWorkflow')] public function handler(): iterable { - $this->doCancel = false; - - yield Workflow::awaitWithTimeout( + $result = yield Workflow::awaitWithTimeout( 50, - function() { - return $this->doCancel; - } + fn () => $this->doCancel, ); + assertTrue($result); if ($this->doCancel) { return 'CANCEL'; @@ -153,10 +119,6 @@ function() { return 'OK'; } - /** - * Support for PHP7.4 - * @Workflow\SignalMethod() - */ #[Workflow\SignalMethod] public function cancel(): void {