Skip to content

Commit

Permalink
Merge pull request #190 from clue-labs/good-connection
Browse files Browse the repository at this point in the history
Refactor to move command queue to `MysqlClient` and connection logic to `Connection` class
  • Loading branch information
WyriHaximus authored Dec 5, 2023
2 parents bc5ecf3 + 836ca2d commit 3cf70c6
Show file tree
Hide file tree
Showing 6 changed files with 1,874 additions and 404 deletions.
107 changes: 96 additions & 11 deletions src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace React\Mysql\Io;

use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use React\Mysql\Commands\CommandInterface;
use React\Mysql\Commands\PingCommand;
use React\Mysql\Commands\QueryCommand;
Expand All @@ -29,30 +30,66 @@ class Connection extends EventEmitter
private $executor;

/**
* @var integer
* @var int one of the state constants (may change, but should be used readonly from outside)
* @see self::STATE_*
*/
private $state = self::STATE_AUTHENTICATED;
public $state = self::STATE_AUTHENTICATED;

/**
* @var SocketConnectionInterface
*/
private $stream;

/** @var Parser */
private $parser;

/** @var LoopInterface */
private $loop;

/** @var float */
private $idlePeriod = 0.001;

/** @var ?\React\EventLoop\TimerInterface */
private $idleTimer;

/** @var int */
private $pending = 0;

/**
* Connection constructor.
*
* @param SocketConnectionInterface $stream
* @param Executor $executor
* @param Parser $parser
* @param LoopInterface $loop
* @param ?float $idlePeriod
*/
public function __construct(SocketConnectionInterface $stream, Executor $executor)
public function __construct(SocketConnectionInterface $stream, Executor $executor, Parser $parser, LoopInterface $loop, $idlePeriod)
{
$this->stream = $stream;
$this->executor = $executor;
$this->parser = $parser;

$this->loop = $loop;
if ($idlePeriod !== null) {
$this->idlePeriod = $idlePeriod;
}

$stream->on('error', [$this, 'handleConnectionError']);
$stream->on('close', [$this, 'handleConnectionClosed']);
}

/**
* busy executing some command such as query or ping
*
* @return bool
* @throws void
*/
public function isBusy()
{
return $this->parser->isBusy() || !$this->executor->isIdle();
}

/**
* {@inheritdoc}
*/
Expand All @@ -71,6 +108,7 @@ public function query($sql, array $params = [])
return \React\Promise\reject($e);
}

$this->awake();
$deferred = new Deferred();

// store all result set rows until result set end
Expand All @@ -86,11 +124,13 @@ public function query($sql, array $params = [])

$rows = [];

$this->idle();
$deferred->resolve($result);
});

// resolve / reject status reply (response without result set)
$command->on('error', function ($error) use ($deferred) {
$this->idle();
$deferred->reject($error);
});
$command->on('success', function () use ($command, $deferred) {
Expand All @@ -99,6 +139,7 @@ public function query($sql, array $params = [])
$result->insertId = $command->insertId;
$result->warningCount = $command->warningCount;

$this->idle();
$deferred->resolve($result);
});

Expand All @@ -115,20 +156,30 @@ public function queryStream($sql, $params = [])
$command = new QueryCommand();
$command->setQuery($query);
$this->_doCommand($command);
$this->awake();

$stream = new QueryStream($command, $this->stream);
$stream->on('close', function () {
$this->idle();
});

return new QueryStream($command, $this->stream);
return $stream;
}

public function ping()
{
return new Promise(function ($resolve, $reject) {
$this->_doCommand(new PingCommand())
->on('error', function ($reason) use ($reject) {
$reject($reason);
})
->on('success', function () use ($resolve) {
$resolve(null);
});
$command = $this->_doCommand(new PingCommand());
$this->awake();

$command->on('success', function () use ($resolve) {
$this->idle();
$resolve(null);
});
$command->on('error', function ($reason) use ($reject) {
$this->idle();
$reject($reason);
});
});
}

Expand All @@ -137,6 +188,10 @@ public function quit()
return new Promise(function ($resolve, $reject) {
$command = $this->_doCommand(new QuitCommand());
$this->state = self::STATE_CLOSING;

// mark connection as "awake" until it is closed, so never "idle"
$this->awake();

$command->on('success', function () use ($resolve) {
$resolve(null);
$this->close();
Expand All @@ -158,6 +213,11 @@ public function close()
$remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false;
$this->stream->close();

if ($this->idleTimer !== null) {
$this->loop->cancelTimer($this->idleTimer);
$this->idleTimer = null;
}

// reject all pending commands if connection is closed
while (!$this->executor->isIdle()) {
$command = $this->executor->dequeue();
Expand Down Expand Up @@ -223,4 +283,29 @@ protected function _doCommand(CommandInterface $command)

return $this->executor->enqueue($command);
}

private function awake()
{
++$this->pending;

if ($this->idleTimer !== null) {
$this->loop->cancelTimer($this->idleTimer);
$this->idleTimer = null;
}
}

private function idle()
{
--$this->pending;

if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->state === self::STATE_AUTHENTICATED) {
$this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () {
// soft-close connection and emit close event afterwards both on success or on error
$this->idleTimer = null;
$this->quit()->then(null, function () {
// ignore to avoid reporting unhandled rejection
});
});
}
}
}
5 changes: 3 additions & 2 deletions src/Io/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ public function createConnection(
$connecting->cancel();
});

$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri) {
$idlePeriod = isset($args['idle']) ? (float) $args['idle'] : null;
$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri, $idlePeriod) {
$executor = new Executor();
$parser = new Parser($stream, $executor);

$connection = new Connection($stream, $executor);
$connection = new Connection($stream, $executor, $parser, $this->loop, $idlePeriod);
$command = $executor->enqueue($authCommand);
$parser->start();

Expand Down
11 changes: 11 additions & 0 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ public function __construct(DuplexStreamInterface $stream, Executor $executor)
});
}

/**
* busy executing some command such as query or ping
*
* @return bool
* @throws void
*/
public function isBusy()
{
return $this->currCommand !== null;
}

public function start()
{
$this->stream->on('data', [$this, 'handleData']);
Expand Down
Loading

0 comments on commit 3cf70c6

Please sign in to comment.