From 1b034023b5da8208f87c74f7d71a88653e181528 Mon Sep 17 00:00:00 2001 From: "Rajasa (RJ)" Date: Tue, 21 Jan 2020 14:40:36 -0800 Subject: [PATCH] Updated Event Hub functions to send messages to all partitions by (#7) default. Closes #6 --- Producer/EventHubs/Functions.cs | 35 +++++++++---------- ...ateRequest.cs => MessagesCreateRequest.cs} | 3 +- ...reateRequest.cs => MessagesSendRequest.cs} | 3 +- Producer/sample.local.settings.json | 1 + README.md | 7 ++-- azuredeploy.json | 10 ++++-- 6 files changed, 30 insertions(+), 29 deletions(-) rename Producer/EventHubs/{PartitionCreateRequest.cs => MessagesCreateRequest.cs} (69%) rename Producer/EventHubs/{PartitionMessagesCreateRequest.cs => MessagesSendRequest.cs} (72%) diff --git a/Producer/EventHubs/Functions.cs b/Producer/EventHubs/Functions.cs index 56b59fd..53ff939 100644 --- a/Producer/EventHubs/Functions.cs +++ b/Producer/EventHubs/Functions.cs @@ -24,7 +24,7 @@ public static async Task PostToEventHub( { var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync()); var numberOfMessagesPerPartition = inputObject.Value(@"NumberOfMessagesPerPartition"); - var numberOfPartitions = inputObject.Value(@"NumberOfPartitions"); + var numberOfPartitions = Convert.ToInt32(Environment.GetEnvironmentVariable("EventHubPartitions")); var workTime = -1; if (inputObject.TryGetValue(@"WorkTime", out var workTimeVal)) @@ -37,16 +37,15 @@ public static async Task PostToEventHub( for (var c = 1; c <= numberOfPartitions; c++) { var partitionKey = Guid.NewGuid().ToString(); - var orchId = await client.StartNewAsync(nameof(GenerateMessagesForEventHubPartition), - new PartitionCreateRequest + var orchId = await client.StartNewAsync(nameof(GenerateMessagesForEventHub), + new MessagesCreateRequest { TestRunId = testRunId, - PartitionId = partitionKey, NumberOfMessagesPerPartition = numberOfMessagesPerPartition, ConsumerWorkTime = workTime, }); - log.LogTrace($@"Kicked off message creation for session {partitionKey}..."); + log.LogTrace($@"Kicked off message creation for session {c}..."); orchestrationIds.Add(orchId); } @@ -54,20 +53,19 @@ public static async Task PostToEventHub( return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, orchestrationIds.First(), TimeSpan.FromMinutes(2)); } - [FunctionName(nameof(GenerateMessagesForEventHubPartition))] - public static async Task GenerateMessagesForEventHubPartition( + [FunctionName(nameof(GenerateMessagesForEventHub))] + public static async Task GenerateMessagesForEventHub( [OrchestrationTrigger]DurableOrchestrationContext ctx, ILogger log) { - var req = ctx.GetInput(); + var req = ctx.GetInput(); var messages = Enumerable.Range(1, req.NumberOfMessagesPerPartition) .Select(m => { var enqueueTime = DateTime.UtcNow; - return new PartitionMessagesCreateRequest + return new MessagesSendRequest { - PartitionId = req.PartitionId, MessageId = m, EnqueueTimeUtc = enqueueTime, TestRunId = req.TestRunId, @@ -77,7 +75,7 @@ public static async Task GenerateMessagesForEventHubPartition( try { - return await ctx.CallActivityAsync(nameof(PostMessagesToEventHubPartition), messages) + return await ctx.CallActivityAsync(nameof(PostMessagesToEventHub), messages) ? JObject.FromObject(new { req.TestRunId }) : JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}" }); } @@ -97,18 +95,17 @@ public static async Task GenerateMessagesForEventHubPartition( } }); - [FunctionName(nameof(PostMessagesToEventHubPartition))] - public static async Task PostMessagesToEventHubPartition([ActivityTrigger]DurableActivityContext ctx, + [FunctionName(nameof(PostMessagesToEventHub))] + public static async Task PostMessagesToEventHub([ActivityTrigger]DurableActivityContext ctx, [EventHub("%EventHubName%", Connection = @"EventHubConnection")]IAsyncCollector queueMessages, ILogger log) { - var messages = ctx.GetInput>(); + var messages = ctx.GetInput>(); foreach (var messageToPost in messages.Select(m => { var r = new EventData(Encoding.Default.GetBytes(_messageContent.Value)); r.Properties.Add(@"MessageId", m.MessageId); - r.Properties.Add(@"PartitionId", m.PartitionId); r.Properties.Add(@"EnqueueTimeUtc", m.EnqueueTimeUtc); r.Properties.Add(@"TestRunId", m.TestRunId); @@ -133,21 +130,21 @@ public static async Task PostMessagesToEventHubPartition([ActivityTrigger] } catch (Exception ex) { - log.LogError(ex, $@"Error posting message for partition '{messageToPost.Properties[@"PartitionId"]}'. Retrying..."); + log.LogError(ex, $@"Error posting message with TestRunID '{messageToPost.Properties[@"TestRunId"]}' and MessageId '{messageToPost.Properties[@"MessageId"]}'. Retrying..."); retry = true; } if (retry && retryCount >= MAX_RETRY_ATTEMPTS) { - log.LogError($@"Unable to post message to {messageToPost.Properties[@"PartitionId"]} after {retryCount} attempt(s). Giving up."); + log.LogError($@"Unable to post message with TestRunID '{messageToPost.Properties[@"TestRunId"]}' and MessageId '{messageToPost.Properties[@"MessageId"]}' after {retryCount} attempt(s). Giving up."); break; } else { #if DEBUG - log.LogTrace($@"Posted message {messageToPost.Properties[@"MessageId"]} (Size: {messageToPost.Body.Count} bytes) for partition '{messageToPost.Properties[@"PartitionId"]}' in {retryCount} attempt(s)"); + log.LogTrace($@"Posted message {messageToPost.Properties[@"MessageId"]} (Size: {messageToPost.Body.Count} bytes) in {retryCount} attempt(s)"); #else - log.LogTrace($@"Posted message for partition '{messageToPost.Properties[@"PartitionId"]}' in {retryCount} attempt(s)"); + log.LogTrace($@"Posted message with TestRunID '{messageToPost.Properties[@"TestRunId"]}' and MessageId {messageToPost.Properties[@"MessageId"]} in {retryCount} attempt(s)"); #endif } } while (retry); diff --git a/Producer/EventHubs/PartitionCreateRequest.cs b/Producer/EventHubs/MessagesCreateRequest.cs similarity index 69% rename from Producer/EventHubs/PartitionCreateRequest.cs rename to Producer/EventHubs/MessagesCreateRequest.cs index 4d1a431..1343216 100644 --- a/Producer/EventHubs/PartitionCreateRequest.cs +++ b/Producer/EventHubs/MessagesCreateRequest.cs @@ -1,8 +1,7 @@ namespace Producer.EventHubs { - internal class PartitionCreateRequest + internal class MessagesCreateRequest { - public string PartitionId { get; set; } public int NumberOfMessagesPerPartition { get; set; } public string TestRunId { get; set; } public int ConsumerWorkTime { get; set; } diff --git a/Producer/EventHubs/PartitionMessagesCreateRequest.cs b/Producer/EventHubs/MessagesSendRequest.cs similarity index 72% rename from Producer/EventHubs/PartitionMessagesCreateRequest.cs rename to Producer/EventHubs/MessagesSendRequest.cs index 466343f..b7b145a 100644 --- a/Producer/EventHubs/PartitionMessagesCreateRequest.cs +++ b/Producer/EventHubs/MessagesSendRequest.cs @@ -2,9 +2,8 @@ namespace Producer.EventHubs { - internal class PartitionMessagesCreateRequest + internal class MessagesSendRequest { - public string PartitionId { get; set; } public int MessageId { get; set; } public DateTime EnqueueTimeUtc { get; set; } public string TestRunId { get; set; } diff --git a/Producer/sample.local.settings.json b/Producer/sample.local.settings.json index a2931cc..50188f8 100644 --- a/Producer/sample.local.settings.json +++ b/Producer/sample.local.settings.json @@ -5,6 +5,7 @@ "ServiceBusQueueName": "sample", "EventHubConnection": "Endpoint=sb://...no-entity-path", "EventHubName": "sample", + "EventHubPartitions": "32", "StorageQueueConnection": "DefaultEndpointsProtocol=https;AccountName=...", "StorageQueueName": "sample", diff --git a/README.md b/README.md index 8cc66f1..c5b27c9 100644 --- a/README.md +++ b/README.md @@ -43,12 +43,13 @@ Content-Type: application/json cache-control: no-cache { - "NumberOfPartitions": 2, "NumberOfMessagesPerPartition": 2 } ``` -Will post two messages across two paritions to the Event Hub specified by the `EventHubConnection` and `EventHubName` settings in your `local.settings.json` file or - when published to Azure - the Function App's application settings. +Will post two messages across per partition to the Event Hub specified by the `EventHubConnection` and `EventHubName` settings in your `local.settings.json` file or - when published to Azure - the Function App's application settings. + +The number of messages per partition will differ by no more than 1. ## Storage Queues @@ -84,7 +85,7 @@ You can deploy the solution in this repo directly to Azure by simply executing ` * Service Bus **Standard** namespace with a `sample` queue * Event Hub **Basic** namespace * A `collector` hub w/ 32 partitions - this is where each consumer posts messages when they consume from their source - * An `sample` hub with 2 partitions - this is where the Producer will post messages for the EH scenario + * A `sample` hub with 32 partitions - this is where the Producer will post messages for the EH scenario * Azure Data Explorer **Dev** instance ingesting data from the above Event Hub * Azure Storage instance for use by the Durable Functions and the Storage Queue producer/consumer paths (`sample` queue created) * 1 Azure Function app with the Producer Function code diff --git a/azuredeploy.json b/azuredeploy.json index 4426443..fbc0398 100644 --- a/azuredeploy.json +++ b/azuredeploy.json @@ -49,11 +49,11 @@ "sku": { "name": "Basic", "tier": "Basic", - "capacity": 1 + "capacity": 13 }, "properties": { - "isAutoInflateEnabled": false, - "maximumThroughputUnits": 0, + "isAutoInflateEnabled": true, + "maximumThroughputUnits": 20, "kafkaEnabled": false } }, @@ -390,6 +390,10 @@ "name": "EventHubName", "value": "sample" }, + { + "name": "EventHubPartitions", + "value": "32" + }, { "name": "FUNCTIONS_EXTENSION_RUNTIME", "value": "dotnet"