diff --git a/Producer/CosmosDb/Functions.cs b/Producer/CosmosDb/Functions.cs
new file mode 100644
index 0000000..06bbae3
--- /dev/null
+++ b/Producer/CosmosDb/Functions.cs
@@ -0,0 +1,134 @@
+using System;
+using System.IO;
+using System.Linq;
+using System.Net.Http;
+using System.Reflection;
+using System.Threading.Tasks;
+using Microsoft.Azure.WebJobs;
+using Microsoft.Azure.WebJobs.Host;
+using Microsoft.Azure.WebJobs.Extensions.Http;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json.Linq;
+
+namespace Producer.CosmosDb
+{
+    public static class Functions
+    {
+        [FunctionName(nameof(PostToCosmosDb))]
+        public static async Task<HttpResponseMessage> PostToCosmosDb(
+            [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage request,
+            [OrchestrationClient]DurableOrchestrationClient client,
+            ILogger log)
+        {
+            var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
+            var numberOfMessages = inputObject.Value<int>(@"NumberOfMessages");
+
+            var workTime = -1;
+            if (inputObject.TryGetValue(@"WorkTime", out var workTimeVal))
+            {
+                workTime = workTimeVal.Value<int>();
+            }
+
+            var testRunId = Guid.NewGuid().ToString();
+            var orchId = await client.StartNewAsync(nameof(GenerateMessagesForCosmosDb),
+                (numberOfMessages, testRunId, workTime));
+
+            log.LogTrace($@"Kicked off {numberOfMessages} message creation...");
+
+            return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, orchId, TimeSpan.FromMinutes(2));
+        }
+
+        [FunctionName(nameof(GenerateMessagesForCosmosDb))]
+        public static async Task<JObject> GenerateMessagesForCosmosDb(
+            [OrchestrationTrigger]DurableOrchestrationContext ctx,
+            ILogger log)
+        {
+            var req = ctx.GetInput<(int numOfMessages, string testRunId, int workTime)>();
+
+            var activities = Enumerable.Empty<Task<bool>>().ToList();
+            for (var i = 0; i < req.numOfMessages; i++)
+            {
+                try
+                {
+                    activities.Add(ctx.CallActivityAsync<bool>(nameof(PostMessageToCosmosDb), (Guid.NewGuid(), req.testRunId, req.workTime)));
+                }
+                catch (Exception ex)
+                {
+                    log.LogError(ex, @"An error occurred queuing message generation to Cosmos DB");
+                    return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex.ToString()}" });
+                }
+            }
+
+            return (await Task.WhenAll(activities)).All(r => r)    // return 'true' if all are 'true', 'false' otherwise
+                ? JObject.FromObject(new { TestRunId = req.testRunId })
+                : JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}" });
+        }
+
+        private const int MAX_RETRY_ATTEMPTS = 10;
+        private static readonly Lazy<string> _messageContent = new Lazy<string>(() =>
+        {
+            using (var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt")))
+            {
+                return sr.ReadToEnd();
+            }
+        });
+
+        [FunctionName(nameof(PostMessageToCosmosDb))]
+        public static async Task<bool> PostMessageToCosmosDb([ActivityTrigger]DurableActivityContext ctx,
+            [CosmosDB(databaseName: "%CosmosDbDatabaseName%",
+                collectionName: "%CosmosDbCollectionName%",
+                ConnectionStringSetting = @"CosmosDbConnection",
+                PartitionKey = "/TestRunId",
+                CreateIfNotExists = true)]IAsyncCollector<JObject> queueMessages,
+            ILogger log)
+        {
+            var msgDetails = ctx.GetInput<(Guid id, string runId, int workTime)>();
+            var retryCount = 0;
+            var retry = false;
+
+            var messageToPost = JObject.FromObject(new
+            {
+                Content = _messageContent.Value,
+                EnqueueTimeUtc = DateTime.UtcNow,
+                id = msgDetails.id,             // <- cosmos id field?
+                TestRunId = msgDetails.runId    // <- cosmos partition field?
+            });
+
+            if (msgDetails.workTime > 0)
+            {
+                messageToPost.Add(@"workTime", msgDetails.workTime);
+            }
+
+            do
+            {
+                retryCount++;
+                try
+                {
+                    await queueMessages.AddAsync(messageToPost);
+                    retry = false;
+                }
+                catch (Exception ex)
+                {
+                    log.LogError(ex, $@"Error posting message {msgDetails.id}. Retrying...");
+                    retry = true;
+                }
+
+                if (retry && retryCount >= MAX_RETRY_ATTEMPTS)
+                {
+                    log.LogError($@"Unable to post message {msgDetails.id} after {retryCount} attempt(s). Giving up.");
+                    break;
+                }
+                else
+                {
+#if DEBUG
+                    log.LogTrace($@"Posted message {msgDetails.id} (Size: {_messageContent.Value.Length} bytes) in {retryCount} attempt(s)");
+#else
+                    log.LogTrace($@"Posted message in {retryCount} attempt(s)");
+#endif
+                }
+            } while (retry);
+
+            return true;
+        }
+    }
+}
diff --git a/Producer/Producer.csproj b/Producer/Producer.csproj
index 7a41339..83ffde6 100644
--- a/Producer/Producer.csproj
+++ b/Producer/Producer.csproj
@@ -13,6 +13,7 @@
 
 
 
+
 
 
 
@@ -27,6 +28,10 @@
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
       <CopyToPublishDirectory>Never</CopyToPublishDirectory>
 
+
+      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
+
 
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
       <CopyToPublishDirectory>Never</CopyToPublishDirectory>
diff --git a/Producer/sample.local.settings.json b/Producer/sample.local.settings.json
index 5f9310b..1cac534 100644
--- a/Producer/sample.local.settings.json
+++ b/Producer/sample.local.settings.json
@@ -13,6 +13,10 @@
     "StorageQueueConnection": "DefaultEndpointsProtocol=https;AccountName=...",
     "StorageQueueName": "sample",
 
+    "CosmosDbConnection": null,
+    "CosmosDbDatabaseName": null,
+    "CosmosDbCollectionName": null,
+
     "FUNCTIONS_WORKER_RUNTIME": "dotnet",
     "AzureWebJobsStorage": "UseDevelopmentStorage=true",
     "DFTaskHubName": "producerlocalTaskHub"
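
For context on how the new endpoint is exercised: `PostToCosmosDb` reads `NumberOfMessages` (required) and an optional `WorkTime` from the request body, then waits up to two minutes for the orchestration before falling back to the standard Durable Functions status-check response. Below is a minimal caller sketch, not part of the change itself; the function app host name and function key are placeholders.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

// Minimal sketch of invoking the PostToCosmosDb HTTP trigger added in this change.
// The host URL and function key are placeholders -- substitute your own deployment's values.
public static class PostToCosmosDbSample
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task Main()
    {
        // NumberOfMessages is required by the trigger; WorkTime is optional (defaults to -1 when omitted).
        var payload = @"{ ""NumberOfMessages"": 10, ""WorkTime"": 5 }";

        var response = await _client.PostAsync(
            "https://<your-function-app>.azurewebsites.net/api/PostToCosmosDb?code=<function-key>",
            new StringContent(payload, Encoding.UTF8, "application/json"));

        // WaitForCompletionOrCreateCheckStatusResponseAsync returns either the orchestration output
        // (the TestRunId) or a 202 with status-query URLs if the run exceeds the 2-minute wait window.
        Console.WriteLine($"{(int)response.StatusCode}: {await response.Content.ReadAsStringAsync()}");
    }
}
```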