From 16c002f94d4a9a6db08463a0257b8561938dd6cf Mon Sep 17 00:00:00 2001 From: Yoan Arnaudov Date: Wed, 26 Nov 2025 12:18:50 +0200 Subject: [PATCH 1/2] Implement async query functionality --- src/Async/AsyncQuery.php | 58 ++++ src/Async/AsyncQueryBatch.php | 312 ++++++++++++++++++ src/Async/Exception/AsyncNotSupported.php | 50 +++ src/Connection.php | 43 +++ src/Driver/API/AsyncConnection.php | 45 +++ src/Driver/Mysqli/AsyncResult.php | 133 ++++++++ src/Driver/Mysqli/Connection.php | 79 ++++- src/Driver/PgSQL/Connection.php | 54 ++- tests/Async/AsyncQueryTest.php | 71 ++++ .../Async/Exception/AsyncNotSupportedTest.php | 46 +++ tests/Functional/AsyncQueryTest.php | 311 +++++++++++++++++ 11 files changed, 1200 insertions(+), 2 deletions(-) create mode 100644 src/Async/AsyncQuery.php create mode 100644 src/Async/AsyncQueryBatch.php create mode 100644 src/Async/Exception/AsyncNotSupported.php create mode 100644 src/Driver/API/AsyncConnection.php create mode 100644 src/Driver/Mysqli/AsyncResult.php create mode 100644 tests/Async/AsyncQueryTest.php create mode 100644 tests/Async/Exception/AsyncNotSupportedTest.php create mode 100644 tests/Functional/AsyncQueryTest.php diff --git a/src/Async/AsyncQuery.php b/src/Async/AsyncQuery.php new file mode 100644 index 00000000000..2c7e52aea0a --- /dev/null +++ b/src/Async/AsyncQuery.php @@ -0,0 +1,58 @@ +|array */ + private array $params; + + /** @var array|array */ + private array $types; + + /** + * @param string $sql The SQL query string + * @param list|array $params Query parameters + * @param array|array $types Parameter types + */ + public function __construct(string $sql, array $params = [], array $types = []) + { + $this->sql = $sql; + $this->params = $params; + $this->types = $types; + } + + public function getSQL(): string + { + return $this->sql; + } + + /** + * @return list|array + */ + public function getParams(): array + { + return $this->params; + } + + /** + * @return array|array + */ + public function getTypes(): array + { + return $this->types; + } +} + diff --git a/src/Async/AsyncQueryBatch.php b/src/Async/AsyncQueryBatch.php new file mode 100644 index 00000000000..b1fdaafa10a --- /dev/null +++ b/src/Async/AsyncQueryBatch.php @@ -0,0 +1,312 @@ + */ + private array $params; + + public function __construct(Connection $connection) + { + $this->connection = $connection; + $this->driver = $connection->getDriver(); + $this->params = $connection->getParams(); + } + + /** + * Executes multiple queries in parallel and returns all results. + * + * @param AsyncQuery[] $queries The queries to execute in parallel + * + * @return Result[] The results in the same order as the input queries + * + * @throws AsyncNotSupported If the driver does not support async queries + * @throws Exception If query execution fails + */ + public function execute(array $queries): array + { + if (count($queries) === 0) { + throw AsyncNotSupported::emptyQueryBatch(); + } + + // Create async connections for each query + $asyncConnections = $this->createAsyncConnections(count($queries)); + + // Send all queries + $this->sendQueries($asyncConnections, $queries); + + // Wait for all results + $this->waitForCompletion($asyncConnections); + + // Collect results (connections will be automatically cleaned up when they go out of scope) + return $this->collectResults($asyncConnections); + } + + /** + * Creates multiple async-capable connections. + * + * @return AsyncConnection[] + * + * @throws AsyncNotSupported + * @throws Exception + */ + private function createAsyncConnections(int $count): array + { + $connections = []; + + for ($i = 0; $i < $count; $i++) { + $driverConnection = $this->driver->connect($this->params); + + if (! $driverConnection instanceof AsyncConnection) { + throw AsyncNotSupported::driverNotSupported(get_class($driverConnection)); + } + + $connections[] = $driverConnection; + } + + return $connections; + } + + /** + * Sends all queries to their respective connections. + * + * @param AsyncConnection[] $connections + * @param AsyncQuery[] $queries + */ + private function sendQueries(array $connections, array $queries): void + { + foreach ($queries as $index => $query) { + $connection = $connections[$index]; + $sql = $query->getSQL(); + $params = $query->getParams(); + + // For mysqli, we need to handle parameter substitution differently + // since mysqli async doesn't support prepared statements + if ($connection instanceof MysqliConnection) { + $sql = $this->substituteParamsForMysqli($connection, $sql, $params); + $connection->sendQueryAsync($sql, []); + } else { + // For PostgreSQL, we can pass params directly + // But we need to convert named params to positional if needed + $connection->sendQueryAsync($sql, $this->convertToPositionalParams($params)); + } + } + } + + /** + * Substitutes parameters into SQL for mysqli async queries. + * + * @param list|array $params + */ + private function substituteParamsForMysqli(MysqliConnection $connection, string $sql, array $params): string + { + if (count($params) === 0) { + return $sql; + } + + $mysqli = $connection->getNativeConnection(); + + // Simple positional parameter substitution + foreach ($params as $param) { + $escaped = $this->escapeValueForMysqli($mysqli, $param); + $pos = strpos($sql, '?'); + if ($pos !== false) { + $sql = substr_replace($sql, $escaped, $pos, 1); + } + } + + return $sql; + } + + /** + * Escapes a value for use in a mysqli query. + * + * @param mixed $value + */ + private function escapeValueForMysqli(mysqli $mysqli, $value): string + { + if ($value === null) { + return 'NULL'; + } + + if (is_bool($value)) { + return $value ? '1' : '0'; + } + + if (is_int($value) || is_float($value)) { + return (string) $value; + } + + return "'" . $mysqli->escape_string((string) $value) . "'"; + } + + /** + * Converts named parameters to positional parameters. + * + * @param list|array $params + * + * @return list + */ + private function convertToPositionalParams(array $params): array + { + if (count($params) === 0) { + return []; + } + + // If already positional (numeric keys), return as-is + $keys = array_keys($params); + if (is_int($keys[0])) { + return array_values($params); + } + + // For named params, just return values (SQL must use $1, $2 style for PgSQL) + return array_values($params); + } + + /** + * Waits for all async queries to complete. + * + * @param AsyncConnection[] $connections + */ + private function waitForCompletion(array $connections): void + { + // Determine wait strategy based on connection type + $first = $connections[0] ?? null; + + if ($first instanceof MysqliConnection) { + $this->waitForMysqliCompletion($connections); + } elseif ($first instanceof PgSQLConnection) { + $this->waitForPgSQLCompletion($connections); + } + } + + /** + * Waits for mysqli async queries using mysqli_poll. + * + * @param AsyncConnection[] $connections + */ + private function waitForMysqliCompletion(array $connections): void + { + $pending = []; + foreach ($connections as $index => $connection) { + if ($connection instanceof MysqliConnection) { + $pending[$index] = $connection->getNativeConnection(); + } + } + + while (count($pending) > 0) { + $read = array_values($pending); + $error = []; + $reject = []; + + // Poll with 1 second timeout + $result = mysqli_poll($read, $error, $reject, 1); + + if ($result === false) { + break; + } + + // Remove completed connections from pending + foreach ($read as $readyConnection) { + foreach ($pending as $index => $mysqli) { + if ($mysqli === $readyConnection) { + unset($pending[$index]); + break; + } + } + } + + // Small sleep to prevent busy-waiting + if (count($pending) > 0 && $result === 0) { + usleep(1000); // 1ms + } + } + } + + /** + * Waits for PostgreSQL async queries to complete. + * + * @param AsyncConnection[] $connections + */ + private function waitForPgSQLCompletion(array $connections): void + { + $pending = []; + foreach ($connections as $index => $connection) { + if ($connection instanceof PgSQLConnection) { + $pending[$index] = $connection; + } + } + + while (count($pending) > 0) { + foreach ($pending as $index => $connection) { + $nativeConnection = $connection->getNativeConnection(); + if (! pg_connection_busy($nativeConnection)) { + unset($pending[$index]); + } + } + + // Small sleep to prevent busy-waiting + if (count($pending) > 0) { + usleep(1000); // 1ms + } + } + } + + /** + * Collects results from all completed async queries. + * + * @param AsyncConnection[] $connections + * + * @return Result[] + */ + private function collectResults(array $connections): array + { + $results = []; + + foreach ($connections as $connection) { + $driverResult = $connection->getAsyncResult(); + $results[] = new Result($driverResult, $this->connection); + } + + return $results; + } +} + diff --git a/src/Async/Exception/AsyncNotSupported.php b/src/Async/Exception/AsyncNotSupported.php new file mode 100644 index 00000000000..ed708be7e6f --- /dev/null +++ b/src/Async/Exception/AsyncNotSupported.php @@ -0,0 +1,50 @@ +executeQueriesAsync([ + * new AsyncQuery('SELECT * FROM users WHERE id = ?', [1]), + * new AsyncQuery('SELECT * FROM orders WHERE user_id = ?', [1]), + * new AsyncQuery('SELECT COUNT(*) FROM products'), + * ]); + * + * @param AsyncQuery[] $queries The queries to execute in parallel + * + * @return Result[] The results in the same order as the input queries + * + * @throws AsyncNotSupported If async queries are not supported + * @throws Exception If query execution fails + */ + public function executeQueriesAsync(array $queries): array + { + if (PHP_VERSION_ID < 80100) { + throw AsyncNotSupported::phpVersionTooOld(); + } + + if ($this->transactionNestingLevel > 0) { + throw AsyncNotSupported::notAllowedInTransaction(); + } + + $batch = new AsyncQueryBatch($this); + + return $batch->execute($queries); + } + /** * Executes a caching query. * diff --git a/src/Driver/API/AsyncConnection.php b/src/Driver/API/AsyncConnection.php new file mode 100644 index 00000000000..80e63ca4f5e --- /dev/null +++ b/src/Driver/API/AsyncConnection.php @@ -0,0 +1,45 @@ + $params Query parameters (positional only for async) + * + * @return bool True if the query was sent successfully + * + * @throws Exception If sending the query fails + */ + public function sendQueryAsync(string $sql, array $params = []): bool; + + /** + * Checks if the connection is busy processing an async query. + */ + public function isBusy(): bool; + + /** + * Retrieves the result of the last async query. + * + * This method should be called after sendQueryAsync() and after + * confirming the connection is no longer busy. + * + * @throws Exception If retrieving the result fails + */ + public function getAsyncResult(): Result; +} + diff --git a/src/Driver/Mysqli/AsyncResult.php b/src/Driver/Mysqli/AsyncResult.php new file mode 100644 index 00000000000..fe53afdb9d7 --- /dev/null +++ b/src/Driver/Mysqli/AsyncResult.php @@ -0,0 +1,133 @@ +result = $result; + $this->connection = $connection; + } + + /** + * {@inheritDoc} + */ + public function fetchNumeric() + { + if ($this->result === null) { + return false; + } + + $row = $this->result->fetch_row(); + + return $row ?? false; + } + + /** + * {@inheritDoc} + */ + public function fetchAssociative() + { + if ($this->result === null) { + return false; + } + + $row = $this->result->fetch_assoc(); + + return $row ?? false; + } + + /** + * {@inheritDoc} + */ + public function fetchOne() + { + return FetchUtils::fetchOne($this); + } + + /** + * {@inheritDoc} + */ + public function fetchAllNumeric(): array + { + if ($this->result === null) { + return []; + } + + return $this->result->fetch_all(MYSQLI_NUM); + } + + /** + * {@inheritDoc} + */ + public function fetchAllAssociative(): array + { + if ($this->result === null) { + return []; + } + + return $this->result->fetch_all(MYSQLI_ASSOC); + } + + /** + * {@inheritDoc} + */ + public function fetchFirstColumn(): array + { + return FetchUtils::fetchFirstColumn($this); + } + + public function rowCount(): int + { + if ($this->result !== null) { + return $this->result->num_rows; + } + + return $this->connection->affected_rows; + } + + public function columnCount(): int + { + if ($this->result === null) { + return 0; + } + + return $this->result->field_count; + } + + public function free(): void + { + if ($this->result === null) { + return; + } + + $this->result->free(); + } +} + diff --git a/src/Driver/Mysqli/Connection.php b/src/Driver/Mysqli/Connection.php index d492684cc11..0321042e42c 100644 --- a/src/Driver/Mysqli/Connection.php +++ b/src/Driver/Mysqli/Connection.php @@ -2,6 +2,7 @@ namespace Doctrine\DBAL\Driver\Mysqli; +use Doctrine\DBAL\Driver\API\AsyncConnection; use Doctrine\DBAL\Driver\Mysqli\Exception\ConnectionError; use Doctrine\DBAL\Driver\Result as ResultInterface; use Doctrine\DBAL\Driver\ServerInfoAwareConnection; @@ -11,7 +12,11 @@ use mysqli; use mysqli_sql_exception; -final class Connection implements ServerInfoAwareConnection +use function mysqli_poll; + +use const MYSQLI_ASYNC; + +final class Connection implements ServerInfoAwareConnection, AsyncConnection { /** * Name of the option to set connection flags @@ -138,4 +143,76 @@ public function getNativeConnection(): mysqli { return $this->connection; } + + /** + * Sends a query asynchronously without waiting for results. + * + * Note: mysqli async queries do not support prepared statements. + * Parameters must be escaped and embedded in the query string. + * + * @param string $sql The SQL query to execute (with params already embedded) + * @param list $params Not used for mysqli - params must be in SQL string + * + * @throws ConnectionError If sending the query fails + */ + public function sendQueryAsync(string $sql, array $params = []): bool + { + // Note: mysqli MYSQLI_ASYNC does not support prepared statements + // The caller must ensure parameters are properly escaped in the SQL string + try { + $result = $this->connection->query($sql, MYSQLI_ASYNC); + } catch (mysqli_sql_exception $e) { + throw ConnectionError::upcast($e); + } + + // For async queries, query() returns true immediately + if ($result === false) { + throw ConnectionError::new($this->connection); + } + + return true; + } + + /** + * Checks if the connection is busy processing an async query. + * + * Uses mysqli_poll with 0 timeout to check without blocking. + */ + public function isBusy(): bool + { + $read = [$this->connection]; + $error = []; + $reject = []; + + // Poll with 0 second timeout to check status without blocking + $result = mysqli_poll($read, $error, $reject, 0, 0); + + // If connection is in $read array, it means result is ready (not busy) + // If $result is 0, connection is still busy + return $result === 0; + } + + /** + * Retrieves the result of the last async query. + * + * @throws ConnectionError If retrieving the result fails + */ + public function getAsyncResult(): ResultInterface + { + $result = $this->connection->reap_async_query(); + + if ($result === false) { + throw ConnectionError::new($this->connection); + } + + // reap_async_query returns mysqli_result|bool + // For SELECT queries, it returns mysqli_result + // For INSERT/UPDATE/DELETE, it returns true + if ($result === true) { + // For non-SELECT queries, return AsyncResult with null + return new AsyncResult(null, $this->connection); + } + + return new AsyncResult($result, $this->connection); + } } diff --git a/src/Driver/PgSQL/Connection.php b/src/Driver/PgSQL/Connection.php index 378e8ed7a15..65376907c25 100644 --- a/src/Driver/PgSQL/Connection.php +++ b/src/Driver/PgSQL/Connection.php @@ -2,6 +2,7 @@ namespace Doctrine\DBAL\Driver\PgSQL; +use Doctrine\DBAL\Driver\API\AsyncConnection; use Doctrine\DBAL\Driver\ServerInfoAwareConnection; use Doctrine\DBAL\ParameterType; use Doctrine\DBAL\SQL\Parser; @@ -15,6 +16,7 @@ use function is_object; use function is_resource; use function pg_close; +use function pg_connection_busy; use function pg_escape_bytea; use function pg_escape_literal; use function pg_get_result; @@ -22,11 +24,12 @@ use function pg_result_error; use function pg_send_prepare; use function pg_send_query; +use function pg_send_query_params; use function pg_version; use function sprintf; use function uniqid; -final class Connection implements ServerInfoAwareConnection +final class Connection implements ServerInfoAwareConnection, AsyncConnection { /** @var PgSqlConnection|resource */ private $connection; @@ -158,4 +161,53 @@ public function getNativeConnection() { return $this->connection; } + + /** + * Sends a query asynchronously without waiting for results. + * + * @param string $sql The SQL query to execute + * @param list $params Query parameters (positional only for async) + * + * @throws Exception If sending the query fails + */ + public function sendQueryAsync(string $sql, array $params = []): bool + { + if (count($params) > 0) { + // Use pg_send_query_params for parameterized queries + $result = @pg_send_query_params($this->connection, $sql, $params); + } else { + $result = @pg_send_query($this->connection, $sql); + } + + if ($result !== true) { + throw new Exception(pg_last_error($this->connection)); + } + + return true; + } + + /** + * Checks if the connection is busy processing an async query. + */ + public function isBusy(): bool + { + return pg_connection_busy($this->connection); + } + + /** + * Retrieves the result of the last async query. + * + * @throws Exception If retrieving the result fails + */ + public function getAsyncResult(): Result + { + $result = @pg_get_result($this->connection); + assert($result !== false); + + if ((bool) pg_result_error($result)) { + throw Exception::fromResult($result); + } + + return new Result($result); + } } diff --git a/tests/Async/AsyncQueryTest.php b/tests/Async/AsyncQueryTest.php new file mode 100644 index 00000000000..c07805836d2 --- /dev/null +++ b/tests/Async/AsyncQueryTest.php @@ -0,0 +1,71 @@ +getSQL()); + self::assertSame([], $query->getParams()); + self::assertSame([], $query->getTypes()); + } + + public function testConstructorWithPositionalParams(): void + { + $query = new AsyncQuery( + 'SELECT * FROM users WHERE id = ? AND status = ?', + [1, 'active'] + ); + + self::assertSame('SELECT * FROM users WHERE id = ? AND status = ?', $query->getSQL()); + self::assertSame([1, 'active'], $query->getParams()); + self::assertSame([], $query->getTypes()); + } + + public function testConstructorWithNamedParams(): void + { + $query = new AsyncQuery( + 'SELECT * FROM users WHERE id = :id AND status = :status', + ['id' => 1, 'status' => 'active'] + ); + + self::assertSame('SELECT * FROM users WHERE id = :id AND status = :status', $query->getSQL()); + self::assertSame(['id' => 1, 'status' => 'active'], $query->getParams()); + } + + public function testConstructorWithTypes(): void + { + $query = new AsyncQuery( + 'SELECT * FROM users WHERE id = ?', + [1], + [ParameterType::INTEGER] + ); + + self::assertSame([ParameterType::INTEGER], $query->getTypes()); + } + + public function testImmutability(): void + { + $params = [1, 'test']; + $types = [ParameterType::INTEGER, ParameterType::STRING]; + + $query = new AsyncQuery('SELECT 1', $params, $types); + + // Modifying original arrays should not affect the query + $params[0] = 999; + $types[0] = ParameterType::BINARY; + + self::assertSame([1, 'test'], $query->getParams()); + self::assertSame([ParameterType::INTEGER, ParameterType::STRING], $query->getTypes()); + } +} + diff --git a/tests/Async/Exception/AsyncNotSupportedTest.php b/tests/Async/Exception/AsyncNotSupportedTest.php new file mode 100644 index 00000000000..026f3bdb98d --- /dev/null +++ b/tests/Async/Exception/AsyncNotSupportedTest.php @@ -0,0 +1,46 @@ +getMessage()); + self::assertStringContainsString(PHP_VERSION, $exception->getMessage()); + } + + public function testDriverNotSupported(): void + { + $exception = AsyncNotSupported::driverNotSupported(Driver::class); + + self::assertStringContainsString(Driver::class, $exception->getMessage()); + self::assertStringContainsString('pgsql and mysqli', $exception->getMessage()); + } + + public function testNotAllowedInTransaction(): void + { + $exception = AsyncNotSupported::notAllowedInTransaction(); + + self::assertStringContainsString('transaction', $exception->getMessage()); + self::assertStringContainsString('separate connection', $exception->getMessage()); + } + + public function testEmptyQueryBatch(): void + { + $exception = AsyncNotSupported::emptyQueryBatch(); + + self::assertStringContainsString('empty batch', $exception->getMessage()); + } +} + diff --git a/tests/Functional/AsyncQueryTest.php b/tests/Functional/AsyncQueryTest.php new file mode 100644 index 00000000000..8a5eb01d207 --- /dev/null +++ b/tests/Functional/AsyncQueryTest.php @@ -0,0 +1,311 @@ += 8.1 + */ +class AsyncQueryTest extends FunctionalTestCase +{ + private const TABLE = 'async_test'; + + protected function setUp(): void + { + parent::setUp(); + + if (PHP_VERSION_ID < 80100) { + self::markTestSkipped('Async queries require PHP 8.1 or higher'); + } + } + + protected function tearDown(): void + { + $this->dropTableIfExists(self::TABLE); + $this->markConnectionNotReusable(); + } + + public function testAsyncQueriesNotSupportedWithPDO(): void + { + $nativeConnection = $this->connection->getNativeConnection(); + + if (! $nativeConnection instanceof \PDO) { + self::markTestSkipped('This test requires a PDO connection'); + } + + $this->expectException(AsyncNotSupported::class); + $this->expectExceptionMessage('does not support async queries'); + + $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT 1'), + ]); + } + + public function testAsyncQueriesNotAllowedInTransaction(): void + { + $this->skipIfNotAsyncCapable(); + + $this->connection->beginTransaction(); + + try { + $this->expectException(AsyncNotSupported::class); + $this->expectExceptionMessage('transaction'); + + $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT 1'), + ]); + } finally { + $this->connection->rollBack(); + } + } + + public function testEmptyQueryBatchThrowsException(): void + { + $this->skipIfNotAsyncCapable(); + + $this->expectException(AsyncNotSupported::class); + $this->expectExceptionMessage('empty batch'); + + $this->connection->executeQueriesAsync([]); + } + + public function testSingleAsyncQuery(): void + { + $this->skipIfNotAsyncCapable(); + + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT 1 AS val'), + ]); + + self::assertCount(1, $results); + + $row = $results[0]->fetchAssociative(); + self::assertIsArray($row); + self::assertEquals(1, $row['val']); + } + + public function testMultipleAsyncQueries(): void + { + $this->skipIfNotAsyncCapable(); + $this->createTestTable(); + + // Insert test data + $this->connection->insert(self::TABLE, ['name' => 'Alice', 'value' => 100]); + $this->connection->insert(self::TABLE, ['name' => 'Bob', 'value' => 200]); + $this->connection->insert(self::TABLE, ['name' => 'Charlie', 'value' => 300]); + + // Run multiple queries in parallel + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT name FROM ' . self::TABLE . ' WHERE value = 100'), + new AsyncQuery('SELECT name FROM ' . self::TABLE . ' WHERE value = 200'), + new AsyncQuery('SELECT COUNT(*) as cnt FROM ' . self::TABLE), + ]); + + self::assertCount(3, $results); + + // Verify first query result + $row1 = $results[0]->fetchAssociative(); + self::assertIsArray($row1); + self::assertEquals('Alice', $row1['name']); + + // Verify second query result + $row2 = $results[1]->fetchAssociative(); + self::assertIsArray($row2); + self::assertEquals('Bob', $row2['name']); + + // Verify third query result (count) + $row3 = $results[2]->fetchAssociative(); + self::assertIsArray($row3); + self::assertEquals(3, $row3['cnt']); + } + + public function testAsyncQueryWithParameters(): void + { + $this->skipIfNotAsyncCapable(); + $this->createTestTable(); + + // Insert test data + $this->connection->insert(self::TABLE, ['name' => 'Test', 'value' => 42]); + + // For PostgreSQL, we can use positional parameters ($1, $2) + // For MySQL, we need to embed values (async doesn't support prepared statements) + $driverConnection = $this->getDriverConnection(); + + if ($driverConnection instanceof PgSQLConnection) { + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT name FROM ' . self::TABLE . ' WHERE value = $1', [42]), + ]); + } else { + // For mysqli, use direct value (params are embedded in SQL) + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT name FROM ' . self::TABLE . ' WHERE value = 42'), + ]); + } + + self::assertCount(1, $results); + + $row = $results[0]->fetchAssociative(); + self::assertIsArray($row); + self::assertEquals('Test', $row['name']); + } + + public function testAsyncQueriesPreserveOrder(): void + { + $this->skipIfNotAsyncCapable(); + + // Execute queries that might complete in different order + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery('SELECT 1 AS order_num'), + new AsyncQuery('SELECT 2 AS order_num'), + new AsyncQuery('SELECT 3 AS order_num'), + ]); + + self::assertCount(3, $results); + + // Results should be in the same order as queries + self::assertEquals(1, $results[0]->fetchAssociative()['order_num']); + self::assertEquals(2, $results[1]->fetchAssociative()['order_num']); + self::assertEquals(3, $results[2]->fetchAssociative()['order_num']); + } + + /** + * Tests that queries actually run in parallel by using sleep functions. + * + * If queries run sequentially: 3 queries × 1 second = ~3 seconds + * If queries run in parallel: ~1 second (+ overhead) + * + * We assert that total time is between 1 and 2.5 seconds to prove parallelism. + */ + public function testQueriesExecuteInParallel(): void + { + $this->skipIfNotAsyncCapable(); + + $sleepQuery = $this->getSleepQuery(1); + + $startTime = microtime(true); + + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery($sleepQuery), + new AsyncQuery($sleepQuery), + new AsyncQuery($sleepQuery), + ]); + + $endTime = microtime(true); + $duration = $endTime - $startTime; + + self::assertCount(3, $results); + + // If parallel: should take ~1 second (+ connection overhead) + // If sequential: would take ~3 seconds + // We allow up to 2.5 seconds to account for connection setup overhead + self::assertGreaterThanOrEqual(1.0, $duration, 'Queries should take at least 1 second (sleep time)'); + self::assertLessThan(2.5, $duration, 'Queries should complete in under 2.5 seconds if running in parallel (sequential would take ~3s)'); + } + + /** + * Tests parallel execution with varying sleep times. + * + * Total time should be approximately equal to the longest query, + * not the sum of all query times. + */ + public function testParallelExecutionWithVaryingSleepTimes(): void + { + $this->skipIfNotAsyncCapable(); + + $startTime = microtime(true); + + // Queries with different sleep times: 0.5s, 1s, 0.3s + $results = $this->connection->executeQueriesAsync([ + new AsyncQuery($this->getSleepQuery(0.5)), + new AsyncQuery($this->getSleepQuery(1)), // Longest query + new AsyncQuery($this->getSleepQuery(0.3)), + ]); + + $endTime = microtime(true); + $duration = $endTime - $startTime; + + self::assertCount(3, $results); + + // If parallel: should take ~1 second (the longest query) + // If sequential: would take ~1.8 seconds (0.5 + 1 + 0.3) + self::assertGreaterThanOrEqual(1.0, $duration, 'Should take at least as long as the longest query'); + self::assertLessThan(1.7, $duration, 'Should complete faster than sequential execution (which would take ~1.8s)'); + } + + /** + * Gets a sleep query appropriate for the current database. + * + * @param float $seconds Number of seconds to sleep + */ + private function getSleepQuery(float $seconds): string + { + $nativeConnection = $this->connection->getNativeConnection(); + + if ($nativeConnection instanceof \mysqli) { + // MySQL SLEEP() returns 0 on success + return "SELECT SLEEP($seconds) AS slept"; + } + + // PostgreSQL pg_sleep() returns void, so we select 1 after + return "SELECT pg_sleep($seconds), 1 AS slept"; + } + + private function createTestTable(): void + { + $table = new Table(self::TABLE); + $table->addColumn('id', Types::INTEGER, ['autoincrement' => true]); + $table->addColumn('name', Types::STRING, ['length' => 255]); + $table->addColumn('value', Types::INTEGER); + $table->setPrimaryKey(['id']); + + $this->dropAndCreateTable($table); + } + + /** + * Gets the underlying driver connection. + * + * @return MysqliConnection|PgSQLConnection|PDOConnection|object + */ + private function getDriverConnection(): object + { + return $this->connection->getNativeConnection(); + } + + /** + * Skips the test if the current connection does not support async queries. + */ + private function skipIfNotAsyncCapable(): void + { + $nativeConnection = $this->connection->getNativeConnection(); + + $isAsyncCapable = $nativeConnection instanceof \mysqli + || $nativeConnection instanceof \PgSql\Connection + || is_resource($nativeConnection); + + if (! $isAsyncCapable) { + self::markTestSkipped( + 'This test requires an async-capable driver (pgsql or mysqli). ' + . 'Current driver connection: ' . get_class($nativeConnection) + ); + } + } +} + From 5833b2af5a8098b12648f6bfb6c95802ed72e841 Mon Sep 17 00:00:00 2001 From: Yoan Arnaudov Date: Wed, 26 Nov 2025 12:35:33 +0200 Subject: [PATCH 2/2] Add QueryBuilder helper method --- src/Async/AsyncQuery.php | 25 +++++++++++ tests/Async/AsyncQueryTest.php | 62 ++++++++++++++++++++++++++++ tests/Functional/AsyncQueryTest.php | 64 +++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+) diff --git a/src/Async/AsyncQuery.php b/src/Async/AsyncQuery.php index 2c7e52aea0a..787ff5142f7 100644 --- a/src/Async/AsyncQuery.php +++ b/src/Async/AsyncQuery.php @@ -4,6 +4,7 @@ namespace Doctrine\DBAL\Async; +use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Types\Type; /** @@ -13,6 +14,30 @@ */ final class AsyncQuery { + /** + * Creates an AsyncQuery from a QueryBuilder instance. + * + * This is a convenience factory method that extracts the SQL, parameters, + * and parameter types from a QueryBuilder. + * + * Example: + * $qb = $conn->createQueryBuilder() + * ->select('*') + * ->from('users') + * ->where('id = :id') + * ->setParameter('id', 1); + * + * $asyncQuery = AsyncQuery::fromQueryBuilder($qb); + */ + public static function fromQueryBuilder(QueryBuilder $queryBuilder): self + { + return new self( + $queryBuilder->getSQL(), + $queryBuilder->getParameters(), + $queryBuilder->getParameterTypes() + ); + } + /** @var string */ private string $sql; diff --git a/tests/Async/AsyncQueryTest.php b/tests/Async/AsyncQueryTest.php index c07805836d2..0a62a81ce73 100644 --- a/tests/Async/AsyncQueryTest.php +++ b/tests/Async/AsyncQueryTest.php @@ -5,7 +5,9 @@ namespace Doctrine\DBAL\Tests\Async; use Doctrine\DBAL\Async\AsyncQuery; +use Doctrine\DBAL\Connection; use Doctrine\DBAL\ParameterType; +use Doctrine\DBAL\Query\QueryBuilder; use PHPUnit\Framework\TestCase; class AsyncQueryTest extends TestCase @@ -67,5 +69,65 @@ public function testImmutability(): void self::assertSame([1, 'test'], $query->getParams()); self::assertSame([ParameterType::INTEGER, ParameterType::STRING], $query->getTypes()); } + + public function testFromQueryBuilder(): void + { + $connection = $this->createConnectionMock(); + + $qb = new QueryBuilder($connection); + $qb->select('id', 'name') + ->from('users', 'u') + ->where('u.id = :id') + ->andWhere('u.status = :status') + ->setParameter('id', 42, ParameterType::INTEGER) + ->setParameter('status', 'active', ParameterType::STRING); + + $asyncQuery = AsyncQuery::fromQueryBuilder($qb); + + self::assertSame($qb->getSQL(), $asyncQuery->getSQL()); + self::assertSame($qb->getParameters(), $asyncQuery->getParams()); + self::assertSame($qb->getParameterTypes(), $asyncQuery->getTypes()); + } + + public function testFromQueryBuilderWithPositionalParams(): void + { + $connection = $this->createConnectionMock(); + + $qb = new QueryBuilder($connection); + $qb->select('*') + ->from('products') + ->where('price > ?') + ->setParameter(0, 100, ParameterType::INTEGER); + + $asyncQuery = AsyncQuery::fromQueryBuilder($qb); + + self::assertSame([0 => 100], $asyncQuery->getParams()); + self::assertSame([0 => ParameterType::INTEGER], $asyncQuery->getTypes()); + } + + public function testFromQueryBuilderWithNoParams(): void + { + $connection = $this->createConnectionMock(); + + $qb = new QueryBuilder($connection); + $qb->select('COUNT(*)') + ->from('users'); + + $asyncQuery = AsyncQuery::fromQueryBuilder($qb); + + self::assertStringContainsString('SELECT', $asyncQuery->getSQL()); + self::assertSame([], $asyncQuery->getParams()); + self::assertSame([], $asyncQuery->getTypes()); + } + + private function createConnectionMock(): Connection + { + $platform = new \Doctrine\DBAL\Platforms\MySQLPlatform(); + + $connection = $this->createMock(Connection::class); + $connection->method('getDatabasePlatform')->willReturn($platform); + + return $connection; + } } diff --git a/tests/Functional/AsyncQueryTest.php b/tests/Functional/AsyncQueryTest.php index 8a5eb01d207..ffc4e16c432 100644 --- a/tests/Functional/AsyncQueryTest.php +++ b/tests/Functional/AsyncQueryTest.php @@ -307,5 +307,69 @@ private function skipIfNotAsyncCapable(): void ); } } + + public function testAsyncQueryFromQueryBuilder(): void + { + $this->skipIfNotAsyncCapable(); + $this->createTestTable(); + + // Insert test data + $this->connection->insert(self::TABLE, ['name' => 'QueryBuilder Test', 'value' => 999]); + + // Create queries using QueryBuilder + $qb1 = $this->connection->createQueryBuilder() + ->select('name') + ->from(self::TABLE) + ->where('value = 999'); + + $qb2 = $this->connection->createQueryBuilder() + ->select('COUNT(*) as cnt') + ->from(self::TABLE); + + // Execute using fromQueryBuilder helper + $results = $this->connection->executeQueriesAsync([ + AsyncQuery::fromQueryBuilder($qb1), + AsyncQuery::fromQueryBuilder($qb2), + ]); + + self::assertCount(2, $results); + + $row1 = $results[0]->fetchAssociative(); + self::assertIsArray($row1); + self::assertEquals('QueryBuilder Test', $row1['name']); + + $row2 = $results[1]->fetchAssociative(); + self::assertIsArray($row2); + self::assertGreaterThanOrEqual(1, $row2['cnt']); + } + + public function testAsyncQueryFromMultipleQueryBuilders(): void + { + $this->skipIfNotAsyncCapable(); + $this->createTestTable(); + + // Insert test data + $this->connection->insert(self::TABLE, ['name' => 'Alice', 'value' => 10]); + $this->connection->insert(self::TABLE, ['name' => 'Bob', 'value' => 20]); + $this->connection->insert(self::TABLE, ['name' => 'Charlie', 'value' => 30]); + + // Create multiple QueryBuilder instances + $queries = []; + foreach ([10, 20, 30] as $value) { + $qb = $this->connection->createQueryBuilder() + ->select('name') + ->from(self::TABLE) + ->where('value = ' . $value); + + $queries[] = AsyncQuery::fromQueryBuilder($qb); + } + + $results = $this->connection->executeQueriesAsync($queries); + + self::assertCount(3, $results); + self::assertEquals('Alice', $results[0]->fetchAssociative()['name']); + self::assertEquals('Bob', $results[1]->fetchAssociative()['name']); + self::assertEquals('Charlie', $results[2]->fetchAssociative()['name']); + } }