From 7cc3b6611cce2961a4cb6048e6173c0d98c7c9ec Mon Sep 17 00:00:00 2001 From: MregXN Date: Fri, 8 Dec 2023 10:05:37 +0800 Subject: [PATCH] add fan-out-fan-in sample Signed-off-by: MregXN --- .../Activities/NotifyActivity.cs | 22 ++++++++ .../Workflow/WorkflowFanOutFanIn/Program.cs | 55 +++++++++++++++++++ .../WorkflowFanOutFanIn.csproj | 14 +++++ .../Workflows/DemoWorkflow.cs | 37 +++++++++++++ 4 files changed, 128 insertions(+) create mode 100644 examples/Workflow/WorkflowFanOutFanIn/Activities/NotifyActivity.cs create mode 100644 examples/Workflow/WorkflowFanOutFanIn/Program.cs create mode 100644 examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj create mode 100644 examples/Workflow/WorkflowFanOutFanIn/Workflows/DemoWorkflow.cs diff --git a/examples/Workflow/WorkflowFanOutFanIn/Activities/NotifyActivity.cs b/examples/Workflow/WorkflowFanOutFanIn/Activities/NotifyActivity.cs new file mode 100644 index 000000000..62856b648 --- /dev/null +++ b/examples/Workflow/WorkflowFanOutFanIn/Activities/NotifyActivity.cs @@ -0,0 +1,22 @@ +using Dapr.Workflow; +using Microsoft.Extensions.Logging; + +namespace WorkflowFanOutFanIn.Activities +{ + public class NotifyActivity : WorkflowActivity + { + readonly ILogger logger; + + public NotifyActivity(ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + } + + public override Task RunAsync(WorkflowActivityContext context, string message) + { + this.logger.LogInformation(message); + + return Task.FromResult(null); + } + } +} diff --git a/examples/Workflow/WorkflowFanOutFanIn/Program.cs b/examples/Workflow/WorkflowFanOutFanIn/Program.cs new file mode 100644 index 000000000..451034c71 --- /dev/null +++ b/examples/Workflow/WorkflowFanOutFanIn/Program.cs @@ -0,0 +1,55 @@ +using Dapr.Client; +using Dapr.Workflow; +using WorkflowFanOutFanIn.Activities; +using WorkflowFanOutFanIn.Workflows; +using Microsoft.Extensions.Hosting; + +const string DaprWorkflowComponent = "dapr"; + +var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services => +{ + services.AddDaprWorkflow(options => + { + options.RegisterWorkflow(); + options.RegisterActivity(); + }); +}); + +Console.WriteLine("SetEnvironmentVariable"); +if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("DAPR_GRPC_PORT"))) +{ + Environment.SetEnvironmentVariable("DAPR_GRPC_PORT", "4001"); +} + +// Start the app - this is the point where we connect to the Dapr sidecar to +// listen for workflow work-items to execute. +using var host = builder.Build(); +host.Start(); + +Console.WriteLine("Init Client"); +DaprClient daprClient = new DaprClientBuilder().Build(); + +Console.WriteLine("Wait for the sidecar to become available"); + +while (!await daprClient.CheckHealthAsync()) +{ + Thread.Sleep(TimeSpan.FromSeconds(5)); +} + +using (daprClient) +{ + // Start the workflow + Console.WriteLine("Starting workflow..."); + await daprClient.WaitForSidecarAsync(); + + await daprClient.StartWorkflowAsync( + workflowComponent: DaprWorkflowComponent, + workflowName: nameof(WorkflowFanOutFanIn), + instanceId: "demo-workflow1", + input: "test input"); + + + await daprClient.WaitForWorkflowCompletionAsync( + workflowComponent: DaprWorkflowComponent, + instanceId: "demo-workflow1"); +} \ No newline at end of file diff --git a/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj b/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj new file mode 100644 index 000000000..9acf06b47 --- /dev/null +++ b/examples/Workflow/WorkflowFanOutFanIn/WorkflowFanOutFanIn.csproj @@ -0,0 +1,14 @@ + + + + + + + + Exe + net6 + enable + 612,618 + + + diff --git a/examples/Workflow/WorkflowFanOutFanIn/Workflows/DemoWorkflow.cs b/examples/Workflow/WorkflowFanOutFanIn/Workflows/DemoWorkflow.cs new file mode 100644 index 000000000..af0509852 --- /dev/null +++ b/examples/Workflow/WorkflowFanOutFanIn/Workflows/DemoWorkflow.cs @@ -0,0 +1,37 @@ +using Dapr.Workflow; +using WorkflowFanOutFanIn.Activities; + +namespace WorkflowFanOutFanIn.Workflows +{ + public class DemoWorkflow : Workflow + { + readonly WorkflowTaskOptions defaultActivityRetryOptions = new WorkflowTaskOptions + { + // NOTE: Beware that changing the number of retries is a breaking change for existing workflows. + RetryPolicy = new WorkflowRetryPolicy( + maxNumberOfAttempts: 3, + firstRetryInterval: TimeSpan.FromSeconds(5)), + }; + + public override async Task RunAsync(WorkflowContext context, string input) + { + // Console.WriteLine("Workflow Started."); + // try + // { + // Task t1 =context.CallActivityAsync(nameof(NotifyActivity),"calling task 1 ..."); + // Task t2 =context.CallActivityAsync(nameof(NotifyActivity),"calling task 2 ..."); + // Task t3 =context.CallActivityAsync(nameof(NotifyActivity),"calling task 3 ..."); + // await Task.WhenAll(t1, t2, t3); + // } + // catch (Exception ex) + // { + // Console.WriteLine("error!"); + // Console.WriteLine(ex.Message); + // Console.WriteLine("error!"); + // } + await context.CreateTimer(TimeSpan.FromSeconds(5)); + Console.WriteLine("Workflow Completed."); + return "Workflow Completed."; + } + } +}