Skip to content

Commit

Permalink
Dispose of result faster when fetchRow() is used on PooledResult
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 16, 2023
1 parent ad61674 commit de27f1c
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 78 deletions.
2 changes: 1 addition & 1 deletion src/CommandResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public function __construct(
) {
}

final public function getIterator(): \Traversable
final public function getIterator(): \EmptyIterator
{
return new \EmptyIterator;
}
Expand Down
103 changes: 65 additions & 38 deletions src/PooledResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,80 @@
*/
abstract class PooledResult implements Result, \IteratorAggregate
{
/** @var null|\Closure():void */
private ?\Closure $release;

/** @var Future<TResult|null>|null */
private ?Future $next = null;

/** @var \Iterator<int, array<string, TFieldValue>> */
private readonly \Iterator $iterator;

/**
* @template Tr of Result
*
* @param Tr $result
* @param \Closure():void $release
*
* @return Tr
*/
abstract protected static function newInstanceFrom(Result $result, \Closure $release): Result;

/**
* @param TResult $result Result object created by pooled connection or statement.
* @param \Closure():void $release Callable to be invoked when the result set is destroyed.
*/
public function __construct(private readonly Result $result, \Closure $release)
public function __construct(private readonly Result $result, private readonly \Closure $release)
{
$this->release = $release;

if ($this->result instanceof CommandResult) {
$this->next = $this->fetchNextResult();
$this->iterator = $this->result->getIterator();
$this->next = self::fetchNextResult($this->result, $this->release);
return;
}

$next = &$this->next;
$this->iterator = (static function () use (&$next, $result, $release): \Generator {
try {
yield from $result;
} catch (\Throwable $exception) {
if (!$next) {
EventLoop::queue($release);
}
throw $exception;
}

$next ??= self::fetchNextResult($result, $release);
})();
}

public function __destruct()
{
$this->dispose();
EventLoop::queue(self::dispose(...), $this->iterator);
}

/**
* @param TResult $result
* @param \Closure():void $release
*
* @return TResult
*/
abstract protected function newInstanceFrom(Result $result, \Closure $release): Result;

private function dispose(): void
private static function dispose(\Iterator $iterator): void
{
if ($this->release !== null) {
EventLoop::queue($this->release);
$this->release = null;
try {
// Discard remaining rows in the result set.
while ($iterator->valid()) {
$iterator->next();
}
} catch (\Throwable) {
// Ignore errors while discarding result.
}
}

public function getIterator(): \Traversable
{
try {
yield from $this->result;
} catch (\Throwable $exception) {
$this->dispose();
throw $exception;
}
return $this->iterator;
}

public function fetchRow(): ?array
{
return $this->result->fetchRow();
if (!$this->iterator->valid()) {
return null;
}

$current = $this->iterator->current();
$this->iterator->next();
return $current;
}

public function getRowCount(): ?int
Expand All @@ -85,24 +106,30 @@ public function getColumnCount(): ?int
*/
public function getNextResult(): ?Result
{
return ($this->next ??= $this->fetchNextResult())->await();
$this->next ??= self::fetchNextResult($this->result, $this->release);
return $this->next->await();
}

private function fetchNextResult(): Future
/**
* @template Tr of Result
*
* @param Tr $result
* @param \Closure():void $release
*
* @return Future<Tr|null>
*/
private static function fetchNextResult(Result $result, \Closure $release): Future
{
return async(function (): ?Result {
/** @var TResult|null $result */
$result = $this->result->getNextResult();
return async(static function () use ($result, $release): ?Result {
/** @var Tr|null $result */
$result = $result->getNextResult();

if ($result === null || $this->release === null) {
$this->dispose();
if ($result === null) {
EventLoop::queue($release);
return null;
}

$result = $this->newInstanceFrom($result, $this->release);
$this->release = null;

return $result;
return static::newInstanceFrom($result, $release);
});
}
}
11 changes: 2 additions & 9 deletions test/ConnectionPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Amp\Future;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Sql\Common\ConnectionPool;
use Amp\Sql\Common\PooledResult;
use Amp\Sql\Common\Test\Stub\StubPooledResult;
use Amp\Sql\Connection;
use Amp\Sql\Result;
use Amp\Sql\SqlConfig;
Expand Down Expand Up @@ -67,14 +67,7 @@ private function createPool(SqlConnector $connector, int $maxConnections = 100,
->getMockForAbstractClass();

$pool->method('createResult')
->willReturnCallback(function (Result $result, \Closure $release): PooledResult {
return new class($result, $release) extends PooledResult {
protected function newInstanceFrom(Result $result, \Closure $release): PooledResult
{
return new self($result, $release);
}
};
});
->willReturnCallback(fn (Result $result, \Closure $release) => new StubPooledResult($result, $release));

return $pool;
}
Expand Down
43 changes: 13 additions & 30 deletions test/PooledResultTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace Amp\Sql\Common\Test;

use Amp\PHPUnit\AsyncTestCase;
use Amp\Sql\Common\PooledResult;
use Amp\Sql\Result;
use Amp\Sql\Common\Test\Stub\StubPooledResult;
use Amp\Sql\Common\Test\Stub\StubResult;
use function Amp\delay;

class PooledResultTest extends AsyncTestCase
Expand All @@ -17,33 +17,15 @@ public function testIdleConnectionsRemovedAfterTimeout()
$invoked = true;
};

$secondResult = $this->createMock(PooledResult::class);
$secondResult->method('getIterator')
->willReturn(new \ArrayIterator([['column' => 'value']]));
$secondResult->method('getNextResult')
->willReturn(null);
$expectedRow = ['column' => 'value'];

$firstResult = $this->createMock(PooledResult::class);
$firstResult->method('getIterator')
->willReturn(new \ArrayIterator([['column' => 'value']]));
$firstResult->method('getNextResult')
->willReturn($secondResult);
$secondResult = new StubResult([$expectedRow]);
$firstResult = new StubResult([$expectedRow], $secondResult);
$pooledResult = new StubPooledResult(new StubResult([$expectedRow], $firstResult), $release);

$result = $this->getMockBuilder(PooledResult::class)
->setConstructorArgs([$firstResult, $release])
->getMockForAbstractClass();
$iterator = $pooledResult->getIterator();

$result->expects(self::once())
->method('newInstanceFrom')
->willReturnCallback(function (Result $result, \Closure $release): PooledResult {
return $this->getMockBuilder(PooledResult::class)
->setConstructorArgs([$result, $release])
->getMockForAbstractClass();
});

$iterator = $result->getIterator();

$this->assertSame(['column' => 'value'], $iterator->current());
$this->assertSame($expectedRow, $iterator->current());

$this->assertFalse($invoked);

Expand All @@ -52,15 +34,16 @@ public function testIdleConnectionsRemovedAfterTimeout()

$this->assertFalse($invoked); // Next result set available.

$result = $result->getNextResult();
$iterator = $result->getIterator();
$pooledResult = $pooledResult->getNextResult();
$iterator = $pooledResult->getIterator();

$this->assertSame(['column' => 'value'], $iterator->current());
$this->assertSame($expectedRow, $iterator->current());

$iterator->next();
$this->assertFalse($iterator->valid());

$result->getNextResult();
$pooledResult = $pooledResult->getNextResult();
unset($pooledResult); // Manually unset to trigger destructor.

delay(0); // Tick event loop to dispose of result set.

Expand Down
14 changes: 14 additions & 0 deletions test/Stub/StubPooledResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php declare(strict_types=1);

namespace Amp\Sql\Common\Test\Stub;

use Amp\Sql\Common\PooledResult;
use Amp\Sql\Result;

final class StubPooledResult extends PooledResult
{
protected static function newInstanceFrom(Result $result, \Closure $release): self
{
return new self($result, $release);
}
}
42 changes: 42 additions & 0 deletions test/Stub/StubResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php declare(strict_types=1);

namespace Amp\Sql\Common\Test\Stub;

use Amp\Sql\Result;

final class StubResult implements Result, \IteratorAggregate
{
private readonly array $rows;

private int $current = 0;

public function __construct(array $rows, private readonly ?Result $next = null)
{
$this->rows = \array_values($rows);
}

public function getIterator(): \Iterator
{
yield from $this->rows;
}

public function fetchRow(): ?array
{
return $this->rows[$this->current++] ?? null;
}

public function getNextResult(): ?Result
{
return $this->next;
}

public function getRowCount(): ?int
{
return \count($this->rows);
}

public function getColumnCount(): ?int
{
return null;
}
}

0 comments on commit de27f1c

Please sign in to comment.