diff --git a/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java b/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java index 07b645cab..9e0b4869e 100644 --- a/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java +++ b/vertx-mssql-client/src/test/java/io/vertx/tests/mssqlclient/MSSQLQueriesTest.java @@ -40,8 +40,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.MatcherAssert.assertThat; @RunWith(VertxUnitRunner.class) @@ -206,4 +205,22 @@ public void testQuerySequences(TestContext ctx) { })); })); } + + @Test + public void testUnsupportedXmlTypeErrorPropagation(TestContext ctx) { + connection + .query("CREATE TABLE #TempXmlTest (id INT, xmlData XML)") + .execute() + .onComplete(ctx.asyncAssertSuccess(create -> { + connection + .query("SELECT * FROM #TempXmlTest") + .execute() + .onComplete(ctx.asyncAssertFailure(t -> { + ctx.verify(v -> { + assertThat(t, instanceOf(UnsupportedOperationException.class)); + assertThat(t.getMessage(), containsString("XML")); + }); + })); + })); + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java index 4f1c11bce..5def14084 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java @@ -16,19 +16,19 @@ */ package io.vertx.pgclient.impl.codec; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - import io.netty.buffer.ByteBuf; import io.vertx.core.VertxException; import io.vertx.pgclient.impl.PgDatabaseMetadata; import io.vertx.pgclient.impl.PgSocketConnection; import io.vertx.pgclient.impl.auth.scram.ScramAuthentication; import io.vertx.pgclient.impl.auth.scram.ScramSession; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.codec.CommandResponse; +import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.protocol.InitCommand; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + class InitPgCommandMessage extends PgCommandMessage { private PgEncoder encoder; @@ -65,9 +65,17 @@ void handleAuthenticationSasl(ByteBuf in) { throw new VertxException("Scram authentication not supported, missing com.ongres.scram:scram-client on the class/module path"); } scramSession = scramAuth.session(cmd.username(), cmd.password().toCharArray()); - encoder.writeScramClientInitialMessage( + try { + encoder.writeScramClientInitialMessage( scramSession.createInitialSaslMessage(in, encoder.channelHandlerContext())); - encoder.flush(); + encoder.flush(); + } catch (RuntimeException e) { + decoder.fireCommandResponse(CommandResponse.failure(e)); + // If the frontend does not support the authentication method requested by the server, + // then it should immediately close the connection. + // See https://www.postgresql.org/docs/current/protocol-flow.html + encoder.close(); + } } @Override diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java index b7b0043bd..f57a63bd1 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgConnectionTest.java @@ -20,7 +20,7 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.pgclient.PgConnection; -import io.vertx.sqlclient.ClosedConnectionException; +import io.vertx.pgclient.PgException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.Tuple; @@ -148,7 +148,12 @@ public void testInflightCommandsFailWhenConnectionClosed(TestContext ctx) { .query("SELECT pg_sleep(20)") .execute() .onComplete(ctx.asyncAssertFailure(t -> { - ctx.assertTrue(t instanceof ClosedConnectionException); + if (t instanceof PgException) { + PgException pge = (PgException) t; + ctx.assertEquals("57P01", pge.getSqlState()); // ADMIN SHUTDOWN + } else { + ctx.fail(t); + } })); connector.accept(ctx.asyncAssertSuccess(conn2 -> { conn2 diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 85b912b40..e3ee9414a 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -21,28 +21,25 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; -import io.vertx.core.*; +import io.vertx.core.Completable; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.tracing.TracingPolicy; -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.sqlclient.ClosedConnectionException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.codec.impl.PreparedStatementCache; -import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.spi.DatabaseMetadata; +import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.connection.ConnectionContext; -import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -import io.vertx.sqlclient.spi.protocol.CloseStatementCommand; -import io.vertx.sqlclient.spi.protocol.CommandBase; -import io.vertx.sqlclient.spi.protocol.CompositeCommand; -import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; +import io.vertx.sqlclient.spi.protocol.*; import java.util.ArrayDeque; import java.util.List; @@ -437,11 +434,12 @@ private void handleClose(Throwable t) { if (t != null) { reportException(t); } + Throwable inflightCause = t != null ? t : ClosedConnectionException.INSTANCE; CommandMessage msg; while ((msg = inflights.poll()) != null) { - fail(msg, ClosedConnectionException.INSTANCE); + fail(msg, inflightCause); } - Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : new VertxException(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); + Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); CommandBase cmd; while ((cmd = pending.poll()) != null) { CommandBase c = cmd;