From 9d756935236866bfcbc4fd9401a44dc6c533a967 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Mon, 9 Sep 2024 16:33:49 +0400 Subject: [PATCH] Add getter for `ActivityPrototype::$factory` --- src/Internal/Workflow/WorkflowContext.php | 18 ++++- src/Workflow.php | 85 +++++++++++++++++++++++ src/Workflow/MutexInterface.php | 21 ++++++ src/Workflow/WorkflowContextInterface.php | 23 ++++++ 4 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 src/Workflow/MutexInterface.php diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 1d734b5e..bb39edc0 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -63,6 +63,7 @@ use Temporal\Workflow\ChildWorkflowStubInterface; use Temporal\Workflow\ContinueAsNewOptions; use Temporal\Workflow\ExternalWorkflowStubInterface; +use Temporal\Workflow\MutexInterface; use Temporal\Workflow\WorkflowContextInterface; use Temporal\Workflow\WorkflowExecution; use Temporal\Workflow\WorkflowInfo; @@ -309,7 +310,7 @@ public function panic(\Throwable $failure = null): PromiseInterface public function continueAsNew( string $type, array $args = [], - ContinueAsNewOptions $options = null + ContinueAsNewOptions $options = null, ): PromiseInterface { return $this->callsInterceptor->with( function (ContinueAsNewInput $input): PromiseInterface { @@ -632,6 +633,21 @@ public function uuid7(?DateTimeInterface $dateTime = null): PromiseInterface return $this->sideEffect(static fn(): UuidInterface => \Ramsey\Uuid\Uuid::uuid7($dateTime)); } + public function mutex(string $name): MutexInternface + { + // todo + } + + public function conditionalMutex(string $name, PromiseInterface|callable ...$lockConditions): MutexInterface + { + // todo + } + + public function runLocked(string|MutexInterface $name, callable $context): PromiseInterface + { + // todo + } + /** * @internal */ diff --git a/src/Workflow.php b/src/Workflow.php index 012fa339..7defd83c 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -28,8 +28,10 @@ use Temporal\Workflow\ChildWorkflowStubInterface; use Temporal\Workflow\ContinueAsNewOptions; use Temporal\Workflow\ExternalWorkflowStubInterface; +use Temporal\Workflow\MutexInterface; use Temporal\Workflow\ScopedContextInterface; use Temporal\Workflow\UpdateContext; +use Temporal\Workflow\WorkflowContextInterface; use Temporal\Workflow\WorkflowExecution; use Temporal\Workflow\WorkflowInfo; use Temporal\Internal\Support\DateInterval; @@ -970,4 +972,87 @@ public static function uuid7(?DateTimeInterface $dateTime = null): PromiseInterf return $context->uuid7($dateTime); } + + /** + * Get a mutex by name or create a new one. + * + * If a mutex is yielded without calling `lock()`, the Workflow will continue + * only when the lock is released. + * + * ```php + * yield Workflow::mutex('my-mutex'); + * ``` + * + * Now to continue only when the lock is acquired: + * + * ```php + * yield Workflow::mutex('my-mutex')->lock(); + * ``` + * + * Note: in this case, if the lock is already acquired, the Workflow will be blocked until it's released + * + * @param non-empty-string $name The name of the mutex. + */ + public static function mutex(string $name): MutexInterface + { + /** @var WorkflowContextInterface $context */ + $context = self::getCurrentContext(); + + return $context->mutex($name); + } + + /** + * Create a conditional mutex. + * + * @param non-empty-string $name + * + * Example: + * + * Monitor when the Workflow should continue as new and the number of threads exceeds 10: + * + * ```php + * function start() { + * // Register a conditional mutex that will be locked when the Workflow should continue as new + * // or the number of threads exceeds 10 + * Workflow::conditionalMutex( + * 'limit', + * fn() => Workflow::getInfo()->shouldContinueAsNew, + * fn() => count($this->threads) >= 10, + * ); + * // ... + * } + * + * function signal(Task $task) { + * Workflow::runLocked('limit', function() { + * $key = array_key_last($this->threads) + 1; + * yield $this->threads[$key] = Workflow::executeChildWorkflow(...); + * unset($this->threads[$key]); + * }); + * } + * ``` + */ + public function conditionalMutex(string $name, PromiseInterface|callable ...$lockConditions): MutexInterface + { + /** @var WorkflowContextInterface $context */ + $context = self::getCurrentContext(); + + return $context->conditionalMutex($name, ...$lockConditions); + } + + /** + * Run a function when the mutex is released. + * The mutex is locked for the duration of the function if it's not a conditional mutex. + * Conditional mutexes are locked only when all conditions are met. + * + * @see Workflow::conditionalMutex() + * + * @param non-empty-string|MutexInterface $name Mutex name or instance. + */ + public function runLocked(string|MutexInterface $name, callable $callable): PromiseInterface + { + /** @var WorkflowContextInterface $context */ + $context = self::getCurrentContext(); + + return $context->runLocked($name, $callable); + } } diff --git a/src/Workflow/MutexInterface.php b/src/Workflow/MutexInterface.php new file mode 100644 index 00000000..ad65f190 --- /dev/null +++ b/src/Workflow/MutexInterface.php @@ -0,0 +1,21 @@ + */ public function uuid7(?DateTimeInterface $dateTime = null): PromiseInterface; + + /** + * Get a mutex by name or create a new one. + * + * @param non-empty-string $name The name of the mutex. + */ + public function mutex(string $name): MutexInterface; + + /** + * Create a conditional mutex. + * + * @param non-empty-string $name + */ + public function conditionalMutex(string $name, PromiseInterface|callable ...$lockConditions): MutexInterface; + + /** + * Run a function when the mutex is released. + * The mutex is locked for the duration of the function if it's not a conditional mutex. + * Conditional mutexes are locked only when all conditions are met. + * + * @param non-empty-string|MutexInterface $name Mutex name or instance. + */ + public function runLocked(string|MutexInterface $name, callable $callable): PromiseInterface; }