Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Commit

Permalink
Upgrading project to Functions v3 and DF v2.
Browse files Browse the repository at this point in the history
Updating all nupkgs to latest GA versions
  • Loading branch information
brandonh-msft committed Jul 23, 2020
1 parent 19d711f commit 4b7bad4
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 104 deletions.
12 changes: 6 additions & 6 deletions Consumer/Consumer.csproj
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<AzureFunctionsVersion>v2</AzureFunctionsVersion>
<TargetFramework>netcoreapp3.1</TargetFramework>
<AzureFunctionsVersion>v3</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventGrid" Version="2.1.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="3.0.5" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="3.1.0-beta3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.6" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.29" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="4.1.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.1.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.9" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
Expand Down
2 changes: 1 addition & 1 deletion Consumer/EventHubs/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static async System.Threading.Tasks.Task EventHubProcessorAsync(

jsonMessage.Add(@"_elapsedTimeMs", elapsedTimeMs);

log.LogTrace($@"[{ehMessage.Properties[@"TestRunId"]}]: Message received at {timestamp}: {jsonMessage.ToString()}");
log.LogTrace($@"[{ehMessage.Properties[@"TestRunId"]}]: Message received at {timestamp}: {jsonMessage}");

log.LogMetric("messageProcessTimeMs",
elapsedTimeMs,
Expand Down
2 changes: 1 addition & 1 deletion Consumer/ServiceBus/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static async Task ServiceBusQueueProcessorAsync(
ILogger log)
{
var timestamp = DateTime.UtcNow;
log.LogTrace($@"[{sbMessage.UserProperties[@"TestRunId"]}]: Message received at {timestamp}: {JObject.FromObject(sbMessage).ToString()}");
log.LogTrace($@"[{sbMessage.UserProperties[@"TestRunId"]}]: Message received at {timestamp}: {JObject.FromObject(sbMessage)}");

var enqueuedTime = sbMessage.ScheduledEnqueueTimeUtc;
var elapsedTimeMs = (timestamp - enqueuedTime).TotalMilliseconds;
Expand Down
2 changes: 1 addition & 1 deletion Consumer/StorageQueues/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static async System.Threading.Tasks.Task StorageQueueProcessorAsync(
await collector.AddAsync(collectorItem.ToString());

jsonMessage.Add(@"_elapsedTimeMs", elapsedTimeMs);
log.LogTrace($@"[{jsonContent.Value<string>(@"TestRunId")}]: Message received at {timestamp}: {jsonMessage.ToString()}");
log.LogTrace($@"[{jsonContent.Value<string>(@"TestRunId")}]: Message received at {timestamp}: {jsonMessage}");

log.LogMetric("messageProcessTimeMs",
elapsedTimeMs,
Expand Down
31 changes: 15 additions & 16 deletions Producer/CosmosDb/Functions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
Expand All @@ -15,12 +16,12 @@ namespace Producer.CosmosDb
public static class Functions
{
[FunctionName(nameof(PostToCosmosDb))]
public static async Task<HttpResponseMessage> PostToCosmosDb(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage request,
[OrchestrationClient]DurableOrchestrationClient client,
public static async Task<IActionResult> PostToCosmosDb(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest request,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
var inputObject = JObject.Parse(await request.ReadAsStringAsync());
var numberOfMessages = inputObject.Value<int>(@"NumberOfMessages");

var workTime = -1;
Expand All @@ -31,7 +32,7 @@ public static async Task<HttpResponseMessage> PostToCosmosDb(

var testRunId = Guid.NewGuid().ToString();
var orchId = await client.StartNewAsync(nameof(GenerateMessagesForCosmosDb),
(numberOfMessages, testRunId, workTime));
Tuple.Create(numberOfMessages, testRunId, workTime));

log.LogTrace($@"Kicked off {numberOfMessages} message creation...");

Expand All @@ -40,7 +41,7 @@ public static async Task<HttpResponseMessage> PostToCosmosDb(

[FunctionName(nameof(GenerateMessagesForCosmosDb))]
public static async Task<JObject> GenerateMessagesForCosmosDb(
[OrchestrationTrigger]DurableOrchestrationContext ctx,
[OrchestrationTrigger] IDurableOrchestrationContext ctx,
ILogger log)
{
var req = ctx.GetInput<(int numOfMessages, string testRunId, int workTime)>();
Expand All @@ -50,12 +51,12 @@ public static async Task<JObject> GenerateMessagesForCosmosDb(
{
try
{
activities.Add(ctx.CallActivityAsync<bool>(nameof(PostMessageToCosmosDb), (Guid.NewGuid(), req.testRunId, req.workTime)));
activities.Add(ctx.CallActivityAsync<bool>(nameof(PostMessageToCosmosDb), (ctx.NewGuid(), req.testRunId, req.workTime)));
}
catch (Exception ex)
{
log.LogError(ex, @"An error occurred queuing message generation to Cosmos DB");
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex.ToString()}" });
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex}" });
}
}

Expand All @@ -67,14 +68,12 @@ public static async Task<JObject> GenerateMessagesForCosmosDb(
private const int MAX_RETRY_ATTEMPTS = 10;
private static readonly Lazy<string> _messageContent = new Lazy<string>(() =>
{
using (var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt")))
{
return sr.ReadToEnd();
}
using var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt"));
return sr.ReadToEnd();
});

[FunctionName(nameof(PostMessageToCosmosDb))]
public static async Task<bool> PostMessageToCosmosDb([ActivityTrigger]DurableActivityContext ctx,
public static async Task<bool> PostMessageToCosmosDb([ActivityTrigger] IDurableActivityContext ctx,
[CosmosDB(databaseName: "%CosmosDbDatabaseName%",
collectionName: "%CosmosDbCollectionName%",
ConnectionStringSetting = @"CosmosDbConnection",
Expand All @@ -90,7 +89,7 @@ public static async Task<bool> PostMessageToCosmosDb([ActivityTrigger]DurableAct
{
Content = _messageContent.Value,
EnqueueTimeUtc = DateTime.UtcNow,
id = msgDetails.id, // <- cosmos id field?
msgDetails.id, // <- cosmos id field?
TestRunId = msgDetails.runId // <- cosmos partition field?
});

Expand Down
24 changes: 12 additions & 12 deletions Producer/EventGrid/Functions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
Expand All @@ -16,12 +18,12 @@ namespace Producer.EventGrid
public class Functions
{
[FunctionName(nameof(PostToEventGrid))]
public async Task<HttpResponseMessage> PostToEventGrid(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage request,
[OrchestrationClient] DurableOrchestrationClient client,
public async Task<IActionResult> PostToEventGrid(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest request,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
var inputObject = JObject.Parse(await request.ReadAsStringAsync());
var numberOfMessages = inputObject.Value<int>(@"NumberOfMessages");

var workTime = -1;
Expand All @@ -41,7 +43,7 @@ public async Task<HttpResponseMessage> PostToEventGrid(

[FunctionName(nameof(GenerateMessagesForEventGrid))]
public async Task<JObject> GenerateMessagesForEventGrid(
[OrchestrationTrigger] DurableOrchestrationContext ctx,
[OrchestrationTrigger] IDurableOrchestrationContext ctx,
ILogger log)
{
var req = ctx.GetInput<(int numOfMessages, string testRunId, int workTime)>();
Expand All @@ -56,7 +58,7 @@ public async Task<JObject> GenerateMessagesForEventGrid(
catch (Exception ex)
{
log.LogError(ex, @"An error occurred queuing message generation to Storage Queue");
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex.ToString()}" });
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex}" });
}
}

Expand All @@ -69,14 +71,12 @@ public async Task<JObject> GenerateMessagesForEventGrid(
private const int MAX_RETRY_ATTEMPTS = 10;
private static readonly Lazy<string> _messageContent = new Lazy<string>(() =>
{
using (var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt")))
{
return sr.ReadToEnd();
}
using var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt"));
return sr.ReadToEnd();
});

[FunctionName(nameof(PostMessageToEventGrid))]
public async Task<bool> PostMessageToEventGrid([ActivityTrigger] DurableActivityContext ctx,
public async Task<bool> PostMessageToEventGrid([ActivityTrigger] IDurableActivityContext ctx,
[EventGrid(TopicEndpointUri = "EventGridTopicEndpoint", TopicKeySetting = "EventGridTopicKey")] IAsyncCollector<EventGridEvent> gridMessages,
ILogger log)
{
Expand Down
28 changes: 14 additions & 14 deletions Producer/EventHubs/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
Expand All @@ -17,12 +19,12 @@ namespace Producer.EventHubs
public static class Functions
{
[FunctionName(nameof(PostToEventHub))]
public static async Task<HttpResponseMessage> PostToEventHub(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage request,
[OrchestrationClient]DurableOrchestrationClient client,
public static async Task<IActionResult> PostToEventHub(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest request,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
var inputObject = JObject.Parse(await request.ReadAsStringAsync());
var numberOfMessagesPerPartition = inputObject.Value<int>(@"NumberOfMessagesPerPartition");
var numberOfPartitions = Convert.ToInt32(Environment.GetEnvironmentVariable("EventHubPartitions"));

Expand Down Expand Up @@ -55,15 +57,15 @@ public static async Task<HttpResponseMessage> PostToEventHub(

[FunctionName(nameof(GenerateMessagesForEventHub))]
public static async Task<JObject> GenerateMessagesForEventHub(
[OrchestrationTrigger]DurableOrchestrationContext ctx,
[OrchestrationTrigger] IDurableOrchestrationContext ctx,
ILogger log)
{
var req = ctx.GetInput<MessagesCreateRequest>();

var messages = Enumerable.Range(1, req.NumberOfMessagesPerPartition)
.Select(m =>
{
var enqueueTime = DateTime.UtcNow;
var enqueueTime = ctx.CurrentUtcDateTime;
return new MessagesSendRequest
{
MessageId = m,
Expand All @@ -82,22 +84,20 @@ public static async Task<JObject> GenerateMessagesForEventHub(
catch (Exception ex)
{
log.LogError(ex, @"An error occurred queuing message generation to Event Hub");
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex.ToString()}" });
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex}" });
}
}

private const int MAX_RETRY_ATTEMPTS = 10;
private static readonly Lazy<string> _messageContent = new Lazy<string>(() =>
{
using (var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt")))
{
return sr.ReadToEnd();
}
using var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt"));
return sr.ReadToEnd();
});

[FunctionName(nameof(PostMessagesToEventHub))]
public static async Task<bool> PostMessagesToEventHub([ActivityTrigger]DurableActivityContext ctx,
[EventHub("%EventHubName%", Connection = @"EventHubConnection")]IAsyncCollector<EventData> queueMessages,
public static async Task<bool> PostMessagesToEventHub([ActivityTrigger] IDurableActivityContext ctx,
[EventHub("%EventHubName%", Connection = @"EventHubConnection")] IAsyncCollector<EventData> queueMessages,
ILogger log)
{
var messages = ctx.GetInput<IEnumerable<MessagesSendRequest>>();
Expand Down
31 changes: 15 additions & 16 deletions Producer/EventHubsKafka/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading.Tasks;
using System.Configuration;
using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Confluent.Kafka;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Producer.EventHubsKafka
{
public static class Functions
{
[FunctionName(nameof(PostToEventHubKafka))]
public static async Task<HttpResponseMessage> PostToEventHubKafka(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage request,
[OrchestrationClient]DurableOrchestrationClient client,
public static async Task<IActionResult> PostToEventHubKafka(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest request,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
var inputObject = JObject.Parse(await request.ReadAsStringAsync());
var numberOfMessagesPerPartition = inputObject.Value<int>(@"NumberOfMessagesPerPartition");
var numberOfPartitions = Convert.ToInt32(Environment.GetEnvironmentVariable("EventHubKafkaPartitions"));

Expand Down Expand Up @@ -55,15 +56,15 @@ public static async Task<HttpResponseMessage> PostToEventHubKafka(

[FunctionName(nameof(GenerateMessagesForEventHubKafka))]
public static async Task<JObject> GenerateMessagesForEventHubKafka(
[OrchestrationTrigger]DurableOrchestrationContext ctx,
[OrchestrationTrigger] IDurableOrchestrationContext ctx,
ILogger log)
{
var req = ctx.GetInput<MessagesCreateRequest>();

var messages = Enumerable.Range(1, req.NumberOfMessagesPerPartition)
.Select(m =>
{
var enqueueTime = DateTime.UtcNow;
var enqueueTime = ctx.CurrentUtcDateTime;
return new MessagesSendRequest
{
MessageId = m,
Expand All @@ -82,21 +83,19 @@ public static async Task<JObject> GenerateMessagesForEventHubKafka(
catch (Exception ex)
{
log.LogError(ex, @"An error occurred queuing message generation to Event Hub");
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex.ToString()}" });
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex}" });
}
}

private const int MAX_RETRY_ATTEMPTS = 10;
private static readonly Lazy<string> _messageContent = new Lazy<string>(() =>
{
using (var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt")))
{
return sr.ReadToEnd();
}
using var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt"));
return sr.ReadToEnd();
});

[FunctionName(nameof(PostMessagesToEventHubKafka))]
public static async Task<bool> PostMessagesToEventHubKafka([ActivityTrigger]DurableActivityContext ctx,
public static async Task<bool> PostMessagesToEventHubKafka([ActivityTrigger] IDurableActivityContext ctx,
ILogger log)
{
string brokerList = Environment.GetEnvironmentVariable("EventHubKafkaFQDN");
Expand Down
Loading

0 comments on commit 4b7bad4

Please sign in to comment.