From eccba442aea876a4b363e5c3ce416d09b41a04d3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 30 Jun 2026 10:48:03 +0800 Subject: [PATCH] Stabilize ZSTD compressor level pipe IT (#17917) * Strictly isolate pipe tree and table visibility * Add pipe visibility unit test coverage * Add pipe static meta visibility edge tests * Add pipe table visibility filter test * Fix table pipe RPC visibility in ITs * Fix zstd compressor level IT assertion * it-fix * Update IoTDBPipeSinkCompressionIT.java * Stabilize zstd compressor level pipe IT * Keep only zstd compression pipe IT changes (cherry picked from commit d737f4b301b92deba9178e04dac0fbb380ebcef4) --- .../IoTDBPipeSinkCompressionIT.java | 180 ++++++++---------- 1 file changed, 75 insertions(+), 105 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java index 560653c7f124f..53ae05c637140 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java @@ -217,115 +217,85 @@ public void testZstdCompressorLevel() throws Exception { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "insert into root.db.d1(time,s1) values (1,1)", - "insert into root.db.d1(time,s2) values (1,1)", - "insert into root.db.d1(time,s3) values (1,1)", - "insert into root.db.d1(time,s4) values (1,1)", - "insert into root.db.d1(time,s5) values (1,1)", - "flush"), - null); - - // Create 5 pipes with different zstd compression levels, p4 and p5 should fail. - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p1" - + " with extractor ('extractor.pattern'='root.db.d1.s1')" - + " with connector (" - + "'connector.ip'='%s'," - + "'connector.port'='%s'," - + "'connector.compressor'='zstd, zstd'," - + "'connector.compressor.zstd.level'='3')", - receiverIp, receiverPort)); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p2" - + " with extractor ('extractor.pattern'='root.db.d1.s2')" - + " with connector (" - + "'connector.ip'='%s'," - + "'connector.port'='%s'," - + "'connector.compressor'='zstd, zstd'," - + "'connector.compressor.zstd.level'='22')", - receiverIp, receiverPort)); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p3" - + " with extractor ('extractor.pattern'='root.db.d1.s3')" - + " with connector (" - + "'connector.ip'='%s'," - + "'connector.port'='%s'," - + "'connector.compressor'='zstd, zstd'," - + "'connector.compressor.zstd.level'='-131072')", - receiverIp, receiverPort)); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p4" - + " with extractor ('extractor.pattern'='root.db.d1.s4')" - + " with connector (" - + "'connector.ip'='%s'," - + "'connector.port'='%s'," - + "'connector.compressor'='zstd, zstd'," - + "'connector.compressor.zstd.level'='-131073')", - receiverIp, receiverPort)); - fail(); - } catch (SQLException e) { - // Make sure the error message in IoTDBConnector.java is returned - Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range")); - } - - try (final Connection connection = senderEnv.getConnection(); - final Statement statement = connection.createStatement()) { - statement.execute( - String.format( - "create pipe p5" - + " with extractor ('extractor.pattern'='root.db.d1.s5')" - + " with connector (" - + "'connector.ip'='%s'," - + "'connector.port'='%s'," - + "'connector.compressor'='zstd, zstd'," - + "'connector.compressor.zstd.level'='23')", - receiverIp, receiverPort)); - fail(); - } catch (SQLException e) { - // Make sure the error message in IoTDBConnector.java is returned - Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range")); - } + // Create legal zstd level pipes one by one, so the assertion identifies the exact level + // that fails and avoids concurrent historical TsFile splitting for this level test. + createZstdPipeAndAssertData( + "p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1", handleFailure); + createZstdPipeAndAssertData( + "p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2", handleFailure); + createZstdPipeAndAssertData( + "p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3", handleFailure); + + assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp, receiverPort); + assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp, receiverPort); final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; Assert.assertEquals(3, showPipeResult.size()); + } + } - TestUtils.assertDataEventuallyOnEnv( - receiverEnv, - "count timeseries", - "count(timeseries),", - Collections.singleton("3,"), - handleFailure); + private void createZstdPipeAndAssertData( + final String pipeName, + final String extractorPattern, + final String zstdLevel, + final String receiverIp, + final int receiverPort, + final String measurement, + final Consumer handleFailure) { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + String.format("insert into root.db.d1(time,%s) values (1,1)", measurement), "flush"), + null); + + try { + createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort); + } catch (final SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + String.format("select count(%s) from root.db.d1", measurement), + String.format("count(root.db.d1.%s),", measurement), + Collections.singleton("1,"), + handleFailure); + } + + private void assertCreateZstdPipeFailed( + final String pipeName, + final String extractorPattern, + final String zstdLevel, + final String receiverIp, + final int receiverPort) { + try { + createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort); + fail(); + } catch (final SQLException e) { + Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range")); + } + } + + private void createZstdPipe( + final String pipeName, + final String extractorPattern, + final String zstdLevel, + final String receiverIp, + final int receiverPort) + throws SQLException { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe %s" + + " with extractor ('extractor.pattern'='%s')" + + " with connector (" + + "'connector.ip'='%s'," + + "'connector.port'='%s'," + + "'connector.compressor'='zstd, zstd'," + + "'connector.compressor.zstd.level'='%s')", + pipeName, extractorPattern, receiverIp, receiverPort, zstdLevel)); } } }