Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Commit

Permalink
CosmosDB Producer (#9)
Browse files Browse the repository at this point in the history
* added initial cosmosdb function

* wip

* small refactoring

* working CosmosDB producer sending messages to Cosmos

Co-authored-by: Brandon H <[email protected]>
  • Loading branch information
abubinski and brandonh-msft authored Jun 2, 2020
1 parent 1ac4515 commit 82aa8f4
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 0 deletions.
134 changes: 134 additions & 0 deletions Producer/CosmosDb/Functions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

namespace Producer.CosmosDb
{
public static class Functions
{
[FunctionName(nameof(PostToCosmosDb))]
public static async Task<HttpResponseMessage> PostToCosmosDb(
[HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage request,
[OrchestrationClient]DurableOrchestrationClient client,
ILogger log)
{
var inputObject = JObject.Parse(await request.Content.ReadAsStringAsync());
var numberOfMessages = inputObject.Value<int>(@"NumberOfMessages");

var workTime = -1;
if (inputObject.TryGetValue(@"WorkTime", out var workTimeVal))
{
workTime = workTimeVal.Value<int>();
}

var testRunId = Guid.NewGuid().ToString();
var orchId = await client.StartNewAsync(nameof(GenerateMessagesForCosmosDb),
(numberOfMessages, testRunId, workTime));

log.LogTrace($@"Kicked off {numberOfMessages} message creation...");

return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, orchId, TimeSpan.FromMinutes(2));
}

[FunctionName(nameof(GenerateMessagesForCosmosDb))]
public static async Task<JObject> GenerateMessagesForCosmosDb(
[OrchestrationTrigger]DurableOrchestrationContext ctx,
ILogger log)
{
var req = ctx.GetInput<(int numOfMessages, string testRunId, int workTime)>();

var activities = Enumerable.Empty<Task<bool>>().ToList();
for (var i = 0; i < req.numOfMessages; i++)
{
try
{
activities.Add(ctx.CallActivityAsync<bool>(nameof(PostMessageToCosmosDb), (Guid.NewGuid(), req.testRunId, req.workTime)));
}
catch (Exception ex)
{
log.LogError(ex, @"An error occurred queuing message generation to Cosmos DB");
return JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}: {ex.ToString()}" });
}
}

return (await Task.WhenAll(activities)).All(r => r) // return 'true' if all are 'true', 'false' otherwise
? JObject.FromObject(new { TestRunId = req.testRunId })
: JObject.FromObject(new { Error = $@"An error occurred executing orchestration {ctx.InstanceId}" });
}

private const int MAX_RETRY_ATTEMPTS = 10;
private static readonly Lazy<string> _messageContent = new Lazy<string>(() =>
{
using (var sr = new StreamReader(Assembly.GetExecutingAssembly().GetManifestResourceStream($@"Producer.messagecontent.txt")))
{
return sr.ReadToEnd();
}
});

[FunctionName(nameof(PostMessageToCosmosDb))]
public static async Task<bool> PostMessageToCosmosDb([ActivityTrigger]DurableActivityContext ctx,
[CosmosDB(databaseName: "%CosmosDbDatabaseName%",
collectionName: "%CosmosDbCollectionName%",
ConnectionStringSetting = @"CosmosDbConnection",
PartitionKey = "/TestRunId",
CreateIfNotExists = true)]IAsyncCollector<JObject> queueMessages,
ILogger log)
{
var msgDetails = ctx.GetInput<(Guid id, string runId, int workTime)>();
var retryCount = 0;
var retry = false;

var messageToPost = JObject.FromObject(new
{
Content = _messageContent.Value,
EnqueueTimeUtc = DateTime.UtcNow,
id = msgDetails.id, // <- cosmos id field?
TestRunId = msgDetails.runId // <- cosmos partition field?
});

if (msgDetails.workTime > 0)
{
messageToPost.Add(@"workTime", msgDetails.workTime);
}

do
{
retryCount++;
try
{
await queueMessages.AddAsync(messageToPost);
retry = false;
}
catch (Exception ex)
{
log.LogError(ex, $@"Error posting message {msgDetails.id}. Retrying...");
retry = true;
}

if (retry && retryCount >= MAX_RETRY_ATTEMPTS)
{
log.LogError($@"Unable to post message {msgDetails.id} after {retryCount} attempt(s). Giving up.");
break;
}
else
{
#if DEBUG
log.LogTrace($@"Posted message {msgDetails.id} (Size: {_messageContent.Value.Length} bytes) in {retryCount} attempt(s)");
#else
log.LogTrace($@"Posted message in {retryCount} attempt(s)");
#endif
}
} while (retry);

return true;
}
}
}
5 changes: 5 additions & 0 deletions Producer/Producer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageReference Include="AWSSDK.SQS" Version="3.3.100.37" />
<PackageReference Include="Confluent.Kafka" Version="1.2.2" />
<PackageReference Include="Google.Cloud.PubSub.V1" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.7" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="1.8.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="3.0.5" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="3.0.5" />
Expand All @@ -27,6 +28,10 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
<None Update="sample.local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
Expand Down
4 changes: 4 additions & 0 deletions Producer/sample.local.settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"StorageQueueConnection": "DefaultEndpointsProtocol=https;AccountName=...",
"StorageQueueName": "sample",

"CosmosDbConnection": null,
"CosmosDbDatabaseName": null,
"CosmosDbCollectionName": null,

"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DFTaskHubName": "producerlocalTaskHub"
Expand Down

0 comments on commit 82aa8f4

Please sign in to comment.