Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#906 -Added methods in status API supports for saving and reading binary data #1116

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions examples/Client/StateManagement/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,8 @@ class Program
new StateStoreExample(),
new StateStoreTransactionsExample(),
new StateStoreETagsExample(),
new BulkStateExample()
new BulkStateExample(),
new StateStoreBinaryExample()
};

static async Task<int> Main(string[] args)
Expand Down
47 changes: 47 additions & 0 deletions examples/Client/StateManagement/StateStoreBinaryExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Collections.Generic;
using System.Text;
using Dapr.Client;
using System.Threading.Tasks;
using System.Threading;
using Google.Protobuf;

namespace Samples.Client
{
public class StateStoreBinaryExample : Example
{

private static readonly string stateKeyName = "binarydata";
private static readonly string storeName = "statestore";

public override string DisplayName => "Using the State Store with binary data";

public override async Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();

var state = "Test Binary Data";
// convert variable in to byte array
var stateBytes = Encoding.UTF8.GetBytes(state);
await client.SaveByteStateAsync(storeName, stateKeyName, stateBytes.AsMemory(), cancellationToken: cancellationToken);
Console.WriteLine("Saved State!");

var responseBytes = await client.GetByteStateAsync(storeName, stateKeyName, cancellationToken: cancellationToken);
var savedState = Encoding.UTF8.GetString(ByteString.CopyFrom(responseBytes.Span).ToByteArray());

if (savedState == null)
{
Console.WriteLine("State not found in store");
}
else
{
Console.WriteLine($"Got State: {savedState}");
}

await client.DeleteStateAsync(storeName, stateKeyName, cancellationToken: cancellationToken);
Console.WriteLine("Deleted State!");
}


}
}
74 changes: 74 additions & 0 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,80 @@ public abstract Task SaveStateAsync<TValue>(
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);


/// <summary>
/// Saves the provided <paramref name="binaryValue" /> associated with the provided <paramref name="key" /> to the Dapr state
/// store
/// </summary>
/// <param name="storeName">The name of the state store.</param>
/// <param name="key">The state key.</param>
/// <param name="binaryValue">The binary data that will be stored in the state store.</param>
/// <param name="stateOptions">Options for performing save state operation.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task SaveByteStateAsync(
divzi-p marked this conversation as resolved.
Show resolved Hide resolved
string storeName,
string key,
ReadOnlyMemory<byte> binaryValue,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);

/// <summary>
///Saves the provided <paramref name="binaryValue" /> associated with the provided <paramref name="key" /> using the
/// <paramref name="etag"/> to the Dapr state. State store implementation will allow the update only if the attached ETag matches with the latest ETag in the state store.
/// </summary>
/// <param name="storeName">The name of the state store.</param>
/// <param name="key">The state key.</param>
/// <param name="binaryValue">The binary data that will be stored in the state store.</param>
/// <param name="etag">An ETag.</param>
/// <param name="stateOptions">Options for performing save state operation.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task<bool> TrySaveByteStateAsync(
string storeName,
string key,
ReadOnlyMemory<byte> binaryValue,
string etag,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);


/// <summary>
/// Gets the current binary value associated with the <paramref name="key" /> from the Dapr state store.
/// </summary>
/// <param name="storeName">The name of state store to read from.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode <see cref="ConsistencyMode" />.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{T}" /> that will return the value when the operation has completed.</returns>
public abstract Task<ReadOnlyMemory<byte>> GetByteStateAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);

/// <summary>
divzi-p marked this conversation as resolved.
Show resolved Hide resolved
/// Gets the current binary value associated with the <paramref name="key" /> from the Dapr state store and an ETag.
/// </summary>
/// <param name="storeName">The name of the state store.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode <see cref="ConsistencyMode" />.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{T}" /> that will return the value when the operation has completed. This wraps the read value and an ETag.</returns>
public abstract Task<(ReadOnlyMemory<byte>, string etag)> GetByteStateAndETagAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Tries to save the state <paramref name="value" /> associated with the provided <paramref name="key" /> using the
/// <paramref name="etag"/> to the Dapr state. State store implementation will allow the update only if the attached ETag matches with the latest ETag in the state store.
Expand Down
193 changes: 193 additions & 0 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,199 @@ private async Task<bool> MakeSaveStateCallAsync<TValue>(
throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

/// <inheritdoc/>
public override async Task SaveByteStateAsync(
string storeName,
string key,
ReadOnlyMemory<byte> value,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));
_ = await this.MakeSaveByteStateCallAsync(
storeName,
key,
ByteString.CopyFrom(value.Span),
etag: null,
stateOptions,
metadata,
cancellationToken);
}
/// <inheritdoc/>
public override async Task<bool> TrySaveByteStateAsync(
string storeName,
string key,
ReadOnlyMemory<byte> value,
string etag,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));
ArgumentVerifier.ThrowIfNull(etag, nameof(etag));
return await this.MakeSaveByteStateCallAsync(storeName, key, ByteString.CopyFrom(value.Span), etag, stateOptions, metadata, cancellationToken);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're not doing anything with the result of the awaited task you can optimize the code a bit by dropping the await here and remove async from the method signature

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this 'await' is needed as MakeSaveByteStateCallAsync is an asyc task. Similar to method-TrySaveStateAsync. Thanks.

}

// Method MakeSaveStateCallAsync to save binary value
private async Task<bool> MakeSaveByteStateCallAsync(
string storeName,
string key,
ByteString value,
string etag = default,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
var envelope = new Autogenerated.SaveStateRequest()
{
StoreName = storeName,
};


var stateItem = new Autogenerated.StateItem()
{
Key = key,
};

if (metadata != null)
{
foreach (var kvp in metadata)
{
stateItem.Metadata.Add(kvp.Key, kvp.Value);
}
}

if (etag != null)
{
stateItem.Etag = new Autogenerated.Etag() { Value = etag };
}

if (stateOptions != null)
{
stateItem.Options = ToAutoGeneratedStateOptions(stateOptions);
}

if (value != null)
{

stateItem.Value = value;
}

envelope.States.Add(stateItem);

var options = CreateCallOptions(headers: null, cancellationToken);
try
{
await client.SaveStateAsync(envelope, options);
return true;
}
catch (RpcException rpc) when (etag != null && rpc.StatusCode == StatusCode.Aborted)
{
// This kind of failure indicates an ETag mismatch. Aborted doesn't seem like
// the right status code at first, but check the docs, it fits this use-case.
//
// When an ETag is used we surface this though the Try... pattern
return false;
}
catch (RpcException ex)
{
throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

/// <inheritdoc/>
public override async Task<ReadOnlyMemory<byte>> GetByteStateAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));
var envelope = new Autogenerated.GetStateRequest()
{
StoreName = storeName,
Key = key,
};
if (consistencyMode != null)
{
envelope.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value);
}
if (metadata != null)
{
foreach (var kvp in metadata)
{
envelope.Metadata.Add(kvp.Key, kvp.Value);
}
}
var options = CreateCallOptions(headers: null, cancellationToken);
try
{
var response = await client.GetStateAsync(envelope, options);
return response.Data.ToByteArray().AsMemory();
}
catch (RpcException ex)
{
throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

/// <inheritdoc/>
public override async Task<(ReadOnlyMemory<byte>, string etag)> GetByteStateAndETagAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

var envelope = new Autogenerated.GetStateRequest()
{
StoreName = storeName,
Key = key
};

if (metadata != null)
{
foreach (var kvp in metadata)
{
envelope.Metadata.Add(kvp.Key, kvp.Value);
}
}

if (consistencyMode != null)
{
envelope.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value);
}

var options = CreateCallOptions(headers: null, cancellationToken);
Autogenerated.GetStateResponse response;

try
{
response = await client.GetStateAsync(envelope, options);
}
catch (RpcException ex)
{
throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}

try
{
return (response.Data.ToByteArray().AsMemory(), response.Etag);
}
catch (JsonException ex)
{
throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex);
}
}


/// <inheritdoc/>
Expand Down
Loading