Skip to content

Commit

Permalink
Merge pull request #268 from temporalio/hotfix/add-gc: fix memory lea…
Browse files Browse the repository at this point in the history
…k when a workflow with an active internal timer is destroyed

- Fixed memory leaks in the case when an uncompleted workflow with an active internal timer (see awaitWithTimeout) is destroyed.
- Call `gc_collect_cycles()` after each Workflow destroy.
- Remove the `WorkflowContext::$timers` collection. Ut is unnecessary since Temporal PHP SDK 2.0 .
- Remove the `WorkflowContext::$asyncAwaits` collection because it is unnecessary.
- Remove unused $loop parameter from the `Temporal\Internal\Transport\Client` constructor.
- Other small improvements.
  • Loading branch information
roxblnfk authored Dec 19, 2022
2 parents 1d07158 + 9ee9beb commit 1a95df8
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 167 deletions.
5 changes: 1 addition & 4 deletions src/Internal/Transport/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,14 @@ final class Client implements ClientInterface
'a request with that identifier was not sent';

private QueueInterface $queue;
private LoopInterface $loop;
private array $requests = [];

/**
* @param QueueInterface $queue
* @param LoopInterface $loop
*/
public function __construct(QueueInterface $queue, LoopInterface $loop)
public function __construct(QueueInterface $queue)
{
$this->queue = $queue;
$this->loop = $loop;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/Internal/Transport/Router/DestroyWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public function handle(RequestInterface $request, array $headers, Deferred $reso
$this->kill($runId);

$resolver->resolve(EncodedValues::fromValues([null]));

\gc_collect_cycles();
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public function then(
*/
public function onAwait(Deferred $deferred): void
{
$this->onCancel[++$this->cancelID] = function (\Throwable $e = null) use ($deferred): void {
$this->onCancel[++$this->cancelID] = static function (\Throwable $e = null) use ($deferred): void {
$deferred->reject($e ?? new CanceledFailure(''));
};

Expand Down
30 changes: 1 addition & 29 deletions src/Internal/Workflow/ScopeContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\Exception\Failure\CanceledFailure;
use Temporal\Internal\Support\DateInterval;
use Temporal\Internal\Transport\CompletableResult;
use Temporal\Internal\Transport\Request\NewTimer;
use Temporal\Internal\Workflow\Process\Scope;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow\CancellationScopeInterface;
Expand All @@ -28,6 +26,7 @@ class ScopeContext extends WorkflowContext implements ScopedContextInterface
{
private WorkflowContext $parent;
private Scope $scope;
/** @var callable */
private $onRequest;

/**
Expand Down Expand Up @@ -120,21 +119,6 @@ protected function addCondition(string $conditionGroupId, callable $condition):
);
}

protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface
{
$this->parent->asyncAwaits[$conditionGroupId][] = $condition;

return $condition->then(
function ($result) use ($conditionGroupId) {
$this->resolveConditionGroup($conditionGroupId);
return $result;
},
function () use ($conditionGroupId) {
$this->rejectConditionGroup($conditionGroupId);
}
);
}

/**
* Calculate unblocked conditions.
*/
Expand All @@ -153,18 +137,6 @@ public function rejectConditionGroup(string $conditionGroupId): void
$this->parent->rejectConditionGroup($conditionGroupId);
}

/**
* {@inheritDoc}
*/
public function timer($interval): PromiseInterface
{
$request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS));
$result = $this->request($request);
$this->parent->timers->attach($result, $request);

return $result;
}

/**
* {@inheritDoc}
*/
Expand Down
125 changes: 44 additions & 81 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
use Temporal\Internal\Transport\Request\Panic;
use Temporal\Internal\Transport\Request\SideEffect;
use Temporal\Promise;
use Temporal\Worker\Transport\Command\CommandInterface;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow\ActivityStubInterface;
use Temporal\Workflow\ChildWorkflowOptions;
Expand All @@ -59,12 +58,11 @@ class WorkflowContext implements WorkflowContextInterface
protected WorkflowInstanceInterface $workflowInstance;
protected ?ValuesInterface $lastCompletionResult = null;

protected array $awaits = [];
protected array $asyncAwaits = [];
/**
* @var <CompletableResultInterface, CommandInterface>
* Contains conditional groups that contains tuple of a condition callable and its promise
* @var array<non-empty-string, array<int<0, max>, array{callable, Deferred}>>
*/
protected \SplObjectStorage $timers;
protected array $awaits = [];

private array $trace = [];
private bool $continueAsNew = false;
Expand All @@ -89,7 +87,6 @@ public function __construct(
$this->workflowInstance = $workflowInstance;
$this->input = $input;
$this->lastCompletionResult = $lastCompletionResult;
$this->timers = new \SplObjectStorage();
}

/**
Expand Down Expand Up @@ -396,10 +393,7 @@ public function newActivityStub(string $class, ActivityOptionsInterface $options
public function timer($interval): PromiseInterface
{
$request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS));
$result = $this->request($request);
$this->timers->attach($result, $request);

return $result;
return $this->request($request);
}

/**
Expand Down Expand Up @@ -447,32 +441,58 @@ public function await(...$conditions): PromiseInterface
return resolve(true);
}
$result[] = $this->addCondition($conditionGroupId, $condition);
continue;
}

if ($condition instanceof PromiseInterface)
{
$result[] = $this->addAsyncCondition($conditionGroupId, $condition);
if ($condition instanceof PromiseInterface) {
$result[] = $condition;
}
}

if (\count($result) === 1) {
return $result[0];
}

return Promise::any($result);
return Promise::any($result)->then(
function ($result) use ($conditionGroupId) {
$this->resolveConditionGroup($conditionGroupId);
return $result;
},
function ($reason) use ($conditionGroupId) {
$this->rejectConditionGroup($conditionGroupId);
// Throw the first reason
// It need to avoid memory leak when the related workflow is destroyed
if (\is_iterable($reason)) {
foreach ($reason as $exception) {
if ($exception instanceof \Throwable) {
throw $exception;
}
}
}
},
);
}

/**
* {@inheritDoc}
*/
public function awaitWithTimeout($interval, ...$conditions): PromiseInterface
{
$timer = $this->timer($interval);

$conditions[] = $timer;

return $this->await(...$conditions)
->then(static fn (): bool => !$timer->isComplete());
/** Bypassing {@see timer()} to acquire a timer request ID */
$request = new NewTimer(DateInterval::parse($interval, DateInterval::FORMAT_SECONDS));
$requestId = $request->getID();
$timer = $this->request($request);
\assert($timer instanceof CompletableResultInterface);

return $this->await($timer, ...$conditions)
->then(function () use ($timer, $requestId): bool {
$isCompleted = $timer->isComplete();
if (!$isCompleted) {
// If internal timer was not completed then cancel it
$this->request(new Cancel($requestId));
}
return !$isCompleted;
});
}

/**
Expand All @@ -481,8 +501,7 @@ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface
public function resolveConditions(): void
{
foreach ($this->awaits as $awaitsGroupId => $awaitsGroup) {
foreach ($awaitsGroup as $i => $cond) {
[$condition, $deferred] = $cond;
foreach ($awaitsGroup as $i => [$condition, $deferred]) {
if ($condition()) {
$deferred->resolve();
unset($this->awaits[$awaitsGroupId][$i]);
Expand All @@ -493,7 +512,7 @@ public function resolveConditions(): void
}

/**
* @param string $conditionGroupId
* @param non-empty-string $conditionGroupId
* @param callable $condition
* @return PromiseInterface
*/
Expand All @@ -505,20 +524,6 @@ protected function addCondition(string $conditionGroupId, callable $condition):
return $deferred->promise();
}

protected function addAsyncCondition(string $conditionGroupId, PromiseInterface $condition): PromiseInterface
{
$this->asyncAwaits[$conditionGroupId][] = $condition;
return $condition->then(
function ($result) use ($conditionGroupId) {
$this->resolveConditionGroup($conditionGroupId);
return $result;
},
function () use ($conditionGroupId) {
$this->rejectConditionGroup($conditionGroupId);
}
);
}

/**
* Record last stack trace of the call.
*
Expand All @@ -531,53 +536,11 @@ protected function recordTrace(): void

public function resolveConditionGroup(string $conditionGroupId): void
{
// First resolve pending promises
if (isset($this->awaits[$conditionGroupId])) {
foreach ($this->awaits[$conditionGroupId] as $i => $cond) {
[$_, $deferred] = $cond;
unset($this->awaits[$conditionGroupId][$i]);
$deferred->resolve();
}
unset($this->awaits[$conditionGroupId]);
}

$this->clearAsyncAwaits($conditionGroupId);
unset($this->awaits[$conditionGroupId]);
}

public function rejectConditionGroup(string $conditionGroupId): void
{
if (isset($this->awaits[$conditionGroupId])) {
foreach ($this->awaits[$conditionGroupId] as $i => $cond) {
[$_, $deferred] = $cond;
unset($this->awaits[$conditionGroupId][$i]);
$deferred->reject();
}
unset($this->awaits[$conditionGroupId]);
}

$this->clearAsyncAwaits($conditionGroupId);
}

private function clearAsyncAwaits(string $conditionGroupId): void
{
// Check pending timers in this group
if (!isset($this->asyncAwaits[$conditionGroupId])) {
return;
}

// Then cancel any pending timers if exist
foreach ($this->asyncAwaits[$conditionGroupId] as $index => $awaitCondition) {
if (!$awaitCondition->isComplete()) {
/** @var NewTimer $timer */
$timer = $this->timers->offsetGet($awaitCondition);
if ($timer !== null) {
$request = new Cancel($timer->getID());
$this->request($request);
$this->timers->offsetUnset($awaitCondition);
}
}
unset($this->asyncAwaits[$conditionGroupId][$index]);
}
unset($this->asyncAwaits[$conditionGroupId]);
unset($this->awaits[$conditionGroupId]);
}
}
30 changes: 20 additions & 10 deletions tests/Fixtures/Splitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
class Splitter
{
/** @var string */
private string $filename;
/** @var string[] */
private array $lines;

/** @var array */
private array $in = [];
Expand All @@ -28,9 +28,9 @@ class Splitter
/**
* @param string $filename
*/
public function __construct(string $filename)
public function __construct(array $lines)
{
$this->filename = $filename;
$this->lines = $lines;
$this->parse();
}

Expand All @@ -49,12 +49,10 @@ public function getQueue(): array
*/
private function parse()
{
$lines = file($this->filename);

// skip get worker info
$offset = 0;
while (isset($lines[$offset])) {
$line = $lines[$offset];
while (isset($this->lines[$offset])) {
$line = $this->lines[$offset];

if (preg_match('/(?:\[0m\t)(\[.*\])\s*({.*})(?:[\r\n]*)$/', $line, $matches)) {
$ctx = json_decode($matches[2], true);
Expand All @@ -72,10 +70,22 @@ private function parse()
}

/**
* Create from file
*
* @return Splitter
*/
public static function create(string $name): self
{
return new self(file(__DIR__ . '/data/' . $name));
}

/**
* Create from text block
*
* @return Splitter
*/
public static function create(string $name)
public static function createFromString(string $text): self
{
return new self(__DIR__ . '/data/' . $name);
return new self(\explode("\n", $text));
}
}
27 changes: 27 additions & 0 deletions tests/Fixtures/src/Workflow/AwaitWithSingleTimeoutWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Tests\Workflow;

use Temporal\Workflow;
use Temporal\Workflow\WorkflowMethod;

#[Workflow\WorkflowInterface]
class AwaitWithSingleTimeoutWorkflow
{
#[WorkflowMethod()]
public function handler()
{
yield Workflow::await(Workflow::timer(5000));

return 'ok';
}
}
Loading

0 comments on commit 1a95df8

Please sign in to comment.