diff --git a/composer.json b/composer.json index b985710d7..e934dda5f 100644 --- a/composer.json +++ b/composer.json @@ -33,7 +33,7 @@ "react/promise": "^2.9", "roadrunner-php/roadrunner-api-dto": "^1.9.0", "roadrunner-php/version-checker": "^1.0", - "spiral/attributes": "^3.1.4", + "spiral/attributes": "^3.1.6", "spiral/roadrunner": "^2024.1", "spiral/roadrunner-cli": "^2.5", "spiral/roadrunner-kv": "^4.2", diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 32146d8dc..264f05305 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -1540,14 +1540,6 @@ - - - - - - - - diff --git a/src/Client/Common/Paginator.php b/src/Client/Common/Paginator.php index f0f8179bf..ba89c5db3 100644 --- a/src/Client/Common/Paginator.php +++ b/src/Client/Common/Paginator.php @@ -15,8 +15,6 @@ * * @template TItem * @implements IteratorAggregate - * @internal - * @psalm-internal Temporal\Client */ final class Paginator implements IteratorAggregate, Countable { @@ -46,6 +44,8 @@ private function __construct( * @param null|callable(): int<0, max> $counter Returns total number of items. * * @return self + * + * @internal */ public static function createFromGenerator(Generator $loader, ?callable $counter): self { diff --git a/src/Internal/Marshaller/Type/EnumType.php b/src/Internal/Marshaller/Type/EnumType.php index bb2168c1e..8e9342396 100644 --- a/src/Internal/Marshaller/Type/EnumType.php +++ b/src/Internal/Marshaller/Type/EnumType.php @@ -28,10 +28,6 @@ class EnumType extends Type implements RuleFactoryInterface public function __construct(MarshallerInterface $marshaller, string $class = null) { - if (PHP_VERSION_ID < 80104) { - throw new \RuntimeException('Enums are not available in this version of PHP'); - } - if ($class === null) { throw new \RuntimeException('Enum is required'); } diff --git a/src/Internal/Marshaller/Type/EnumValueType.php b/src/Internal/Marshaller/Type/EnumValueType.php new file mode 100644 index 000000000..8aa45f0ba --- /dev/null +++ b/src/Internal/Marshaller/Type/EnumValueType.php @@ -0,0 +1,80 @@ + + */ +class EnumValueType extends Type implements RuleFactoryInterface +{ + private const ERROR_MESSAGE = 'Invalid Enum value. Expected: int or string scalar value for BackedEnum. %s given.'; + + /** @var class-string<\BackedEnum> */ + private string $classFQCN; + + /** + * @param class-string<\BackedEnum>|null $class + */ + public function __construct(MarshallerInterface $marshaller, ?string $class = null) + { + $this->classFQCN = $class ?? throw new \RuntimeException('Enum is required.'); + \is_a($class, BackedEnum::class, true) ?: throw new \RuntimeException( + 'Class for EnumValueType must be an instance of BackedEnum.', + ); + parent::__construct($marshaller); + } + + /** + * {@inheritDoc} + */ + public static function makeRule(\ReflectionProperty $property): ?MarshallingRule + { + $type = $property->getType(); + + if (!$type instanceof \ReflectionNamedType || !\is_subclass_of($type->getName(), \UnitEnum::class)) { + return null; + } + + return $type->allowsNull() + ? new MarshallingRule( + $property->getName(), + NullableType::class, + new MarshallingRule(type: self::class, of: $type->getName()), + ) + : new MarshallingRule($property->getName(), self::class, $type->getName()); + } + + /** + * {@inheritDoc} + */ + public function parse($value, $current) + { + if (\is_object($value)) { + return $value; + } + + if (\is_int($value) || \is_string($value)) { + return $this->classFQCN::from($value); + } + + throw new \InvalidArgumentException(\sprintf(self::ERROR_MESSAGE, \ucfirst(\get_debug_type($value)))); + } + + public function serialize($value): int|string + { + return $value->value; + } +} diff --git a/src/Internal/Marshaller/TypeFactory.php b/src/Internal/Marshaller/TypeFactory.php index 806582a9d..bc679ab22 100644 --- a/src/Internal/Marshaller/TypeFactory.php +++ b/src/Internal/Marshaller/TypeFactory.php @@ -17,6 +17,7 @@ use Temporal\Internal\Marshaller\Type\DetectableTypeInterface; use Temporal\Internal\Marshaller\Type\EncodedCollectionType; use Temporal\Internal\Marshaller\Type\EnumType; +use Temporal\Internal\Marshaller\Type\EnumValueType; use Temporal\Internal\Marshaller\Type\ObjectType; use Temporal\Internal\Marshaller\Type\OneOfType; use Temporal\Internal\Marshaller\Type\RuleFactoryInterface as TypeRuleFactoryInterface; @@ -139,10 +140,8 @@ private function createMatchers(iterable $matchers): void */ private function getDefaultMatchers(): iterable { - if (PHP_VERSION_ID >= 80104) { - yield EnumType::class; - } - + yield EnumType::class; + yield EnumValueType::class; yield DateTimeType::class; yield DateIntervalType::class; yield UuidType::class; diff --git a/src/Internal/Transport/Router/GetWorkerInfo.php b/src/Internal/Transport/Router/GetWorkerInfo.php index 280c62b15..8d51dde88 100644 --- a/src/Internal/Transport/Router/GetWorkerInfo.php +++ b/src/Internal/Transport/Router/GetWorkerInfo.php @@ -84,6 +84,7 @@ private function workerToArray(WorkerInterface $worker): array // ActivityInfo[] 'Activities' => $this->map($worker->getActivities(), $activityMap), 'PhpSdkVersion' => SdkVersion::getSdkVersion(), + 'Flags' => (object)[], ]; } diff --git a/src/Worker/WorkerOptions.php b/src/Worker/WorkerOptions.php index c24fccc52..7ce72b555 100644 --- a/src/Worker/WorkerOptions.php +++ b/src/Worker/WorkerOptions.php @@ -12,8 +12,10 @@ namespace Temporal\Worker; use JetBrains\PhpStorm\Pure; +use Temporal\Activity\ActivityOptions; use Temporal\Internal\Marshaller\Meta\Marshal; use Temporal\Internal\Marshaller\Type\DateIntervalType; +use Temporal\Internal\Marshaller\Type\EnumValueType; use Temporal\Internal\Marshaller\Type\NullableType; use Temporal\Internal\Support\DateInterval; @@ -23,8 +25,7 @@ class WorkerOptions { /** - * Optional: To set the maximum concurrent activity executions this worker - * can have. + * Optional: To set the maximum concurrent activity executions this worker can have. * * The zero value of this uses the default value. */ @@ -83,6 +84,8 @@ class WorkerOptions * used to protect down stream services from flooding. * * The zero value of this uses the default value. + * + * @note Setting this to a non zero value will also disable eager activities. */ #[Marshal(name: 'TaskQueueActivitiesPerSecond')] public float $taskQueueActivitiesPerSecond = 0; @@ -101,6 +104,8 @@ class WorkerOptions * worker can have. * * The zero value of this uses the default value. + * Due to internal logic where pollers alternate between stick and non-sticky queues, this + * value cannot be 1 and will panic if set to that value. */ #[Marshal(name: 'MaxConcurrentWorkflowTaskExecutionSize')] public int $maxConcurrentWorkflowTaskExecutionSize = 0; @@ -109,20 +114,63 @@ class WorkerOptions * Optional: Sets the maximum number of goroutines that will concurrently * poll the temporal-server to retrieve workflow tasks. Changing this value * will affect the rate at which the worker is able to consume tasks from - * a task queue. + * a task queue. Due to + * internal logic where pollers alternate between stick and non-sticky queues, this + * value cannot be 1 and will panic if set to that value. */ #[Marshal(name: 'MaxConcurrentWorkflowTaskPollers')] public int $maxConcurrentWorkflowTaskPollers = 0; + /** + * Optional: Sets the maximum concurrent nexus task executions this worker can have. + * The zero value of this uses the default value. + */ + #[Marshal(name: 'MaxConcurrentNexusTaskExecutionSize')] + public int $maxConcurrentNexusTaskExecutionSize = 0; + + /** + * Optional: Sets the maximum number of goroutines that will concurrently poll the + * temporal-server to retrieve nexus tasks. Changing this value will affect the + * rate at which the worker is able to consume tasks from a task queue. + */ + #[Marshal(name: 'MaxConcurrentNexusTaskPollers')] + public int $maxConcurrentNexusTaskPollers = 0; + + /** + * Optional: Enable logging in replay. + * + * In the workflow code you can use workflow.GetLogger(ctx) to write logs. By default, the logger will skip log + * entry during replay mode so you won't see duplicate logs. This option will enable the logging in replay mode. + * This is only useful for debugging purpose. + */ + #[Marshal(name: 'EnableLoggingInReplay')] + public bool $enableLoggingInReplay = false; + /** * Optional: Sticky schedule to start timeout. * - * The resolution is seconds. See details about StickyExecution on the - * comments for DisableStickyExecution. + * The resolution is seconds. + * + * Sticky Execution is to run the workflow tasks for one workflow execution on same worker host. This is an + * optimization for workflow execution. When sticky execution is enabled, worker keeps the workflow state in + * memory. New workflow task contains the new history events will be dispatched to the same worker. If this + * worker crashes, the sticky workflow task will timeout after StickyScheduleToStartTimeout, and temporal server + * will clear the stickiness for that workflow execution and automatically reschedule a new workflow task that + * is available for any worker to pick up and resume the progress. + * + * Default: 5s */ #[Marshal(name: 'StickyScheduleToStartTimeout', type: NullableType::class, of: DateIntervalType::class)] public ?\DateInterval $stickyScheduleToStartTimeout = null; + /** + * Optional: Sets how workflow worker deals with non-deterministic history events + * (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow + * definition changes) and other panics raised from workflow code. + */ + #[Marshal(name: 'WorkflowPanicPolicy', type: EnumValueType::class, of: WorkflowPanicPolicy::class)] + public WorkflowPanicPolicy $workflowPanicPolicy = WorkflowPanicPolicy::BlockWorkflow; + /** * Optional: worker graceful stop timeout. */ @@ -151,15 +199,116 @@ class WorkerOptions public ?string $sessionResourceId = null; /** - * Optional: Sets the maximum number of concurrently running sessions the - * resource support. + * Optional: Sets the maximum number of concurrently running sessions the resource supports. */ #[Marshal(name: 'MaxConcurrentSessionExecutionSize')] public int $maxConcurrentSessionExecutionSize = 1000; /** - * @return static + * Optional: If set to true, a workflow worker is not started for this + * worker and workflows cannot be registered with this worker. Use this if + * you only want your worker to execute activities. + */ + #[Marshal(name: 'DisableWorkflowWorker')] + public bool $disableWorkflowWorker = false; + + /** + * Optional: If set to true worker would only handle workflow tasks and local activities. + * Non-local activities will not be executed by this worker. + */ + #[Marshal(name: 'LocalActivityWorkerOnly')] + public bool $localActivityWorkerOnly = false; + + /** + * Optional: If set overwrites the client level Identify value. + * default: client identity + */ + #[Marshal(name: 'Identity')] + public string $identity = ''; + + /** + * Optional: If set defines maximum amount of time that workflow task will be allowed to run. + * Default: 1 sec. + */ + #[Marshal(name: 'DeadlockDetectionTimeout', type: NullableType::class, of: DateIntervalType::class)] + public ?\DateInterval $deadlockDetectionTimeout = null; + + /** + * Optional: The default amount of time between sending each pending heartbeat to the server. + * This is used if the ActivityOptions do not provide a HeartbeatTimeout. + * Otherwise, the interval becomes a value a bit smaller than the given HeartbeatTimeout. + * + * Default: 30 seconds + */ + #[Marshal(name: 'MaxHeartbeatThrottleInterval', type: NullableType::class, of: DateIntervalType::class)] + public ?\DateInterval $maxHeartbeatThrottleInterval = null; + + /** + * Optional: Disable eager activities. If set to true, activities will not + * be requested to execute eagerly from the same workflow regardless + * of {@see self::$maxConcurrentEagerActivityExecutionSize}. + * + * Eager activity execution means the server returns requested eager + * activities directly from the workflow task back to this worker which is + * faster than non-eager which may be dispatched to a separate worker. + * + * @note Eager activities will automatically be disabled if {@see self::$taskQueueActivitiesPerSecond} is set. + */ + #[Marshal(name: 'DisableEagerActivities')] + public bool $disableEagerActivities = false; + + /** + * Optional: Maximum number of eager activities that can be running. + * + * When non-zero, eager activity execution will not be requested for + * activities schedule by the workflow if it would cause the total number of + * running eager activities to exceed this value. For example, if this is + * set to 1000 and there are already 998 eager activities executing and a + * workflow task schedules 3 more, only the first 2 will request eager + * execution. + * + * The default of 0 means unlimited and therefore only bound by {@see self::$maxConcurrentActivityExecutionSize}. + * + * @see self::$disableEagerActivities for a description of eager activity execution. + */ + #[Marshal(name: 'MaxConcurrentEagerActivityExecutionSize')] + public int $maxConcurrentEagerActivityExecutionSize = 0; + + /** + * Optional: Disable allowing workflow and activity functions that are + * registered with custom names from being able to be called with their + * function references. + * + * Users are strongly recommended to set this as true if they register any + * workflow or activity functions with custom names. By leaving this as + * false, the historical default, ambiguity can occur between function names + * and aliased names when not using string names when executing child + * workflow or activities. + */ + #[Marshal(name: 'DisableRegistrationAliasing')] + public bool $disableRegistrationAliasing = false; + + /** + * Assign a BuildID to this worker. This replaces the deprecated binary checksum concept, + * and is used to provide a unique identifier for a set of worker code, and is necessary + * to opt in to the Worker Versioning feature. See {@see self::$useBuildIDForVersioning}. + * + * @internal Experimental */ + #[Marshal(name: 'BuildID')] + public string $buildID = ''; + + /** + * Optional: If set, opts this worker into the Worker Versioning feature. + * It will only operate on workflows it claims to be compatible with. + * You must set {@see self::$buildID} if this flag is true. + * + * @internal Experimental + * @note Cannot be enabled at the same time as {@see self::$enableSessionWorker} + */ + #[Marshal(name: 'UseBuildIDForVersioning')] + public bool $useBuildIDForVersioning = false; + #[Pure] public static function new(): self { @@ -167,25 +316,21 @@ public static function new(): self } /** - * Optional: To set the maximum concurrent activity executions this worker - * can have. + * Optional: To set the maximum concurrent activity executions this worker can have. * * The zero value of this uses the default value. * * @psalm-suppress ImpureMethodCall * * @param int<0, max> $size - * @return self */ #[Pure] public function withMaxConcurrentActivityExecutionSize(int $size): self { - assert($size >= 0); + \assert($size >= 0); $self = clone $this; - $self->maxConcurrentActivityExecutionSize = $size; - return $self; } @@ -201,19 +346,14 @@ public function withMaxConcurrentActivityExecutionSize(int $size): self * The zero value of this uses the default value. * * @psalm-suppress ImpureMethodCall - * - * @param float $interval - * @return self */ #[Pure] public function withWorkerActivitiesPerSecond(float $interval): self { - assert($interval >= 0); + \assert($interval >= 0); $self = clone $this; - $self->workerActivitiesPerSecond = $interval; - return $self; } @@ -226,17 +366,14 @@ public function withWorkerActivitiesPerSecond(float $interval): self * @psalm-suppress ImpureMethodCall * * @param int<0, max> $size - * @return self */ #[Pure] public function withMaxConcurrentLocalActivityExecutionSize(int $size): self { - assert($size >= 0); + \assert($size >= 0); $self = clone $this; - $self->maxConcurrentLocalActivityExecutionSize = $size; - return $self; } @@ -253,19 +390,14 @@ public function withMaxConcurrentLocalActivityExecutionSize(int $size): self * The zero value of this uses the default value. * * @psalm-suppress ImpureMethodCall - * - * @param float $interval - * @return self */ #[Pure] public function withWorkerLocalActivitiesPerSecond(float $interval): self { - assert($interval >= 0); + \assert($interval >= 0); $self = clone $this; - $self->workerLocalActivitiesPerSecond = $interval; - return $self; } @@ -284,20 +416,17 @@ public function withWorkerLocalActivitiesPerSecond(float $interval): self * * The zero value of this uses the default value. * - * @psalm-suppress ImpureMethodCall + * @note Setting this to a non zero value will also disable eager activities. * - * @param float $interval - * @return self + * @psalm-suppress ImpureMethodCall */ #[Pure] public function withTaskQueueActivitiesPerSecond(float $interval): self { - assert($interval >= 0); + \assert($interval >= 0); $self = clone $this; - $self->taskQueueActivitiesPerSecond = $interval; - return $self; } @@ -310,17 +439,14 @@ public function withTaskQueueActivitiesPerSecond(float $interval): self * @psalm-suppress ImpureMethodCall * * @param int<0, max> $pollers - * @return self */ #[Pure] public function withMaxConcurrentActivityTaskPollers(int $pollers): self { - assert($pollers >= 0); + \assert($pollers >= 0); $self = clone $this; - $self->maxConcurrentActivityTaskPollers = $pollers; - return $self; } @@ -329,21 +455,20 @@ public function withMaxConcurrentActivityTaskPollers(int $pollers): self * worker can have. * * The zero value of this uses the default value. + * Due to internal logic where pollers alternate between stick and non-sticky queues, this + * value cannot be 1 and will panic if set to that value. * * @psalm-suppress ImpureMethodCall * * @param int<0, max> $size - * @return self */ #[Pure] public function withMaxConcurrentWorkflowTaskExecutionSize(int $size): self { - assert($size >= 0); + \assert($size >= 0); $self = clone $this; - $self->maxConcurrentWorkflowTaskExecutionSize = $size; - return $self; } @@ -351,62 +476,124 @@ public function withMaxConcurrentWorkflowTaskExecutionSize(int $size): self * Optional: Sets the maximum number of goroutines that will concurrently * poll the temporal-server to retrieve workflow tasks. Changing this value * will affect the rate at which the worker is able to consume tasks from - * a task queue. + * a task queue. Due to + * internal logic where pollers alternate between stick and non-sticky queues, this + * value cannot be 1 and will panic if set to that value. * * @psalm-suppress ImpureMethodCall * * @param int<0, max> $pollers - * @return self */ #[Pure] public function withMaxConcurrentWorkflowTaskPollers(int $pollers): self { - assert($pollers >= 0); + \assert($pollers >= 0); $self = clone $this; - $self->maxConcurrentWorkflowTaskPollers = $pollers; + return $self; + } + /** + * Optional: Sets the maximum concurrent nexus task executions this worker can have. + * The zero value of this uses the default value. + * + * @param int<0, max> $size + */ + #[Pure] + public function withMaxConcurrentNexusTaskExecutionSize(int $size): self + { + \assert($size >= 0); + + $self = clone $this; + $self->maxConcurrentNexusTaskExecutionSize = $size; + return $self; + } + + /** + * Optional: Sets the maximum number of goroutines that will concurrently + * poll the temporal-server to retrieve nexus tasks. Changing this value will affect the + * rate at which the worker is able to consume tasks from a task queue. + * + * @param int<0, max> $pollers + */ + #[Pure] + public function withMaxConcurrentNexusTaskPollers(int $pollers): self + { + \assert($pollers >= 0); + + $self = clone $this; + $self->maxConcurrentNexusTaskPollers = $pollers; + return $self; + } + + /** + * Optional: Enable logging in replay. + * + * In the workflow code you can use workflow.GetLogger(ctx) to write logs. By default, the logger will skip log + * entry during replay mode so you won't see duplicate logs. This option will enable the logging in replay mode. + * This is only useful for debugging purpose. + */ + #[Pure] + public function withEnableLoggingInReplay(bool $enable = true): self + { + $self = clone $this; + $self->enableLoggingInReplay = $enable; return $self; } /** * Optional: Sticky schedule to start timeout. * - * The resolution is seconds. See details about StickyExecution on the - * comments for DisableStickyExecution. + * Sticky Execution is to run the workflow tasks for one workflow execution on same worker host. This is an + * optimization for workflow execution. When sticky execution is enabled, worker keeps the workflow state in + * memory. New workflow task contains the new history events will be dispatched to the same worker. If this + * worker crashes, the sticky workflow task will timeout after StickyScheduleToStartTimeout, and temporal server + * will clear the stickiness for that workflow execution and automatically reschedule a new workflow task that + * is available for any worker to pick up and resume the progress. * * @psalm-suppress ImpureMethodCall * * @param DateIntervalValue $timeout - * @return self */ #[Pure] public function withStickyScheduleToStartTimeout($timeout): self { - assert(DateInterval::assert($timeout)); + \assert(DateInterval::assert($timeout)); $timeout = DateInterval::parse($timeout, DateInterval::FORMAT_SECONDS); - assert($timeout->totalMicroseconds >= 0); + \assert($timeout->totalMicroseconds >= 0); $self = clone $this; $self->stickyScheduleToStartTimeout = $timeout; return $self; } + /** + * Optional: Sets how workflow worker deals with non-deterministic history events + * (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow + * definition changes) and other panics raised from workflow code. + */ + #[Pure] + public function withWorkflowPanicPolicy(WorkflowPanicPolicy $policy): self + { + $self = clone $this; + $self->workflowPanicPolicy = $policy; + return $self; + } + /** * Optional: worker graceful stop timeout. * * @psalm-suppress ImpureMethodCall * * @param DateIntervalValue $timeout - * @return self */ #[Pure] public function withWorkerStopTimeout($timeout): self { - assert(DateInterval::assert($timeout)); + \assert(DateInterval::assert($timeout)); $timeout = DateInterval::parse($timeout, DateInterval::FORMAT_SECONDS); - assert($timeout->totalMicroseconds >= 0); + \assert($timeout->totalMicroseconds >= 0); $self = clone $this; $self->workerStopTimeout = $timeout; @@ -418,17 +605,12 @@ public function withWorkerStopTimeout($timeout): self * * Session workers is for activities within a session. * Enable this option to allow worker to process sessions. - * - * @param bool $enable - * @return self */ #[Pure] public function withEnableSessionWorker(bool $enable = true): self { $self = clone $this; - $self->enableSessionWorker = $enable; - return $self; } @@ -442,15 +624,12 @@ public function withEnableSessionWorker(bool $enable = true): self * resourceID. * * @param string|null $identifier - * @return self */ #[Pure] public function withSessionResourceId(?string $identifier): self { $self = clone $this; - $self->sessionResourceId = $identifier === '' ? null : $identifier; - return $self; } @@ -461,17 +640,191 @@ public function withSessionResourceId(?string $identifier): self * @psalm-suppress ImpureMethodCall * * @param int<0, max> $size - * @return self */ #[Pure] public function withMaxConcurrentSessionExecutionSize(int $size): self { - assert($size >= 0); + \assert($size >= 0); $self = clone $this; - $self->maxConcurrentSessionExecutionSize = $size; + return $self; + } + + /** + * Optional: If set to true, a workflow worker is not started for this + * worker and workflows cannot be registered with this worker. Use this if + * you only want your worker to execute activities. + */ + #[Pure] + public function withDisableWorkflowWorker(bool $disable = true): self + { + $self = clone $this; + $self->disableWorkflowWorker = $disable; + return $self; + } + + /** + * Optional: If set to true worker would only handle workflow tasks and local activities. + * Non-local activities will not be executed by this worker. + */ + #[Pure] + public function withLocalActivityWorkerOnly(bool $localOnly = true): self + { + $self = clone $this; + $self->localActivityWorkerOnly = $localOnly; + return $self; + } + + /** + * Optional: If set overwrites the client level Identify value. + * default: client identity + * + * @param non-empty-string $identity + */ + #[Pure] + public function withIdentity(string $identity): self + { + $self = clone $this; + $self->identity = $identity; + return $self; + } + + /** + * Optional: If set defines maximum amount of time that workflow task will be allowed to run. + * Default: 1 sec. + * + * @psalm-suppress ImpureMethodCall + * + * @param DateIntervalValue $timeout + */ + #[Pure] + public function withDeadlockDetectionTimeout($timeout): self + { + \assert(DateInterval::assert($timeout)); + $timeout = DateInterval::parse($timeout, DateInterval::FORMAT_SECONDS); + \assert($timeout->totalMicroseconds >= 0); + + $self = clone $this; + $self->deadlockDetectionTimeout = $timeout; + return $self; + } + /** + * Optional: The default amount of time between sending each pending heartbeat to the server. + * This is used if the {@see ActivityOptions} do not provide a HeartbeatTimeout. + * Otherwise, the interval becomes a value a bit smaller than the given HeartbeatTimeout. + * + * Default: 30 seconds + * + * @psalm-suppress ImpureMethodCall + * + * @param DateIntervalValue $interval + */ + #[Pure] + public function withMaxHeartbeatThrottleInterval($interval): self + { + \assert(DateInterval::assert($interval)); + $interval = DateInterval::parse($interval, DateInterval::FORMAT_SECONDS); + \assert($interval->totalMicroseconds >= 0); + + $self = clone $this; + $self->maxHeartbeatThrottleInterval = $interval; + return $self; + } + + /** + * Optional: Disable eager activities. If set to true, activities will not + * be requested to execute eagerly from the same workflow regardless + * of {@see self::$maxConcurrentEagerActivityExecutionSize}. + * + * Eager activity execution means the server returns requested eager + * activities directly from the workflow task back to this worker which is + * faster than non-eager which may be dispatched to a separate worker. + * + * @note Eager activities will automatically be disabled if {@see self::$taskQueueActivitiesPerSecond} is set. + */ + #[Pure] + public function withDisableEagerActivities(bool $disable = true): self + { + $self = clone $this; + $self->disableEagerActivities = $disable; + return $self; + } + + /** + * Optional: Maximum number of eager activities that can be running. + * + * When non-zero, eager activity execution will not be requested for + * activities schedule by the workflow if it would cause the total number of + * running eager activities to exceed this value. For example, if this is + * set to 1000 and there are already 998 eager activities executing and a + * workflow task schedules 3 more, only the first 2 will request eager + * execution. + * + * The default of 0 means unlimited and therefore only bound by {@see self::$maxConcurrentActivityExecutionSize}. + * + * @see self::$disableEagerActivities for a description of eager activity execution. + */ + #[Pure] + public function withMaxConcurrentEagerActivityExecutionSize(int $size): self + { + \assert($size >= 0); + + $self = clone $this; + $self->maxConcurrentEagerActivityExecutionSize = $size; + return $self; + } + + /** + * Optional: Disable allowing workflow and activity functions that are + * registered with custom names from being able to be called with their + * function references. + * + * Users are strongly recommended to set this as true if they register any + * workflow or activity functions with custom names. By leaving this as + * false, the historical default, ambiguity can occur between function names + * and aliased names when not using string names when executing child + * workflow or activities. + */ + #[Pure] + public function withDisableRegistrationAliasing(bool $disable = true): self + { + $self = clone $this; + $self->disableRegistrationAliasing = $disable; + return $self; + } + + /** + * Assign a BuildID to this worker. This replaces the deprecated binary checksum concept, + * and is used to provide a unique identifier for a set of worker code, and is necessary + * to opt in to the Worker Versioning feature. See {@see self::$useBuildIDForVersioning}. + * + * @param non-empty-string $buildID + * + * @internal Experimental + */ + #[Pure] + public function withBuildID(string $buildID): self + { + $self = clone $this; + $self->buildID = $buildID; + return $self; + } + + /** + * Optional: If set, opts this worker into the Worker Versioning feature. + * It will only operate on workflows it claims to be compatible with. + * You must set {@see self::$buildID} if this flag is true. + * + * @internal Experimental + * @note Cannot be enabled at the same time as {@see self::$enableSessionWorker} + */ + #[Pure] + public function withUseBuildIDForVersioning(bool $useBuildIDForVersioning = true): self + { + $self = clone $this; + $self->useBuildIDForVersioning = $useBuildIDForVersioning; return $self; } } diff --git a/src/Worker/WorkflowPanicPolicy.php b/src/Worker/WorkflowPanicPolicy.php new file mode 100644 index 000000000..5fa77a02a --- /dev/null +++ b/src/Worker/WorkflowPanicPolicy.php @@ -0,0 +1,23 @@ +scalarEnum = ScalarEnum::TESTED_ENUM; + $dto->autoScalarEnum = ScalarEnum::TESTED_ENUM; + $dto->nullable = null; + + $result = $this->marshal($dto); + $this->assertSame('tested', $result['scalarEnum']); + $this->assertSame('tested', $result['autoScalarEnum']); + $this->assertNull($result['nullable']); + } + + public function testMarshalEnumIntoNullable(): void + { + $dto = new EnumDto(); + $dto->nullable = ScalarEnum::TESTED_ENUM; + + $result = $this->marshal($dto); + $this->assertSame('tested', $result['nullable']); + } + + public function testUnmarshalBackedEnumUsingScalarValue(): void + { + $dto = $this->unmarshal([ + 'scalarEnum' => ScalarEnum::TESTED_ENUM->value, + ], new EnumDto()); + + $this->assertSame(ScalarEnum::TESTED_ENUM, $dto->scalarEnum); + } + + public function testUnmarshalEnumUsingNameInArray(): void + { + $this->expectException(\Temporal\Exception\MarshallerException::class); + + $this->unmarshal([ + 'scalarEnum' => ['name' => ScalarEnum::TESTED_ENUM->name], + ], new EnumDto()); + } + + public function testMarshalAndUnmarshalSame(): void + { + $dto = new EnumDTO(); + $dto->scalarEnum = ScalarEnum::TESTED_ENUM; + $dto->autoScalarEnum = ScalarEnum::TESTED_ENUM; + $dto->nullable = null; + + $result = $this->marshal($dto); + $unmarshal = $this->unmarshal($result, new EnumDTO()); + + $this->assertEquals($dto, $unmarshal); + } + + public function testUnmarshalNullToNotNullable(): void + { + try { + $this->unmarshal([ + 'autoScalarEnum' => null, + ], new EnumDto()); + + $this->fail('Null value should not be allowed.'); + } catch (\Throwable $e) { + $this->assertStringContainsString( + '`autoScalarEnum`', + $e->getMessage(), + ); + $this->assertInstanceOf(\InvalidArgumentException::class, $e->getPrevious()); + $this->assertStringContainsString( + 'Invalid Enum value', + $e->getPrevious()->getMessage(), + ); + } + } + + protected function getTypeMatchers(): array + { + return [ + EnumType::class, + ]; + } +} diff --git a/tests/Unit/DTO/Type/EnumType/Stub/EnumValueDto.php b/tests/Unit/DTO/Type/EnumType/Stub/EnumValueDto.php new file mode 100644 index 000000000..21ea56d38 --- /dev/null +++ b/tests/Unit/DTO/Type/EnumType/Stub/EnumValueDto.php @@ -0,0 +1,25 @@ + 0, 'MaxConcurrentWorkflowTaskExecutionSize' => 0, 'MaxConcurrentWorkflowTaskPollers' => 0, + 'MaxConcurrentNexusTaskExecutionSize' => 0, + 'MaxConcurrentNexusTaskPollers' => 0, + 'EnableLoggingInReplay' => false, 'StickyScheduleToStartTimeout' => null, + 'WorkflowPanicPolicy' => 0, 'WorkerStopTimeout' => null, 'EnableSessionWorker' => false, 'SessionResourceID' => null, 'MaxConcurrentSessionExecutionSize' => 1000, + 'DisableWorkflowWorker' => false, + 'LocalActivityWorkerOnly' => false, + 'Identity' => "", + 'DeadlockDetectionTimeout' => null, + 'MaxHeartbeatThrottleInterval' => null, + 'DisableEagerActivities' => false, + 'MaxConcurrentEagerActivityExecutionSize' => 0, + 'DisableRegistrationAliasing' => false, + 'BuildID' => "", + 'UseBuildIDForVersioning' => false, ]; $this->assertSame($expected, $this->marshal($dto)); } + + public function testMaxConcurrentActivityExecutionSize(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentActivityExecutionSize(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentActivityExecutionSize); + self::assertSame(10, $result->maxConcurrentActivityExecutionSize); + } + + public function testWorkerActivitiesPerSecond(): void + { + $dto = new WorkerOptions(); + $result = $dto->withWorkerActivitiesPerSecond(10.0); + + self::assertNotSame($dto, $result); + self::assertSame(0.0, $dto->workerActivitiesPerSecond); + self::assertSame(10.0, $result->workerActivitiesPerSecond); + } + + public function testMaxConcurrentLocalActivityExecutionSize(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentLocalActivityExecutionSize(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentLocalActivityExecutionSize); + self::assertSame(10, $result->maxConcurrentLocalActivityExecutionSize); + } + + public function testWorkerLocalActivitiesPerSecond(): void + { + $dto = new WorkerOptions(); + $result = $dto->withWorkerLocalActivitiesPerSecond(10.0); + + self::assertNotSame($dto, $result); + self::assertSame(0.0, $dto->workerLocalActivitiesPerSecond); + self::assertSame(10.0, $result->workerLocalActivitiesPerSecond); + } + + public function testTaskQueueActivitiesPerSecond(): void + { + $dto = new WorkerOptions(); + $result = $dto->withTaskQueueActivitiesPerSecond(10.0); + + self::assertNotSame($dto, $result); + self::assertSame(0.0, $dto->taskQueueActivitiesPerSecond); + self::assertSame(10.0, $result->taskQueueActivitiesPerSecond); + } + + public function testMaxConcurrentActivityTaskPollers(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentActivityTaskPollers(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentActivityTaskPollers); + self::assertSame(10, $result->maxConcurrentActivityTaskPollers); + } + + public function testMaxConcurrentWorkflowTaskExecutionSize(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentWorkflowTaskExecutionSize(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentWorkflowTaskExecutionSize); + self::assertSame(10, $result->maxConcurrentWorkflowTaskExecutionSize); + } + + public function testMaxConcurrentWorkflowTaskPollers(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentWorkflowTaskPollers(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentWorkflowTaskPollers); + self::assertSame(10, $result->maxConcurrentWorkflowTaskPollers); + } + + public function testMaxConcurrentNexusTaskExecutionSize(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentNexusTaskExecutionSize(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentNexusTaskExecutionSize); + self::assertSame(10, $result->maxConcurrentNexusTaskExecutionSize); + } + + public function testMaxConcurrentNexusTaskPollers(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentNexusTaskPollers(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentNexusTaskPollers); + self::assertSame(10, $result->maxConcurrentNexusTaskPollers); + } + + public function testEnableLoggingInReplay(): void + { + $dto = new WorkerOptions(); + $result = $dto->withEnableLoggingInReplay(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->enableLoggingInReplay); + self::assertTrue($result->enableLoggingInReplay); + } + + public function testStickyScheduleToStartTimeout(): void + { + $dto = new WorkerOptions(); + $result = $dto->withStickyScheduleToStartTimeout(10); + + self::assertNotSame($dto, $result); + self::assertNull($dto->stickyScheduleToStartTimeout); + self::assertSame(10, $result->stickyScheduleToStartTimeout->seconds); + } + + public function testWorkflowPanicPolicy(): void + { + $dto = new WorkerOptions(); + $result = $dto->withWorkflowPanicPolicy(WorkflowPanicPolicy::FailWorkflow); + + self::assertNotSame($dto, $result); + self::assertSame(WorkflowPanicPolicy::BlockWorkflow, $dto->workflowPanicPolicy); + self::assertSame(WorkflowPanicPolicy::FailWorkflow, $result->workflowPanicPolicy); + } + + public function testWorkerStopTimeout(): void + { + $dto = new WorkerOptions(); + $result = $dto->withWorkerStopTimeout(10); + + self::assertNotSame($dto, $result); + self::assertNull($dto->workerStopTimeout); + self::assertSame(10, $result->workerStopTimeout->seconds); + } + + public function testEnableSessionWorker(): void + { + $dto = new WorkerOptions(); + $result = $dto->withEnableSessionWorker(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->enableSessionWorker); + self::assertTrue($result->enableSessionWorker); + } + + public function testSessionResourceID(): void + { + $dto = new WorkerOptions(); + $result = $dto->withSessionResourceID('test'); + + self::assertNotSame($dto, $result); + self::assertNull($dto->sessionResourceId); + self::assertSame('test', $result->sessionResourceId); + } + + public function testMaxConcurrentSessionExecutionSize(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentSessionExecutionSize(10); + + self::assertNotSame($dto, $result); + self::assertSame(1000, $dto->maxConcurrentSessionExecutionSize); + self::assertSame(10, $result->maxConcurrentSessionExecutionSize); + } + + public function testDisableWorkflowWorker(): void + { + $dto = new WorkerOptions(); + $result = $dto->withDisableWorkflowWorker(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->disableWorkflowWorker); + self::assertTrue($result->disableWorkflowWorker); + } + + public function testLocalActivityWorkerOnly(): void + { + $dto = new WorkerOptions(); + $result = $dto->withLocalActivityWorkerOnly(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->localActivityWorkerOnly); + self::assertTrue($result->localActivityWorkerOnly); + } + + public function testIdentity(): void + { + $dto = new WorkerOptions(); + $result = $dto->withIdentity('test'); + + self::assertNotSame($dto, $result); + self::assertSame('', $dto->identity); + self::assertSame('test', $result->identity); + } + + public function testDeadlockDetectionTimeout(): void + { + $dto = new WorkerOptions(); + $result = $dto->withDeadlockDetectionTimeout(10); + + self::assertNotSame($dto, $result); + self::assertNull($dto->deadlockDetectionTimeout); + self::assertSame(10, $result->deadlockDetectionTimeout->seconds); + } + + public function testMaxHeartbeatThrottleInterval(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxHeartbeatThrottleInterval(10); + + self::assertNotSame($dto, $result); + self::assertNull($dto->maxHeartbeatThrottleInterval); + self::assertSame(10, $result->maxHeartbeatThrottleInterval->seconds); + } + + public function testDisableEagerActivities(): void + { + $dto = new WorkerOptions(); + $result = $dto->withDisableEagerActivities(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->disableEagerActivities); + self::assertTrue($result->disableEagerActivities); + } + + public function testMaxConcurrentEagerActivityExecutionSize(): void + { + $dto = new WorkerOptions(); + $result = $dto->withMaxConcurrentEagerActivityExecutionSize(10); + + self::assertNotSame($dto, $result); + self::assertSame(0, $dto->maxConcurrentEagerActivityExecutionSize); + self::assertSame(10, $result->maxConcurrentEagerActivityExecutionSize); + } + + public function testDisableRegistrationAliasing(): void + { + $dto = new WorkerOptions(); + $result = $dto->withDisableRegistrationAliasing(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->disableRegistrationAliasing); + self::assertTrue($result->disableRegistrationAliasing); + } + + public function testBuildID(): void + { + $dto = new WorkerOptions(); + $result = $dto->withBuildID('test'); + + self::assertNotSame($dto, $result); + self::assertSame('', $dto->buildID); + self::assertSame('test', $result->buildID); + } + + public function testUseBuildIDForVersioning(): void + { + $dto = new WorkerOptions(); + $result = $dto->withUseBuildIDForVersioning(true); + + self::assertNotSame($dto, $result); + self::assertFalse($dto->useBuildIDForVersioning); + self::assertTrue($result->useBuildIDForVersioning); + } }