Skip to content

Commit

Permalink
Separate transaction from nested executor
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 20, 2023
1 parent 037a1ef commit 92f7734
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Sql\Common;

use Amp\Sql\Executor;
use Amp\Sql\Result;
use Amp\Sql\Statement;
use Amp\Sql\Transaction;
Expand All @@ -12,9 +13,9 @@
* @template TStatement of Statement<TResult>
* @template TTransaction of Transaction
*
* @extends Transaction<TResult, TStatement, TTransaction>
* @extends Executor<TResult, TStatement, TTransaction>
*/
interface NestableTransaction extends Transaction
interface NestableTransactionExecutor extends Executor
{
/**
* Creates a savepoint with the given identifier.
Expand Down
55 changes: 20 additions & 35 deletions src/NestedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Sql\Common;

use Amp\DeferredFuture;
use Amp\Sql\Executor;
use Amp\Sql\Result;
use Amp\Sql\Statement;
use Amp\Sql\Transaction;
Expand All @@ -14,11 +15,11 @@
* @template TResult of Result
* @template TStatement of Statement<TResult>
* @template TTransaction of Transaction
* @template TNestedTransaction of NestableTransaction<TResult, TStatement, TTransaction>
* @template TNestedExecutor of NestableTransactionExecutor<TResult, TStatement, TTransaction>
*
* @implements NestableTransaction<TResult, TStatement, TTransaction>
* @implements Transaction<TResult, TStatement, TTransaction>
*/
abstract class NestedTransaction implements NestableTransaction
abstract class NestedTransaction implements Transaction
{
/** @var \Closure():void */
private readonly \Closure $release;
Expand Down Expand Up @@ -58,25 +59,27 @@ abstract protected function createStatement(Statement $statement, \Closure $rele
abstract protected function createResult(Result $result, \Closure $release): Result;

/**
* @param TNestedTransaction $transaction
* @param TNestedExecutor $executor
* @param non-empty-string $identifier
* @param \Closure():void $release
*
* @return TTransaction
*/
abstract protected function createNestedTransaction(
NestableTransaction $transaction,
NestableTransactionExecutor $executor,
string $identifier,
\Closure $release,
): Transaction;

/**
* @param TNestedTransaction $transaction Transaction object created by connection.
* @param TTransaction $transaction Transaction object created by connection.
* @param TNestedExecutor $executor
* @param non-empty-string $identifier
* @param \Closure():void $release Callable to be invoked when the transaction completes or is destroyed.
*/
public function __construct(
protected readonly Transaction $transaction,
protected readonly Executor $executor,
private readonly string $identifier,
\Closure $release,
) {
Expand Down Expand Up @@ -106,7 +109,7 @@ public function query(string $sql): Result
++$this->refCount;

try {
$result = $this->transaction->query($sql);
$result = $this->executor->query($sql);
return $this->createResult($result, $this->release);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
Expand All @@ -120,7 +123,7 @@ public function prepare(string $sql): Statement
++$this->refCount;

try {
$statement = $this->transaction->prepare($sql);
$statement = $this->executor->prepare($sql);
return $this->createStatement($statement, $this->release);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
Expand All @@ -134,7 +137,7 @@ public function execute(string $sql, array $params = []): Result
++$this->refCount;

try {
$result = $this->transaction->execute($sql, $params);
$result = $this->executor->execute($sql, $params);
return $this->createResult($result, $this->release);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
Expand All @@ -151,8 +154,8 @@ public function beginTransaction(): Transaction
$identifier = $this->identifier . '-' . $this->nextId++;

try {
$this->transaction->createSavepoint($identifier);
return $this->createNestedTransaction($this->transaction, $identifier, $this->release);
$this->executor->createSavepoint($identifier);
return $this->createNestedTransaction($this->executor, $identifier, $this->release);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
Expand All @@ -161,7 +164,7 @@ public function beginTransaction(): Transaction

public function isClosed(): bool
{
return $this->transaction->isClosed();
return $this->executor->isClosed();
}

/**
Expand All @@ -176,24 +179,24 @@ public function close(): void

public function onClose(\Closure $onClose): void
{
$this->transaction->onClose($onClose);
$this->executor->onClose($onClose);
}

public function isActive(): bool
{
return $this->transaction->isActive();
return !$this->isClosed();
}

public function commit(): void
{
$this->awaitPendingNestedTransaction();
$this->active = false;

$this->transaction->releaseSavepoint($this->identifier);
$this->executor->releaseSavepoint($this->identifier);
EventLoop::queue($this->release);

$onRollback = $this->onRollback;
$this->transaction->onRollback(static fn() => $onRollback->isComplete() || $onRollback->complete());
$this->transaction->onRollback(static fn () => $onRollback->isComplete() || $onRollback->complete());
$this->onClose->complete();
}

Expand All @@ -202,31 +205,13 @@ public function rollback(): void
$this->awaitPendingNestedTransaction();
$this->active = false;

$this->transaction->rollbackTo($this->identifier);
$this->executor->rollbackTo($this->identifier);
EventLoop::queue($this->release);

$this->onRollback->complete();
$this->onClose->complete();
}

public function createSavepoint(string $identifier): void
{
$this->awaitPendingNestedTransaction();
$this->transaction->createSavepoint($identifier);
}

public function releaseSavepoint(string $identifier): void
{
$this->awaitPendingNestedTransaction();
$this->transaction->releaseSavepoint($identifier);
}

public function rollbackTo(string $identifier): void
{
$this->awaitPendingNestedTransaction();
$this->transaction->rollbackTo($identifier);
}

public function onCommit(\Closure $onCommit): void
{
$this->transaction->onCommit($onCommit);
Expand Down

0 comments on commit 92f7734

Please sign in to comment.