Skip to content

Commit

Permalink
Fixes AsyncLocal issue to support start async transaction. (#1441)
Browse files Browse the repository at this point in the history
* Fixes AsyncLocal issue to support  start async transaction. (#1376, #1317, #1266)

* Add support async transaction
  • Loading branch information
yang-xiaodong authored Dec 5, 2023
1 parent 254927e commit f3d94a6
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 100 deletions.
5 changes: 2 additions & 3 deletions src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ public static IClientSessionHandle StartTransaction(this IMongoClient client,
ICapPublisher publisher, bool autoCommit = false)
{
var clientSessionHandle = client.StartSession();
publisher.Transaction.Value =
ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(clientSessionHandle, autoCommit);
publisher.Transaction = ActivatorUtilities.CreateInstance<MongoDBCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Begin(clientSessionHandle, autoCommit);
return new CapMongoDbClientSessionHandle(capTrans);
}
}
122 changes: 93 additions & 29 deletions src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,12 @@ public override void Dispose()
{
(DbTransaction as IDisposable)?.Dispose();
DbTransaction = null;
GC.SuppressFinalize(this);
}
}

public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;

return transaction;
}

public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;

return transaction;
}

/// <summary>
/// Start the CAP transaction
/// </summary>
Expand All @@ -124,10 +107,7 @@ public static ICapTransaction Begin(this ICapTransaction transaction,
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
return BeginTransaction(database, IsolationLevel.Unspecified, publisher, autoCommit);
}

/// <summary>
Expand All @@ -141,10 +121,44 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction(isolationLevel);
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
var dbTransaction = database.BeginTransaction(isolationLevel);
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return new CapEFDbTransaction(publisher.Transaction);
}

/// <summary>
/// Start the CAP transaction async
/// </summary>
/// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static Task<IDbContextTransaction> BeginTransactionAsync(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
return BeginTransactionAsync(database, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken);
}

/// <summary>
/// Start the CAP transaction async
/// </summary>
/// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static async Task<IDbContextTransaction> BeginTransactionAsync(this DatabaseFacade database,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
var dbTransaction = await database.BeginTransactionAsync(isolationLevel,cancellationToken);
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return new CapEFDbTransaction(publisher.Transaction);
}

/// <summary>
Expand All @@ -156,11 +170,61 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
return BeginTransaction(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit);
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();

var dbTransaction = dbConnection.BeginTransaction(isolationLevel);
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return publisher.Transaction;
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static Task<ICapTransaction> BeginTransactionAsync(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
return BeginTransactionAsync(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken);
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
public static async Task<ICapTransaction> BeginTransactionAsync(this IDbConnection dbConnection,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();

var dbTransaction = dbConnection.BeginTransaction();
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
var dbTransaction = await ((DbConnection)dbConnection).BeginTransactionAsync(isolationLevel, cancellationToken);
publisher.Transaction = ActivatorUtilities.CreateInstance<MySqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;
return publisher.Transaction;
}
}
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.OpenTelemetry/DiagnosticListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
var eventData = (CapEventDataPubSend)evt.Value!;
var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) =>
{
return msg.Headers.TryGetValue(key, out var value) ? (new[] { value }) : Enumerable.Empty<string>();
return msg.Headers.TryGetValue(key, out var value) ? (new[] { value }!) : Enumerable.Empty<string>();
});
_contexts.TryRemove(eventData.TransportMessage.GetId(), out var context);
var activity = ActivitySource.StartActivity(
Expand Down
114 changes: 87 additions & 27 deletions src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,42 @@ public override void Dispose()
{
(DbTransaction as IDisposable)?.Dispose();
DbTransaction = null;
GC.SuppressFinalize(this);
}
}

public static class CapTransactionExtensions
{
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;

return transaction;
return BeginTransaction(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit);
}

public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="isolationLevel"><see cref="IsolationLevel"/></param>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();
var dbTransaction = dbConnection.BeginTransaction(isolationLevel);

publisher.Transaction = ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;

return transaction;
return publisher.Transaction;
}

/// <summary>
Expand All @@ -119,16 +134,32 @@ public static ICapTransaction Begin(this ICapTransaction transaction,
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
/// <param name="cancellationToken"></param>
public static Task<ICapTransaction> BeginTransactionAsync(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
return BeginTransactionAsync(dbConnection, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken);
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="isolationLevel"><see cref="IsolationLevel"/></param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
public static async Task<ICapTransaction> BeginTransactionAsync(this IDbConnection dbConnection,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
if (dbConnection.State == ConnectionState.Closed) dbConnection.Open();
var dbTransaction = await ((DbConnection)dbConnection).BeginTransactionAsync(isolationLevel, cancellationToken);

publisher.Transaction = ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = dbTransaction;
publisher.Transaction.AutoCommit = autoCommit;

var dbTransaction = dbConnection.BeginTransaction();
publisher.Transaction.Value =
ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
return publisher.Transaction;
}

/// <summary>
Expand All @@ -141,11 +172,7 @@ public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
publisher.Transaction.Value =
ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
return BeginTransaction(database, IsolationLevel.Unspecified, publisher, autoCommit);
}

/// <summary>
Expand All @@ -160,9 +187,42 @@ public static IDbContextTransaction BeginTransaction(this DatabaseFacade databas
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction(isolationLevel);
publisher.Transaction.Value =
ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
publisher.Transaction = ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = trans;
publisher.Transaction.AutoCommit = autoCommit;
return new CapEFDbTransaction(publisher.Transaction);
}

/// <summary>
/// Start the CAP transaction async
/// </summary>
/// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static Task<IDbContextTransaction> BeginTransactionAsync(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
return BeginTransactionAsync(database, IsolationLevel.Unspecified, publisher, autoCommit, cancellationToken);
}

/// <summary>
/// Start the CAP transaction async
/// </summary>
/// <param name="database">The <see cref="DatabaseFacade" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="isolationLevel">The <see cref="IsolationLevel" /> to use</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <param name="cancellationToken"></param>
/// <returns>The <see cref="IDbContextTransaction" /> of EF DbContext transaction object.</returns>
public static async Task<IDbContextTransaction> BeginTransactionAsync(this DatabaseFacade database,
IsolationLevel isolationLevel, ICapPublisher publisher, bool autoCommit = false, CancellationToken cancellationToken = default)
{
var trans = await database.BeginTransactionAsync(isolationLevel, cancellationToken);
publisher.Transaction = ActivatorUtilities.CreateInstance<PostgreSqlCapTransaction>(publisher.ServiceProvider);
publisher.Transaction.DbTransaction = trans;
publisher.Transaction.AutoCommit = autoCommit;
return new CapEFDbTransaction(publisher.Transaction);
}
}
Loading

0 comments on commit f3d94a6

Please sign in to comment.