Skip to content

Commit

Permalink
Implement Mutex class; remove MutexInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk committed Oct 8, 2024
1 parent dbfee5b commit ac04718
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 79 deletions.
12 changes: 1 addition & 11 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
use Temporal\Workflow\ChildWorkflowStubInterface;
use Temporal\Workflow\ContinueAsNewOptions;
use Temporal\Workflow\ExternalWorkflowStubInterface;
use Temporal\Workflow\MutexInterface;
use Temporal\Workflow\Mutex;
use Temporal\Workflow\WorkflowContextInterface;
use Temporal\Workflow\WorkflowExecution;
use Temporal\Workflow\WorkflowInfo;
Expand Down Expand Up @@ -642,16 +642,6 @@ public function uuid7(?\DateTimeInterface $dateTime = null): PromiseInterface
return $this->sideEffect(static fn(): UuidInterface => \Ramsey\Uuid\Uuid::uuid7($dateTime));
}

public function mutex(): MutexInterface
{
// todo
}

public function runLocked(MutexInterface $mutex, callable $callable): PromiseInterface
{
// todo
}

/**
* @internal
*/
Expand Down
47 changes: 13 additions & 34 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
use Temporal\Workflow\ChildWorkflowStubInterface;
use Temporal\Workflow\ContinueAsNewOptions;
use Temporal\Workflow\ExternalWorkflowStubInterface;
use Temporal\Workflow\MutexInterface;
use Temporal\Workflow\Mutex;
use Temporal\Workflow\ScopedContextInterface;
use Temporal\Workflow\UpdateContext;
use Temporal\Workflow\WorkflowContextInterface;
Expand Down Expand Up @@ -996,45 +996,24 @@ public static function uuid7(?\DateTimeInterface $dateTime = null): PromiseInter
return $context->uuid7($dateTime);
}

/**
* Create a new mutex.
*
* If a mutex is yielded without calling `lock()`, the Workflow will continue
* only when the lock is released.
*
* ```php
* $this->>mutex = Workflow::mutex();
*
* // Continue only when the lock is released
* yield $this->mutex;
*
* // Continue only when the lock is acquired
* yield Workflow::mutex('my-mutex')->lock();
* ```
*
* Note: in the last case, if the lock is already acquired, the Workflow will be blocked until it's released.
*/
public static function mutex(): MutexInterface
{
/** @var WorkflowContextInterface $context */
$context = self::getCurrentContext();

return $context->mutex();
}

/**
* Run a function when the mutex is released.
* The mutex is locked for the duration of the function.
*
* @param non-empty-string|MutexInterface $mutex Mutex name or instance.
*@see Workflow::conditionalMutex()
* @template T
* @param Mutex $mutex Mutex name or instance.
* @param callable(): T $callable Function to run.
*
* @return PromiseInterface<T>
*/
public function runLocked(string|MutexInterface $mutex, callable $callable): PromiseInterface
public function runLocked(Mutex $mutex, callable $callable): PromiseInterface
{
/** @var WorkflowContextInterface $context */
$context = self::getCurrentContext();

return $context->runLocked($mutex, $callable);
return $mutex->lock()->then(static function (Mutex $mutex) use ($callable): mixed {
try {
return $callable();
} finally {
$mutex->unlock();
}
});
}
}
82 changes: 82 additions & 0 deletions src/Workflow/Mutex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Temporal\Workflow;

use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\Promise;

/**
* If a mutex is yielded without calling `lock()`, the Workflow will continue
* only when the lock is released.
*
* ```
* $this->mutex = new Mutex();
*
* // Continue only when the lock is released
* yield $this->mutex;
* ```
*/
final class Mutex
{
private bool $locked = false;

/** @var Deferred[] */
private array $waiters = [];

/**
* Lock the mutex.
*
* ```
* // Continue only when the lock is acquired
* yield $this->mutex->lock();
* ```
*
* @return PromiseInterface A promise that resolves when the lock is acquired.
*/
public function lock(): PromiseInterface
{
if (!$this->locked) {
$this->locked = true;
return Promise::resolve($this);
}

$deferred = new Deferred();
$this->waiters[] = $deferred;

return $deferred->promise();
}

/**
* Try to lock the mutex.
*
* @return bool Returns true if the mutex was successfully locked, false otherwise.
*/
public function tryLock(): bool
{
return !$this->locked and $this->locked = true;
}

/**
* Release the lock.
*/
public function unlock(): void
{
if ($this->waiters === []) {
$this->locked = false;
return;
}

\array_shift($this->waiters)->resolve($this);
}

/**
* Check if the mutex is locked.
*/
public function isLocked(): bool
{
return $this->locked;
}
}
23 changes: 0 additions & 23 deletions src/Workflow/MutexInterface.php

This file was deleted.

11 changes: 0 additions & 11 deletions src/Workflow/WorkflowContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,4 @@ public function uuid4(): PromiseInterface;
* @return PromiseInterface<UuidInterface>
*/
public function uuid7(?\DateTimeInterface $dateTime = null): PromiseInterface;

/**
* Create a mutex.
*/
public function mutex(): MutexInterface;

/**
* Run a function when the mutex is released.
* The mutex will be locked for the duration of the function.
*/
public function runLocked(MutexInterface $mutex, callable $callable): PromiseInterface;
}

0 comments on commit ac04718

Please sign in to comment.