diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 680e7d4b..3f42be58 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -61,7 +61,7 @@ use Temporal\Workflow\ChildWorkflowStubInterface; use Temporal\Workflow\ContinueAsNewOptions; use Temporal\Workflow\ExternalWorkflowStubInterface; -use Temporal\Workflow\MutexInterface; +use Temporal\Workflow\Mutex; use Temporal\Workflow\WorkflowContextInterface; use Temporal\Workflow\WorkflowExecution; use Temporal\Workflow\WorkflowInfo; @@ -642,16 +642,6 @@ public function uuid7(?\DateTimeInterface $dateTime = null): PromiseInterface return $this->sideEffect(static fn(): UuidInterface => \Ramsey\Uuid\Uuid::uuid7($dateTime)); } - public function mutex(): MutexInterface - { - // todo - } - - public function runLocked(MutexInterface $mutex, callable $callable): PromiseInterface - { - // todo - } - /** * @internal */ diff --git a/src/Workflow.php b/src/Workflow.php index 6e296d46..497c2f31 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -27,7 +27,7 @@ use Temporal\Workflow\ChildWorkflowStubInterface; use Temporal\Workflow\ContinueAsNewOptions; use Temporal\Workflow\ExternalWorkflowStubInterface; -use Temporal\Workflow\MutexInterface; +use Temporal\Workflow\Mutex; use Temporal\Workflow\ScopedContextInterface; use Temporal\Workflow\UpdateContext; use Temporal\Workflow\WorkflowContextInterface; @@ -996,45 +996,24 @@ public static function uuid7(?\DateTimeInterface $dateTime = null): PromiseInter return $context->uuid7($dateTime); } - /** - * Create a new mutex. - * - * If a mutex is yielded without calling `lock()`, the Workflow will continue - * only when the lock is released. - * - * ```php - * $this->>mutex = Workflow::mutex(); - * - * // Continue only when the lock is released - * yield $this->mutex; - * - * // Continue only when the lock is acquired - * yield Workflow::mutex('my-mutex')->lock(); - * ``` - * - * Note: in the last case, if the lock is already acquired, the Workflow will be blocked until it's released. - */ - public static function mutex(): MutexInterface - { - /** @var WorkflowContextInterface $context */ - $context = self::getCurrentContext(); - - return $context->mutex(); - } - /** * Run a function when the mutex is released. * The mutex is locked for the duration of the function. * - * @param non-empty-string|MutexInterface $mutex Mutex name or instance. - *@see Workflow::conditionalMutex() + * @template T + * @param Mutex $mutex Mutex name or instance. + * @param callable(): T $callable Function to run. * + * @return PromiseInterface */ - public function runLocked(string|MutexInterface $mutex, callable $callable): PromiseInterface + public function runLocked(Mutex $mutex, callable $callable): PromiseInterface { - /** @var WorkflowContextInterface $context */ - $context = self::getCurrentContext(); - - return $context->runLocked($mutex, $callable); + return $mutex->lock()->then(static function (Mutex $mutex) use ($callable): mixed { + try { + return $callable(); + } finally { + $mutex->unlock(); + } + }); } } diff --git a/src/Workflow/Mutex.php b/src/Workflow/Mutex.php new file mode 100644 index 00000000..639d4b6c --- /dev/null +++ b/src/Workflow/Mutex.php @@ -0,0 +1,82 @@ +mutex = new Mutex(); + * + * // Continue only when the lock is released + * yield $this->mutex; + * ``` + */ +final class Mutex +{ + private bool $locked = false; + + /** @var Deferred[] */ + private array $waiters = []; + + /** + * Lock the mutex. + * + * ``` + * // Continue only when the lock is acquired + * yield $this->mutex->lock(); + * ``` + * + * @return PromiseInterface A promise that resolves when the lock is acquired. + */ + public function lock(): PromiseInterface + { + if (!$this->locked) { + $this->locked = true; + return Promise::resolve($this); + } + + $deferred = new Deferred(); + $this->waiters[] = $deferred; + + return $deferred->promise(); + } + + /** + * Try to lock the mutex. + * + * @return bool Returns true if the mutex was successfully locked, false otherwise. + */ + public function tryLock(): bool + { + return !$this->locked and $this->locked = true; + } + + /** + * Release the lock. + */ + public function unlock(): void + { + if ($this->waiters === []) { + $this->locked = false; + return; + } + + \array_shift($this->waiters)->resolve($this); + } + + /** + * Check if the mutex is locked. + */ + public function isLocked(): bool + { + return $this->locked; + } +} diff --git a/src/Workflow/MutexInterface.php b/src/Workflow/MutexInterface.php deleted file mode 100644 index 54298da1..00000000 --- a/src/Workflow/MutexInterface.php +++ /dev/null @@ -1,23 +0,0 @@ - */ public function uuid7(?\DateTimeInterface $dateTime = null): PromiseInterface; - - /** - * Create a mutex. - */ - public function mutex(): MutexInterface; - - /** - * Run a function when the mutex is released. - * The mutex will be locked for the duration of the function. - */ - public function runLocked(MutexInterface $mutex, callable $callable): PromiseInterface; }