Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,115 +217,85 @@ public void testZstdCompressorLevel() throws Exception {

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time,s1) values (1,1)",
"insert into root.db.d1(time,s2) values (1,1)",
"insert into root.db.d1(time,s3) values (1,1)",
"insert into root.db.d1(time,s4) values (1,1)",
"insert into root.db.d1(time,s5) values (1,1)",
"flush"),
null);

// Create 5 pipes with different zstd compression levels, p4 and p5 should fail.

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe p1"
+ " with extractor ('extractor.pattern'='root.db.d1.s1')"
+ " with connector ("
+ "'connector.ip'='%s',"
+ "'connector.port'='%s',"
+ "'connector.compressor'='zstd, zstd',"
+ "'connector.compressor.zstd.level'='3')",
receiverIp, receiverPort));
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe p2"
+ " with extractor ('extractor.pattern'='root.db.d1.s2')"
+ " with connector ("
+ "'connector.ip'='%s',"
+ "'connector.port'='%s',"
+ "'connector.compressor'='zstd, zstd',"
+ "'connector.compressor.zstd.level'='22')",
receiverIp, receiverPort));
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe p3"
+ " with extractor ('extractor.pattern'='root.db.d1.s3')"
+ " with connector ("
+ "'connector.ip'='%s',"
+ "'connector.port'='%s',"
+ "'connector.compressor'='zstd, zstd',"
+ "'connector.compressor.zstd.level'='-131072')",
receiverIp, receiverPort));
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe p4"
+ " with extractor ('extractor.pattern'='root.db.d1.s4')"
+ " with connector ("
+ "'connector.ip'='%s',"
+ "'connector.port'='%s',"
+ "'connector.compressor'='zstd, zstd',"
+ "'connector.compressor.zstd.level'='-131073')",
receiverIp, receiverPort));
fail();
} catch (SQLException e) {
// Make sure the error message in IoTDBConnector.java is returned
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
}

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe p5"
+ " with extractor ('extractor.pattern'='root.db.d1.s5')"
+ " with connector ("
+ "'connector.ip'='%s',"
+ "'connector.port'='%s',"
+ "'connector.compressor'='zstd, zstd',"
+ "'connector.compressor.zstd.level'='23')",
receiverIp, receiverPort));
fail();
} catch (SQLException e) {
// Make sure the error message in IoTDBConnector.java is returned
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
}
// Create legal zstd level pipes one by one, so the assertion identifies the exact level
// that fails and avoids concurrent historical TsFile splitting for this level test.
createZstdPipeAndAssertData(
"p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1", handleFailure);
createZstdPipeAndAssertData(
"p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2", handleFailure);
createZstdPipeAndAssertData(
"p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3", handleFailure);

assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp, receiverPort);
assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp, receiverPort);

final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
Assert.assertEquals(3, showPipeResult.size());
}
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"count timeseries",
"count(timeseries),",
Collections.singleton("3,"),
handleFailure);
private void createZstdPipeAndAssertData(
final String pipeName,
final String extractorPattern,
final String zstdLevel,
final String receiverIp,
final int receiverPort,
final String measurement,
final Consumer<String> handleFailure) {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
String.format("insert into root.db.d1(time,%s) values (1,1)", measurement), "flush"),
null);

try {
createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort);
} catch (final SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
String.format("select count(%s) from root.db.d1", measurement),
String.format("count(root.db.d1.%s),", measurement),
Collections.singleton("1,"),
handleFailure);
}

private void assertCreateZstdPipeFailed(
final String pipeName,
final String extractorPattern,
final String zstdLevel,
final String receiverIp,
final int receiverPort) {
try {
createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort);
fail();
} catch (final SQLException e) {
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
}
}

private void createZstdPipe(
final String pipeName,
final String extractorPattern,
final String zstdLevel,
final String receiverIp,
final int receiverPort)
throws SQLException {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s"
+ " with extractor ('extractor.pattern'='%s')"
+ " with connector ("
+ "'connector.ip'='%s',"
+ "'connector.port'='%s',"
+ "'connector.compressor'='zstd, zstd',"
+ "'connector.compressor.zstd.level'='%s')",
pipeName, extractorPattern, receiverIp, receiverPort, zstdLevel));
}
}
}
Loading