Error when wrapping a transaction inside the DbContext #273

Open
xiantaibai opened this issue Aug 1, 2024 · 9 comments

@xiantaibai

Calling dbcontext.SaveEntitiesAsync(); throws the following error.
Error type:
System.InvalidOperationException: The specified transaction is not associated with the current connection. Only transactions associated with the current connection may be used.
at Microsoft.EntityFrameworkCore.Storage.RelationalTransaction..ctor(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger`1 logger, Boolean transactionOwned, ISqlGenerationHelper sqlGenerationHelper)
at Microsoft.EntityFrameworkCore.Storage.RelationalTransactionFactory.Create(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger`1 logger, Boolean transactionOwned)
at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.CreateRelationalTransaction(DbTransaction transaction, Guid transactionId, Boolean transactionOwned)
at Microsoft.EntityFrameworkCore.Storage.RelationalConnection.UseTransaction(DbTransaction transaction, Guid transactionId)
at Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.UseTransaction(DatabaseFacade databaseFacade, DbTransaction transaction, Guid transactionId)
at Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.UseTransaction(DatabaseFacade databaseFacade, DbTransaction transaction)
at ShardingCore.Sharding.ShardingDbContextExecutors.DataSourceDbContext.JoinCurrentTransaction()
at ShardingCore.Sharding.ShardingDbContextExecutors.DataSourceDbContext.NotifyTransaction()
at ShardingCore.Sharding.ShardingDbContextExecutors.ShardingDbContextExecutor.NotifyShardingTransaction()
at ShardingCore.EFCores.ShardingRelationalTransactionManager.BeginTransaction(IsolationLevel isolationLevel)
at ShardingCore.EFCores.ShardingRelationalTransactionManager.BeginTransaction()
at Microsoft.EntityFrameworkCore.Infrastructure.DatabaseFacade.BeginTransaction()

The database context:

public class IotDbContext : AbstractShardingDbContext, IShardingTableDbContext
{
    public DbSet<DeviceInfo> DeviceInfo { get; set; }
    public DbSet<Message> Message { get; set; }

    public IotDbContext(DbContextOptions options, IMediator mediator, IServiceProvider provider) : base(options)
    {

    }
    //public IotDbContext(DbContextOptions options) : base(options)
    //{
    //}

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        if (modelBuilder is null)
        {
            throw new ArgumentNullException(nameof(modelBuilder));
        }

        // modelBuilder.ApplyConfiguration(new DeviceInfoEntityTypeConfiguration());
        modelBuilder.ApplyConfigurationsFromAssembly(typeof(DeviceInfoEntityTypeConfiguration).Assembly);
    }

    public IRouteTail RouteTail { get; set; }

    public IDbContextTransaction? CurrentTransaction { get; private set; }

    public IDbContextTransaction BeginTransaction()
    {
        try
        {
    
            if (_publisherTransactionFactory != null)
            {
                CurrentTransaction = _publisherTransactionFactory.BeginTransaction(this);
            }
            else
            {
                CurrentTransaction = Database.BeginTransaction();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        return CurrentTransaction;
    }
    
    
    public async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.CommitAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }
    
    public async Task RollbackAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.RollbackAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction == null)
        {
            CurrentTransaction = this.BeginTransaction();
            await using (CurrentTransaction)
            {
                try
                {
                    await base.SaveChangesAsync(cancellationToken);
                    await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
                    await CommitAsync(cancellationToken);
                    return true;
                }
                catch
                {
                    await RollbackAsync(cancellationToken);
                    throw;
                }
            }
        }
        else
        {
            await base.SaveChangesAsync(cancellationToken);
            await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
            return true;
        }
    }
}
@xuejmnet
Collaborator

xuejmnet commented Aug 1, 2024

I'm not sure what this code is for:

_publisherTransactionFactory.BeginTransaction(this);

@xuejmnet
Collaborator

xuejmnet commented Aug 1, 2024

Just use it directly:

using (var tran = dbContext.Database.BeginTransaction())
{
    // do the work here, then commit
    tran.Commit();
}

@xiantaibai
Author

_publisherTransactionFactory.BeginTransaction(this); uses an external transaction.
I wrapped things in a new UnitOfWork and use the transaction inside the repository. When _context.Database.BeginTransaction(); executes, the error above occurs.

public class IotUnitOfWork : ITransactionUnitOfWork
{
    private readonly IotDbContext _context;

    public IotUnitOfWork(IotDbContext context)
    {
        _context = context;
    }

    public IDbContextTransaction? CurrentTransaction { get; private set; }

    public IDbContextTransaction BeginTransaction()
    {
        CurrentTransaction = _context.Database.BeginTransaction();
        return CurrentTransaction;
    }

    public async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.CommitAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public void Dispose()
    {
        _context.Dispose();
    }

    public async Task RollbackAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction != null)
        {
            await CurrentTransaction.RollbackAsync(cancellationToken);
            CurrentTransaction = null;
        }
    }

    public Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
       return _context.SaveChangesAsync(cancellationToken);
    }

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        if (CurrentTransaction == null)
        {
            CurrentTransaction = this.BeginTransaction();
            await using (CurrentTransaction)
            {
                try
                {
                    await SaveChangesAsync(cancellationToken);
                    // await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
                    await CommitAsync(cancellationToken);
                    return true;
                }
                catch
                {
                    await RollbackAsync(cancellationToken);
                    throw;
                }
            }
        }
        else
        {
            await SaveChangesAsync(cancellationToken);
            // await _mediator.DispatchDomainEventsAsync(this, cancellationToken);
            return true;
        }
    }
}

Starting the transaction:

msgRepository.Add(message);

var result = await msgRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);

@xuejmnet
Collaborator

xuejmnet commented Aug 1, 2024

@xiantaibai First check whether opening a plain transaction works.

@xiantaibai
Author

With the native registration (AddDbContext) there is no problem. After switching to AddShardingDbContext, the error above appears.

builder.Services.AddDbContext<IotDbContext>(options =>
{
    options.UseMySql(constr, ServerVersion.AutoDetect(constr), builder =>
    {
        builder.UseRelationalNulls();
    });
});

@xuejmnet
Collaborator

xuejmnet commented Aug 1, 2024

I meant AddShardingDbContext combined with opening a transaction the plain way:

using (var tran = dbContext.Database.BeginTransaction())
{
    // do the work here, then commit
    tran.Commit();
}

@xiantaibai
Author

AddShardingDbContext with a plain transaction reports the same error. It is thrown when execution reaches iotDbContext.Add(message);

using (var transaction = await iotDbContext.Database.BeginTransactionAsync())
{
    try
    {
        iotDbContext.Add(message);
        var result = await iotDbContext.SaveChangesAsync(cancellationToken) > 0;
        await transaction.CommitAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
        await transaction.RollbackAsync();
    }
}

@xiantaibai
Author

This is the startup configuration:

builder.Services.AddShardingDbContext<IotDbContext>()
//builder.Services.AddShardingConfigure<IotDbContext>()
 .UseRouteConfig(op =>
{
    op.AddShardingTableRoute<MessageVirtualTableRoute>();
})
.UseConfig(options =>
{
    // When no route can be resolved, return the default value instead of throwing
    options.ThrowIfQueryRouteNotMatch = false;

    options.UseShardingQuery((conStr, builder) =>
    {
        builder.UseMySql(constr, ServerVersion.AutoDetect(constr), builder =>
        {
            builder.UseRelationalNulls();
        }).UseLoggerFactory(efLogger);
    });

    options.UseShardingTransaction((conStr, builder) =>
    {
        builder.UseMySql(constr, ServerVersion.AutoDetect(constr), builder =>
        {
            builder.UseRelationalNulls();
        }).UseLoggerFactory(efLogger);
    });
    options.UseShardingMigrationConfigure(b =>
    {
        b.ReplaceService<IMigrationsSqlGenerator, ShardingMySqlMigrationsSqlGenerator>();
    });
    options.AddDefaultDataSource("ds0", constr);
})
 .ReplaceService<IDbContextCreator, AppDbContextCreator>(ServiceLifetime.Singleton)
.AddShardingCore();
builder.Services.Replace(ServiceDescriptor.Singleton<IDbContextCreator, AppDbContextCreator>());

builder.Services.AddScoped<IDeviceInfoRepository, DeviceInfoRepository>();
builder.Services.AddScoped<IMessageRepository, MessageRepositoryy>();
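
One point in the configuration above may be worth checking, offered as an assumption rather than a confirmed diagnosis. ShardingCore's documented samples pass the first callback argument straight to the provider: a connection string in UseShardingQuery and a DbConnection in UseShardingTransaction. The callbacks above ignore that argument and use the outer constr instead, which could leave the transaction context on a different connection from the one that owns the transaction. The fragment below sketches the documented shape for the Pomelo MySQL provider. The parameter names connStr, connection, and mySql are illustrative; constr and efLogger come from the configuration above.

```csharp
// Config fragment (sketch); it belongs inside .UseConfig(options => { ... }).
options.UseShardingQuery((connStr, builder) =>
{
    // connStr: the connection string ShardingCore resolved for the data source.
    builder.UseMySql(connStr, ServerVersion.AutoDetect(connStr), mySql =>
    {
        mySql.UseRelationalNulls();
    }).UseLoggerFactory(efLogger);
});

options.UseShardingTransaction((connection, builder) =>
{
    // connection: the DbConnection that the shared transaction belongs to.
    // Assumption: the server version is still detected from the outer
    // connection string (constr) rather than from the shared connection.
    builder.UseMySql(connection, ServerVersion.AutoDetect(constr), mySql =>
    {
        mySql.UseRelationalNulls();
    }).UseLoggerFactory(efLogger);
});
```

If the error persists with this shape, the custom AppDbContextCreator registered above would be the next place to look, since it also decides which options each inner context is built with.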

@xuejmnet
Collaborator

xuejmnet commented Aug 3, 2024

@xiantaibai
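"...the current connection may be used." This message probably means you have concurrent access: the connection is already in use on thread A, so thread B cannot use it. Check whether some concurrent usage is causing this, for example code running inside Task.Run.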
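
The concurrency concern raised in this comment can be illustrated with a minimal sketch. It assumes one scoped DbContext per unit of work. An EF Core DbContext is not thread-safe, so everything that belongs to one transaction is awaited in sequence on the same context instead of being started with Task.Run. SequentialSaveSketch and SaveInOneTransactionAsync are hypothetical names used only for illustration; they are not part of ShardingCore or EF Core.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

public static class SequentialSaveSketch
{
    // Hypothetical helper: adds all entities and saves them inside a single
    // transaction, using the context from one logical flow only.
    public static async Task SaveInOneTransactionAsync(
        DbContext db,
        IEnumerable<object> entities,
        CancellationToken cancellationToken = default)
    {
        // The transaction is bound to this context's connection.
        await using var transaction =
            await db.Database.BeginTransactionAsync(cancellationToken);

        foreach (var entity in entities)
        {
            // No Task.Run here: the context is touched by one flow at a time.
            db.Add(entity);
        }

        await db.SaveChangesAsync(cancellationToken);

        // If an exception is thrown before this line, disposing the
        // uncommitted transaction rolls it back.
        await transaction.CommitAsync(cancellationToken);
    }
}
```

A call such as await SequentialSaveSketch.SaveInOneTransactionAsync(iotDbContext, new object[] { message }, cancellationToken); would replace any pattern where the same context instance is shared between parallel tasks.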
