Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Commit

Permalink
Updated Event Hub functions to send messages to all partitions by (#7)
Browse files Browse the repository at this point in the history
default. Closes #6
  • Loading branch information
rasavant-ms authored and brandonh-msft committed Jan 21, 2020
1 parent f6453e4 commit 1b03402
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 29 deletions.
35 changes: 16 additions & 19 deletions Producer/EventHubs/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static async Task<HttpResponseMessage> PostToEventHub(
{
var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
var numberOfMessagesPerPartition = inputObject.Value<int>(@"NumberOfMessagesPerPartition");
var numberOfPartitions = inputObject.Value<int>(@"NumberOfPartitions");
var numberOfPartitions = Convert.ToInt32(Environment.GetEnvironmentVariable("EventHubPartitions"));

var workTime = -1;
if (inputObject.TryGetValue(@"WorkTime", out var workTimeVal))
Expand All @@ -37,37 +37,35 @@ public static async Task<HttpResponseMessage> PostToEventHub(
for (var c = 1; c <= numberOfPartitions; c++)
{
var partitionKey = Guid.NewGuid().ToString();
var orchId = await client.StartNewAsync(nameof(GenerateMessagesForEventHubPartition),
new PartitionCreateRequest
var orchId = await client.StartNewAsync(nameof(GenerateMessagesForEventHub),
new MessagesCreateRequest
{
TestRunId = testRunId,
PartitionId = partitionKey,
NumberOfMessagesPerPartition = numberOfMessagesPerPartition,
ConsumerWorkTime = workTime,
});

log.LogTrace($@"Kicked off message creation for session {partitionKey}...");
log.LogTrace($@"Kicked off message creation for session {c}...");

orchestrationIds.Add(orchId);
}

return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, orchestrationIds.First(), TimeSpan.FromMinutes(2));
}

[FunctionName(nameof(GenerateMessagesForEventHubPartition))]
public static async Task<JObject> GenerateMessagesForEventHubPartition(
[FunctionName(nameof(GenerateMessagesForEventHub))]
public static async Task<JObject> GenerateMessagesForEventHub(
[OrchestrationTrigger]DurableOrchestrationContext ctx,
ILogger log)
{
var req = ctx.GetInput<PartitionCreateRequest>();
var req = ctx.GetInput<MessagesCreateRequest>();

var messages = Enumerable.Range(1, req.NumberOfMessagesPerPartition)
.Select(m =>
{
var enqueueTime = DateTime.UtcNow;
return new PartitionMessagesCreateRequest
return new MessagesSendRequest
{
PartitionId = req.PartitionId,
MessageId = m,
EnqueueTimeUtc = enqueueTime,
TestRunId = req.TestRunId,
Expand All @@ -77,7 +75,7 @@ public static async Task<JObject> GenerateMessagesForEventHubPartition(

try
{
return await ctx.CallActivityAsync<bool>(nameof(PostMessagesToEventHubPartition), messages)
return await ctx.CallActivityAsync<bool>(nameof(PostMessagesToEventHub), messages)
? JObject.FromObject(new { req.TestRunId })
: JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}" });
}
Expand All @@ -97,18 +95,17 @@ public static async Task<JObject> GenerateMessagesForEventHubPartition(
}
});

[FunctionName(nameof(PostMessagesToEventHubPartition))]
public static async Task<bool> PostMessagesToEventHubPartition([ActivityTrigger]DurableActivityContext ctx,
[FunctionName(nameof(PostMessagesToEventHub))]
public static async Task<bool> PostMessagesToEventHub([ActivityTrigger]DurableActivityContext ctx,
[EventHub("%EventHubName%", Connection = @"EventHubConnection")]IAsyncCollector<EventData> queueMessages,
ILogger log)
{
var messages = ctx.GetInput<IEnumerable<PartitionMessagesCreateRequest>>();
var messages = ctx.GetInput<IEnumerable<MessagesSendRequest>>();

foreach (var messageToPost in messages.Select(m =>
{
var r = new EventData(Encoding.Default.GetBytes(_messageContent.Value));
r.Properties.Add(@"MessageId", m.MessageId);
r.Properties.Add(@"PartitionId", m.PartitionId);
r.Properties.Add(@"EnqueueTimeUtc", m.EnqueueTimeUtc);
r.Properties.Add(@"TestRunId", m.TestRunId);
Expand All @@ -133,21 +130,21 @@ public static async Task<bool> PostMessagesToEventHubPartition([ActivityTrigger]
}
catch (Exception ex)
{
log.LogError(ex, $@"Error posting message for partition '{messageToPost.Properties[@"PartitionId"]}'. Retrying...");
log.LogError(ex, $@"Error posting message with TestRunID '{messageToPost.Properties[@"TestRunId"]}' and MessageId '{messageToPost.Properties[@"MessageId"]}'. Retrying...");
retry = true;
}

if (retry && retryCount >= MAX_RETRY_ATTEMPTS)
{
log.LogError($@"Unable to post message to {messageToPost.Properties[@"PartitionId"]} after {retryCount} attempt(s). Giving up.");
log.LogError($@"Unable to post message with TestRunID '{messageToPost.Properties[@"TestRunId"]}' and MessageId '{messageToPost.Properties[@"MessageId"]}' after {retryCount} attempt(s). Giving up.");
break;
}
else
{
#if DEBUG
log.LogTrace($@"Posted message {messageToPost.Properties[@"MessageId"]} (Size: {messageToPost.Body.Count} bytes) for partition '{messageToPost.Properties[@"PartitionId"]}' in {retryCount} attempt(s)");
log.LogTrace($@"Posted message {messageToPost.Properties[@"MessageId"]} (Size: {messageToPost.Body.Count} bytes) in {retryCount} attempt(s)");
#else
log.LogTrace($@"Posted message for partition '{messageToPost.Properties[@"PartitionId"]}' in {retryCount} attempt(s)");
log.LogTrace($@"Posted message with TestRunID '{messageToPost.Properties[@"TestRunId"]}' and MessageId {messageToPost.Properties[@"MessageId"]} in {retryCount} attempt(s)");
#endif
}
} while (retry);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace Producer.EventHubs
{
internal class PartitionCreateRequest
internal class MessagesCreateRequest
{
public string PartitionId { get; set; }
public int NumberOfMessagesPerPartition { get; set; }
public string TestRunId { get; set; }
public int ConsumerWorkTime { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

namespace Producer.EventHubs
{
internal class PartitionMessagesCreateRequest
internal class MessagesSendRequest
{
public string PartitionId { get; set; }
public int MessageId { get; set; }
public DateTime EnqueueTimeUtc { get; set; }
public string TestRunId { get; set; }
Expand Down
1 change: 1 addition & 0 deletions Producer/sample.local.settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"ServiceBusQueueName": "sample",
"EventHubConnection": "Endpoint=sb://...no-entity-path",
"EventHubName": "sample",
"EventHubPartitions": "32",
"StorageQueueConnection": "DefaultEndpointsProtocol=https;AccountName=...",
"StorageQueueName": "sample",

Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ Content-Type: application/json
cache-control: no-cache
{
"NumberOfPartitions": 2,
"NumberOfMessagesPerPartition": 2
}
```

Will post two messages across two paritions to the Event Hub specified by the `EventHubConnection` and `EventHubName` settings in your `local.settings.json` file or - when published to Azure - the Function App's application settings.
Will post two messages across per partition to the Event Hub specified by the `EventHubConnection` and `EventHubName` settings in your `local.settings.json` file or - when published to Azure - the Function App's application settings.

The number of messages per partition will differ by no more than 1.

## Storage Queues

Expand Down Expand Up @@ -84,7 +85,7 @@ You can deploy the solution in this repo directly to Azure by simply executing `
* Service Bus **Standard** namespace with a `sample` queue
* Event Hub **Basic** namespace
* A `collector` hub w/ 32 partitions - this is where each consumer posts messages when they consume from their source
* An `sample` hub with 2 partitions - this is where the Producer will post messages for the EH scenario
* A `sample` hub with 32 partitions - this is where the Producer will post messages for the EH scenario
* Azure Data Explorer **Dev** instance ingesting data from the above Event Hub
* Azure Storage instance for use by the Durable Functions and the Storage Queue producer/consumer paths (`sample` queue created)
* 1 Azure Function app with the Producer Function code
Expand Down
10 changes: 7 additions & 3 deletions azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@
"sku": {
"name": "Basic",
"tier": "Basic",
"capacity": 1
"capacity": 13
},
"properties": {
"isAutoInflateEnabled": false,
"maximumThroughputUnits": 0,
"isAutoInflateEnabled": true,
"maximumThroughputUnits": 20,
"kafkaEnabled": false
}
},
Expand Down Expand Up @@ -390,6 +390,10 @@
"name": "EventHubName",
"value": "sample"
},
{
"name": "EventHubPartitions",
"value": "32"
},
{
"name": "FUNCTIONS_EXTENSION_RUNTIME",
"value": "dotnet"
Expand Down

0 comments on commit 1b03402

Please sign in to comment.