Skip to content

Commit

Permalink
Merge pull request #186 from clue-labs/mysqlclient
Browse files Browse the repository at this point in the history
Simplify API, add new `MysqlClient` and remove `Factory` and `ConnectionInterface`
  • Loading branch information
WyriHaximus authored Nov 21, 2023
2 parents 6209d11 + c4dc415 commit 9e01898
Show file tree
Hide file tree
Showing 15 changed files with 1,118 additions and 1,151 deletions.
260 changes: 73 additions & 187 deletions README.md

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions examples/01-query.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
// $ php examples/01-query.php
// $ MYSQL_URI=test:test@localhost/test php examples/01-query.php "SELECT * FROM book"

use React\MySQL\Factory;
use React\MySQL\QueryResult;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$connection = $factory->createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test');
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$query = isset($argv[1]) ? $argv[1] : 'select * from book';
$connection->query($query)->then(function (QueryResult $command) {
$mysql->query($query)->then(function (React\MySQL\QueryResult $command) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
print_r($command->resultFields);
Expand Down
7 changes: 2 additions & 5 deletions examples/02-query-stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
// $ php examples/02-query-stream.php "SHOW VARIABLES"
// $ MYSQL_URI=test:test@localhost/test php examples/02-query-stream.php "SELECT * FROM book"

use React\MySQL\Factory;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$connection = $factory->createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test');
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$query = isset($argv[1]) ? $argv[1] : 'select * from book';
$stream = $connection->queryStream($query);
$stream = $mysql->queryStream($query);

$stream->on('data', function ($row) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;
Expand Down
126 changes: 56 additions & 70 deletions examples/11-interactive.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,87 +3,73 @@
// $ php examples/11-interactive.php
// $ MYSQL_URI=test:test@localhost/test php examples/11-interactive.php

use React\MySQL\ConnectionInterface;
use React\MySQL\QueryResult;
use React\MySQL\Factory;
use React\Stream\ReadableResourceStream;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$uri = getenv('MYSQL_URI') ?: 'test:test@localhost/test';
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

// open a STDIN stream to read keyboard input (not supported on Windows)
$stdin = new ReadableResourceStream(STDIN);
$stdin->pause();

//create a mysql connection for executing queries
$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($stdin) {
echo 'Connection success.' . PHP_EOL;
$stdin->resume();
$stdin = new React\Stream\ReadableResourceStream(STDIN);

$stdin->on('data', function ($line) use ($connection) {
$query = trim($line);
$stdin->on('data', function ($line) use ($mysql) {
$query = trim($line);

if ($query === '') {
// skip empty commands
return;
}
if ($query === 'exit') {
// exit command should close the connection
echo 'bye.' . PHP_EOL;
$connection->quit();
return;
}
if ($query === '') {
// skip empty commands
return;
}
if ($query === 'exit') {
// exit command should close the connection
echo 'bye.' . PHP_EOL;
$mysql->quit();
return;
}

$time = microtime(true);
$connection->query($query)->then(function (QueryResult $command) use ($time) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
echo implode("\t", array_column($command->resultFields, 'name')) . PHP_EOL;
foreach ($command->resultRows as $row) {
echo implode("\t", $row) . PHP_EOL;
}

printf(
'%d row%s in set (%.03f sec)%s',
count($command->resultRows),
count($command->resultRows) === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
} else {
// this is an OK message in response to an UPDATE etc.
// the insertId will only be set if this is
if ($command->insertId !== 0) {
var_dump('last insert ID', $command->insertId);
}
$time = microtime(true);
$mysql->query($query)->then(function (React\MySQL\QueryResult $command) use ($time) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
echo implode("\t", array_column($command->resultFields, 'name')) . PHP_EOL;
foreach ($command->resultRows as $row) {
echo implode("\t", $row) . PHP_EOL;
}

printf(
'Query OK, %d row%s affected (%.03f sec)%s',
$command->affectedRows,
$command->affectedRows === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
printf(
'%d row%s in set (%.03f sec)%s',
count($command->resultRows),
count($command->resultRows) === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
} else {
// this is an OK message in response to an UPDATE etc.
// the insertId will only be set if this is
if ($command->insertId !== 0) {
var_dump('last insert ID', $command->insertId);
}
}, function (Exception $error) {
// the query was not executed successfully
echo 'Error: ' . $error->getMessage() . PHP_EOL;
});
});

// close connection when STDIN closes (EOF or CTRL+D)
$stdin->on('close', function () use ($connection) {
$connection->quit();
printf(
'Query OK, %d row%s affected (%.03f sec)%s',
$command->affectedRows,
$command->affectedRows === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
}
}, function (Exception $error) {
// the query was not executed successfully
echo 'Error: ' . $error->getMessage() . PHP_EOL;
});
});

// close STDIN (stop reading) when connection closes
$connection->on('close', function () use ($stdin) {
$stdin->close();
echo 'Disconnected.' . PHP_EOL;
});
}, function (Exception $e) use ($stdin) {
echo 'Connection error: ' . $e->getMessage() . PHP_EOL;
// close connection when STDIN closes (EOF or CTRL+D)
$stdin->on('close', function () use ($mysql) {
$mysql->quit();
});

// close STDIN (stop reading) when connection closes
$mysql->on('close', function () use ($stdin) {
$stdin->close();
echo 'Disconnected.' . PHP_EOL;
});

echo '# Entering interactive mode ready, hit CTRL-D to quit' . PHP_EOL;
72 changes: 36 additions & 36 deletions examples/12-slow-stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
// $ MYSQL_URI=test:test@localhost/test php examples/12-slow-stream.php "SELECT * FROM book"

use React\EventLoop\Loop;
use React\MySQL\ConnectionInterface;
use React\MySQL\Factory;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$uri = getenv('MYSQL_URI') ?: 'test:test@localhost/test';
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$query = isset($argv[1]) ? $argv[1] : 'select * from book';
$stream = $mysql->queryStream($query);

//create a mysql connection for executing query
$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($query) {
// The protocol parser reads rather large chunked from the underlying connection
$ref = new ReflectionProperty($mysql, 'connecting');
$ref->setAccessible(true);
$promise = $ref->getValue($mysql);
assert($promise instanceof React\Promise\PromiseInterface);

$promise->then(function (React\MySQL\Io\Connection $connection) {
// The protocol parser reads rather large chunks from the underlying connection
// and as such can yield multiple (dozens to hundreds) rows from a single data
// chunk. We try to artificially limit the stream chunk size here to try to
// only ever read a single row so we can demonstrate throttling this stream.
Expand All @@ -28,11 +30,13 @@
$ref = new ReflectionProperty($connection, 'stream');
$ref->setAccessible(true);
$conn = $ref->getValue($connection);
assert($conn instanceof React\Socket\ConnectionInterface);

// access private "input" (instanceof React\Stream\DuplexStreamInterface)
$ref = new ReflectionProperty($conn, 'input');
$ref->setAccessible(true);
$stream = $ref->getValue($conn);
assert($stream instanceof React\Stream\DuplexStreamInterface);

// reduce private bufferSize to just a few bytes to slow things down
$ref = new ReflectionProperty($stream, 'bufferSize');
Expand All @@ -41,38 +45,34 @@
} catch (Exception $e) {
echo 'Warning: Unable to reduce buffer size: ' . $e->getMessage() . PHP_EOL;
}
});

$stream = $connection->queryStream($query);

$throttle = null;
$stream->on('data', function ($row) use (&$throttle, $stream) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;

// simple throttle mechanism: explicitly pause the result stream and
// resume it again after some time.
if ($throttle === null) {
$throttle = Loop::addTimer(1.0, function () use ($stream, &$throttle) {
$throttle = null;
$stream->resume();
});
$stream->pause();
}
});

$stream->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () use (&$throttle) {
echo 'CLOSED' . PHP_EOL;
$throttle = null;
$stream->on('data', function ($row) use (&$throttle, $stream) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;

if ($throttle) {
Loop::cancelTimer($throttle);
// simple throttle mechanism: explicitly pause the result stream and
// resume it again after some time.
if ($throttle === null) {
$throttle = Loop::addTimer(1.0, function () use ($stream, &$throttle) {
$throttle = null;
}
});
$stream->resume();
});
$stream->pause();
}
});

$connection->quit();
}, function (Exception $e) {
$stream->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () use (&$throttle) {
echo 'CLOSED' . PHP_EOL;

if ($throttle) {
Loop::cancelTimer($throttle);
$throttle = null;
}
});

$mysql->quit();
Loading

0 comments on commit 9e01898

Please sign in to comment.