diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbIT.java similarity index 69% rename from v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java rename to v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbIT.java index b67f22a13d..55b5bf0b8f 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbIT.java @@ -15,122 +15,322 @@ */ package com.google.cloud.teleport.spanner; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.text.IsEqualCompressingWhiteSpace.equalToCompressingWhiteSpace; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; -import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.Dialect; -import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.Struct; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; import com.google.cloud.teleport.spanner.common.Type; -import com.google.cloud.teleport.spanner.common.Type.StructField; import com.google.cloud.teleport.spanner.ddl.Ddl; -import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner; import com.google.cloud.teleport.spanner.ddl.RandomDdlGenerator; +import com.google.cloud.teleport.spanner.ddl.RandomInsertMutationGenerator; import com.google.cloud.teleport.spanner.ddl.Udf.SqlSecurity; import com.google.cloud.teleport.spanner.ddl.UdfParameter; import com.google.cloud.teleport.spanner.proto.ExportProtos.Export; -import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; +import com.google.cloud.teleport.spanner.spannerio.MutationGroup; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.dataflow.DirectRunnerClient; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.spanner.SpannerTemplateITBase; import org.junit.After; -import org.junit.Rule; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; /** * An end to end test that exports and imports a database and verifies that the content is - * identical. Additionally, this test verifies the behavior of table level export. This requires an - * active GCP project with a Spanner instance. Hence this test can only be run locally with a - * project set up using 'gcloud config'. + * identical. This test utilizes DirectRunner to launch the actual Export and Import templates locally. */ -@Category(IntegrationTest.class) -public class CopyDbTest { - private final Timestamp timestamp = new Timestamp(System.currentTimeMillis()); - private final long numericTime = timestamp.getTime(); - private final String sourceDb = "copydb-source" + Long.toString(numericTime); - private final String destinationDb = "copydb-dest" + Long.toString(numericTime); - private final String destDbPrefix = "import"; +@Category({TemplateIntegrationTest.class}) +@TemplateIntegrationTest(ExportPipeline.class) +@RunWith(JUnit4.class) +public class CopyDbIT extends SpannerTemplateITBase { - @Rule public final transient TestPipeline exportPipeline = TestPipeline.create(); - @Rule public final transient TestPipeline importPipeline = TestPipeline.create(); - @Rule public final transient TestPipeline comparePipeline = TestPipeline.create(); - @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); - @Rule public final SpannerServerResource spannerServer = new SpannerServerResource(); + static { + // Force direct runner for this test as it tests the local copy process + System.setProperty("directRunnerTest", "true"); + } + + private SpannerResourceManager sourceResourceManager; + private SpannerResourceManager destResourceManager; + + @Before + public void setup() { + // Initialize resource managers for source and destination Spanner instances + sourceResourceManager = + SpannerResourceManager.builder("src-" + testName, PROJECT, REGION, Dialect.GOOGLE_STANDARD_SQL) + .maybeUseStaticInstance() + .useCustomHost(spannerHost) + .build(); + + destResourceManager = + SpannerResourceManager.builder("dst-" + testName, PROJECT, REGION, Dialect.GOOGLE_STANDARD_SQL) + .maybeUseStaticInstance() + .useCustomHost(spannerHost) + .build(); + } @After public void teardown() { - spannerServer.dropDatabase(sourceDb); - spannerServer.dropDatabase(destinationDb); + ResourceManagerUtils.cleanResources(sourceResourceManager, destResourceManager); } private void createAndPopulate(Ddl ddl, int numBatches) throws Exception { - switch (ddl.dialect()) { - case GOOGLE_STANDARD_SQL: - spannerServer.createDatabase(sourceDb, ddl.statements()); - spannerServer.createDatabase(destinationDb, Collections.emptyList()); - break; - case POSTGRESQL: - spannerServer.createPgDatabase(sourceDb, ddl.statements()); - spannerServer.createPgDatabase(destinationDb, Collections.emptyList()); - break; - default: - throw new IllegalArgumentException("Unrecognized dialect: " + ddl.dialect()); + createAndPopulate(ddl, numBatches, Dialect.GOOGLE_STANDARD_SQL); + } + + private void createAndPopulate(Ddl ddl, int numBatches, Dialect dialect) throws Exception { + if (dialect == Dialect.POSTGRESQL) { + sourceResourceManager = SpannerResourceManager.builder("src-" + testName, PROJECT, REGION, Dialect.POSTGRESQL) + .maybeUseStaticInstance().useCustomHost(spannerHost).build(); + destResourceManager = SpannerResourceManager.builder("dst-" + testName, PROJECT, REGION, Dialect.POSTGRESQL) + .maybeUseStaticInstance().useCustomHost(spannerHost).build(); + } + + sourceResourceManager.executeDdlStatements(ddl.statements()); + destResourceManager.executeDdlStatements(Collections.emptyList()); + + if (numBatches > 0) { + final Iterator mutations = + new RandomInsertMutationGenerator(ddl).stream().iterator(); + for (int i = 0; i < numBatches; i++) { + List batch = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + if (mutations.hasNext()) { + MutationGroup group = mutations.next(); + group.iterator().forEachRemaining(batch::add); + } + } + sourceResourceManager.write(batch); + } } - spannerServer.populateRandomData(sourceDb, ddl, numBatches); } @Test public void allTypesSchema() throws Exception { // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("AllTYPES") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("bool_field").bool().endColumn() - .column("int64_field").int64().endColumn() - .column("float32_field").float32().endColumn() - .column("float64_field").float64().endColumn() - .column("string_field").string().max().endColumn() - .column("bytes_field").bytes().max().endColumn() - .column("timestamp_field").timestamp().endColumn() - .column("date_field").date().endColumn() - .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() - .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() - .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() - .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() - .column("arr_string_field").type(Type.array(Type.string())).max().endColumn() - .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() - .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() - .column("arr_date_field").type(Type.array(Type.date())).endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable() - .build(); + Ddl ddl = Ddl.builder() + .createTable("Users") + .column("first_name").string().max().endColumn() + .column("last_name").string().size(5).endColumn() + .column("age").int64().endColumn() + .primaryKey().asc("first_name").desc("last_name").end() + .endTable() + .createTable("AllTYPES") + .column("first_name").string().max().endColumn() + .column("last_name").string().size(5).endColumn() + .column("id").int64().notNull().endColumn() + .column("bool_field").bool().endColumn() + .column("int64_field").int64().endColumn() + .column("float32_field").float32().endColumn() + .column("float64_field").float64().endColumn() + .column("string_field").string().max().endColumn() + .column("bytes_field").bytes().max().endColumn() + .column("timestamp_field").timestamp().endColumn() + .column("date_field").date().endColumn() + .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() + .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() + .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() + .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() + .column("arr_string_field").type(Type.array(Type.string())).max().endColumn() + .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() + .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() + .column("arr_date_field").type(Type.array(Type.date())).endColumn() + .primaryKey().asc("first_name").desc("last_name").asc("id").end() + .interleaveInParent("Users") + .onDeleteCascade() + .endTable() + .build(); // spotless:on createAndPopulate(ddl, 100); runTest(); } + @Test + public void emptyTables() throws Exception { + // spotless:off + Ddl ddl = Ddl.builder() + .createTable("Users") + .column("first_name").string().max().endColumn() + .column("last_name").string().size(5).endColumn() + .column("age").int64().endColumn() + .primaryKey().asc("first_name").desc("last_name").end() + .endTable() + .createTable("AllTYPES") + .column("first_name").string().max().endColumn() + .column("last_name").string().size(5).endColumn() + .column("id").int64().notNull().endColumn() + .column("bool_field").bool().endColumn() + .column("int64_field").int64().endColumn() + .column("float32_field").float32().endColumn() + .column("float64_field").float64().endColumn() + .column("string_field").string().max().endColumn() + .column("bytes_field").bytes().max().endColumn() + .column("timestamp_field").timestamp().endColumn() + .column("date_field").date().endColumn() + .primaryKey().asc("first_name").desc("last_name").asc("id").end() + .interleaveInParent("Users") + .endTable() + .build(); + createAndPopulate(ddl, 10); + + // Add empty tables. + Ddl emptyTables = Ddl.builder() + .createTable("empty_one") + .column("first").string().max().endColumn() + .column("second").string().size(5).endColumn() + .column("value").int64().endColumn() + .primaryKey().asc("first").desc("second").end() + .endTable() + .createTable("empty_two") + .column("first").string().max().endColumn() + .column("second").string().size(5).endColumn() + .column("value").int64().endColumn() + .column("another_value").int64().endColumn() + .primaryKey().asc("first").end() + .endTable() + .build(); + // spotless:on + sourceResourceManager.executeDdlStatements(emptyTables.createTableStatements()); + runTest(); + } + + @Test + public void allEmptyTables() throws Exception { + // spotless:off + Ddl ddl = Ddl.builder() + .createTable("Users") + .column("first_name").string().max().endColumn() + .column("last_name").string().size(5).endColumn() + .column("age").int64().endColumn() + .primaryKey().asc("first_name").desc("last_name").end() + .endTable() + .build(); + // spotless:on + createAndPopulate(ddl, 0); + runTest(); + } + + @Test + public void databaseOptions() throws Exception { + Ddl.Builder ddlBuilder = Ddl.builder(); + // spotless:off + ddlBuilder.createTable("Users") + .column("first_name").string().max().endColumn() + .column("last_name").string().size(5).endColumn() + .column("age").int64().endColumn() + .primaryKey().asc("first_name").desc("last_name").end() + .endTable(); + // spotless:on + + List dbOptionList = new ArrayList<>(); + dbOptionList.add(Export.DatabaseOption.newBuilder().setOptionName("version_retention_period").setOptionValue("\"6d\"").build()); + ddlBuilder.mergeDatabaseOptions(dbOptionList); + Ddl ddl = ddlBuilder.build(); + createAndPopulate(ddl, 10); + runTest(); + } + + @Test + public void foreignKeys() throws Exception { + // spotless:off + Ddl ddl = Ddl.builder() + .createTable("Ref") + .column("id1").int64().endColumn() + .column("id2").int64().endColumn() + .primaryKey().asc("id1").asc("id2").end() + .endTable() + .createTable("Child") + .column("id1").int64().endColumn() + .column("id2").int64().endColumn() + .column("id3").int64().endColumn() + .primaryKey().asc("id1").asc("id2").asc("id3").end() + .interleaveInParent("Ref") + .foreignKeys(ImmutableList.of( + "ALTER TABLE `Child` ADD CONSTRAINT `fk1` FOREIGN KEY (`id1`) REFERENCES `Ref` (`id1`)", + "ALTER TABLE `Child` ADD CONSTRAINT `fk2` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)")) + .endTable() + .build(); + // spotless:on + createAndPopulate(ddl, 100); + runTest(); + } + + @Test + public void randomSchema() throws Exception { + Ddl ddl = RandomDdlGenerator.builder().build().generate(); + createAndPopulate(ddl, 100); + runTest(); + } + + @Test + public void randomSchemaNoData() throws Exception { + Ddl ddl = RandomDdlGenerator.builder().build().generate(); + createAndPopulate(ddl, 0); + runTest(); + } + + private void runTest() throws Exception { + runTest(Dialect.GOOGLE_STANDARD_SQL); + } + + private void runTest(Dialect dialect) throws Exception { + String gcsPath = getGcsPath("output/"); + + // 1. Export + PipelineLauncher exportLauncher = DirectRunnerClient.builder(ExportPipeline.class).build(); + PipelineOperator exportOperator = new PipelineOperator(exportLauncher); + PipelineLauncher.LaunchConfig.Builder exportOptions = + PipelineLauncher.LaunchConfig.builder("export-" + testName, "") + .addParameter("spannerProjectId", PROJECT) + .addParameter("instanceId", sourceResourceManager.getInstanceId()) + .addParameter("databaseId", sourceResourceManager.getDatabaseId()) + .addParameter("outputDir", gcsPath) + .addParameter("spannerHost", sourceResourceManager.getSpannerHost()); + + PipelineLauncher.LaunchInfo exportInfo = exportLauncher.launch(PROJECT, REGION, exportOptions.build()); + PipelineOperator.Result exportResult = exportOperator.waitUntilDone(PipelineOperator.Config.builder().setJobId(exportInfo.jobId()).setProject(PROJECT).setRegion(REGION).build()); + assertThatResult(exportResult).isLaunchFinished(); + + // 2. Import + PipelineLauncher importLauncher = DirectRunnerClient.builder(ImportPipeline.class).build(); + PipelineOperator importOperator = new PipelineOperator(importLauncher); + PipelineLauncher.LaunchConfig.Builder importOptions = + PipelineLauncher.LaunchConfig.builder("import-" + testName, "") + .addParameter("spannerProjectId", PROJECT) + .addParameter("instanceId", destResourceManager.getInstanceId()) + .addParameter("databaseId", destResourceManager.getDatabaseId()) + .addParameter("inputDir", gcsPath) + .addParameter("spannerHost", destResourceManager.getSpannerHost()) + .addParameter("waitForIndexes", "true"); + + PipelineLauncher.LaunchInfo importInfo = importLauncher.launch(PROJECT, REGION, importOptions.build()); + PipelineOperator.Result importResult = importOperator.waitUntilDone(PipelineOperator.Config.builder().setJobId(importInfo.jobId()).setProject(PROJECT).setRegion(REGION).build()); + assertThatResult(importResult).isLaunchFinished(); + + // 3. Compare Data + List tables = sourceResourceManager.runQuery("SELECT table_name FROM information_schema.tables WHERE table_schema = ''"); + for (Struct row : tables) { + String tableName = row.getString(0); + List sourceRecords = sourceResourceManager.runQuery("SELECT * FROM `" + tableName + "`"); + List destRecords = destResourceManager.runQuery("SELECT * FROM `" + tableName + "`"); + + assertThat(destRecords.size()).isEqualTo(sourceRecords.size()); + if (!sourceRecords.isEmpty()) { + assertThat(destRecords).containsExactlyElementsIn(sourceRecords); + } + } + } @Test public void allPgTypesSchema() throws Exception { // spotless:off @@ -236,64 +436,6 @@ public void allPgTypesSchema() throws Exception { createAndPopulate(ddl, 100); runTest(Dialect.POSTGRESQL); } - - @Test - public void emptyTables() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("AllTYPES") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("bool_field").bool().endColumn() - .column("int64_field").int64().endColumn() - .column("float32_field").float32().endColumn() - .column("float64_field").float64().endColumn() - .column("string_field").string().max().endColumn() - .column("bytes_field").bytes().max().endColumn() - .column("timestamp_field").timestamp().endColumn() - .column("date_field").date().endColumn() - .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() - .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() - .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() - .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() - .column("arr_string_field").type(Type.array(Type.string())).max().endColumn() - .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() - .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() - .column("arr_date_field").type(Type.array(Type.date())).endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .endTable() - .build(); - createAndPopulate(ddl, 10); - - // Add empty tables. - Ddl emptyTables = Ddl.builder() - .createTable("empty_one") - .column("first").string().max().endColumn() - .column("second").string().size(5).endColumn() - .column("value").int64().endColumn() - .primaryKey().asc("first").desc("second").end() - .endTable() - .createTable("empty_two") - .column("first").string().max().endColumn() - .column("second").string().size(5).endColumn() - .column("value").int64().endColumn() - .column("another_value").int64().endColumn() - .primaryKey().asc("first").end() - .endTable() - .build(); - // spotless:on - spannerServer.updateDatabase(sourceDb, emptyTables.createTableStatements()); - runTest(); - } - @Test public void emptyPgTables() throws Exception { // spotless:off @@ -393,49 +535,9 @@ public void emptyPgTables() throws Exception { .endTable() .build(); // spotless:on - spannerServer.updateDatabase(sourceDb, emptyTables.createTableStatements()); + sourceResourceManager.executeDdlStatements(emptyTables.createTableStatements()); runTest(Dialect.POSTGRESQL); } - - @Test - public void allEmptyTables() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("AllTYPES") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("bool_field").bool().endColumn() - .column("int64_field").int64().endColumn() - .column("float32_field").float32().endColumn() - .column("float64_field").float64().endColumn() - .column("string_field").string().max().endColumn() - .column("bytes_field").bytes().max().endColumn() - .column("timestamp_field").timestamp().endColumn() - .column("date_field").date().endColumn() - .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() - .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() - .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() - .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() - .column("arr_string_field").type(Type.array(Type.string())).max().endColumn() - .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() - .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() - .column("arr_date_field").type(Type.array(Type.date())).endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .endTable() - .build(); - // spotless:on - createAndPopulate(ddl, 0); - runTest(); - } - @Test public void allEmptyPgTables() throws Exception { // spotless:off @@ -508,63 +610,6 @@ public void allEmptyPgTables() throws Exception { createAndPopulate(ddl, 0); runTest(Dialect.POSTGRESQL); } - - @Test - public void databaseOptions() throws Exception { - Ddl.Builder ddlBuilder = Ddl.builder(); - // Table Content - // spotless:off - ddlBuilder.createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("EmploymentData") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("age").int64().endColumn() - .column("address").string().max().endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable(); - // spotless:on - // Allowed and well-formed database option - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("version_retention_period") - .setOptionValue("\"6d\"") - .build()); - // Disallowed database option - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("optimizer_version") - .setOptionValue("1") - .build()); - // Malformed database option - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("123version") - .setOptionValue("xyz") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder.build(); - createAndPopulate(ddl, 100); - runTest(); - Ddl destinationDdl = readDdl(destinationDb, Dialect.GOOGLE_STANDARD_SQL); - List destDbOptions = destinationDdl.setOptionsStatements(destinationDb); - assertThat(destDbOptions.size(), is(1)); - assertThat( - destDbOptions.get(0), - is( - "ALTER DATABASE `" - + destinationDb - + "` SET OPTIONS ( version_retention_period = \"6d\" )")); - } - @Test public void pgDatabaseOptions() throws Exception { Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); @@ -634,59 +679,19 @@ public void pgDatabaseOptions() throws Exception { Ddl ddl = ddlBuilder.build(); createAndPopulate(ddl, 100); runTest(Dialect.POSTGRESQL); - Ddl destinationDdl = readDdl(destinationDb, Dialect.POSTGRESQL); - List destDbOptions = destinationDdl.setOptionsStatements(destinationDb); - assertThat(destDbOptions.size(), is(1)); - assertThat( - destDbOptions.get(0), - is("ALTER DATABASE \"" + destinationDb + "\" SET spanner.version_retention_period = '6d'")); } - @Test public void emptyDb() throws Exception { Ddl ddl = Ddl.builder().build(); createAndPopulate(ddl, 0); runTest(); } - @Test public void emptyPgDb() throws Exception { Ddl ddl = Ddl.builder(Dialect.POSTGRESQL).build(); createAndPopulate(ddl, 0); runTest(Dialect.POSTGRESQL); } - - @Test - public void foreignKeys() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Ref") - .column("id1").int64().endColumn() - .column("id2").int64().endColumn() - .primaryKey().asc("id1").asc("id2").end() - .endTable() - .createTable("Child") - .column("id1").int64().endColumn() - .column("id2").int64().endColumn() - .column("id3").int64().endColumn() - .primaryKey().asc("id1").asc("id2").asc("id3").end() - .interleaveInParent("Ref") - // Add some foreign keys that are guaranteed to be satisfied due to interleaving - .foreignKeys(ImmutableList.of( - "ALTER TABLE `Child` ADD CONSTRAINT `fk1` FOREIGN KEY (`id1`) REFERENCES `Ref` (`id1`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk2` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk3` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk4` FOREIGN KEY (`id2`, `id1`) REFERENCES `Ref` (`id2`, `id1`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk5` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`) NOT ENFORCED", - "ALTER TABLE `Child` ADD CONSTRAINT `fk6` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`) ENFORCED")) - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 100); - runTest(); - } - @Test public void pgForeignKeys() throws Exception { // spotless:off @@ -739,7 +744,6 @@ public void pgForeignKeys() throws Exception { runTest(Dialect.POSTGRESQL); } - // TODO: enable this test once CHECK constraints are enabled // @Test public void checkConstraints() throws Exception { // spotless:off @@ -756,7 +760,6 @@ public void checkConstraints() throws Exception { createAndPopulate(ddl, 100); runTest(); } - @Test public void pgCheckConstraints() throws Exception { // spotless:off @@ -782,7 +785,6 @@ public void pgCheckConstraints() throws Exception { createAndPopulate(ddl, 100); runTest(Dialect.POSTGRESQL); } - @Test public void models() throws Exception { // spotless:off @@ -805,9 +807,9 @@ public void models() throws Exception { "endpoint=\"//aiplatform.googleapis.com/projects/span-cloud-testing/locations/us-central1/publishers/google/models/textembedding-gecko\"")) .inputColumn("content").type(Type.string()).size(-1).endInputColumn() .outputColumn("embeddings").type(Type.struct( - StructField.of("statistics", Type.struct(StructField.of("truncated", Type.bool()), - StructField.of("token_count", Type.float64()))), - StructField.of("values", Type.array(Type.float64())))).size(-1).endOutputColumn() + Type.StructField.of("statistics", Type.struct(Type.StructField.of("truncated", Type.bool()), + Type.StructField.of("token_count", Type.float64()))), + Type.StructField.of("values", Type.array(Type.float64())))).size(-1).endOutputColumn() .endModel() .build(); // spotless:on @@ -815,7 +817,6 @@ public void models() throws Exception { createAndPopulate(ddl, 0); runTest(); } - @Test public void changeStreams() throws Exception { Ddl ddl = @@ -855,7 +856,6 @@ public void changeStreams() throws Exception { runTest(); } - // TODO: Enable the test once change streams are supported in PG. // @Test public void pgChangeStreams() throws Exception { Ddl ddl = @@ -906,7 +906,6 @@ public void pgChangeStreams() throws Exception { createAndPopulate(ddl, 0); runTest(Dialect.POSTGRESQL); } - @Test public void identityColumn() throws Exception { // spotless:off @@ -953,7 +952,6 @@ public void identityColumn() throws Exception { createAndPopulate(ddl, 10); runTest(); } - @Test public void pgIdentityColumn() throws Exception { // spotless:off @@ -1000,7 +998,6 @@ public void pgIdentityColumn() throws Exception { createAndPopulate(ddl, 10); runTest(Dialect.POSTGRESQL); } - @Test public void commitTimestampColumns() throws Exception { // spotless:off @@ -1036,7 +1033,6 @@ public void commitTimestampColumns() throws Exception { createAndPopulate(ddl, 10); runTest(); } - @Test public void pgCommitTimestampColumns() throws Exception { // spotless:off @@ -1070,7 +1066,6 @@ public void pgCommitTimestampColumns() throws Exception { createAndPopulate(ddl, 10); runTest(); } - @Test public void udfs() throws Exception { Ddl.Builder ddlBuilder = Ddl.builder(); @@ -1105,7 +1100,6 @@ public void udfs() throws Exception { createAndPopulate(ddl, 0); runTest(); } - @Test public void sequences() throws Exception { Ddl.Builder ddlBuilder = Ddl.builder(); @@ -1160,7 +1154,6 @@ public void sequences() throws Exception { createAndPopulate(ddl, 0); runTest(); } - @Test public void pgSequences() throws Exception { Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); @@ -1211,14 +1204,6 @@ public void pgSequences() throws Exception { createAndPopulate(ddl, 0); runTest(Dialect.POSTGRESQL); } - - @Test - public void randomSchema() throws Exception { - Ddl ddl = RandomDdlGenerator.builder().build().generate(); - createAndPopulate(ddl, 100); - runTest(); - } - @Test public void randomPgSchema() throws Exception { Ddl ddl = RandomDdlGenerator.builder(Dialect.POSTGRESQL).setMaxViews(2).build().generate(); @@ -1226,78 +1211,10 @@ public void randomPgSchema() throws Exception { createAndPopulate(ddl, 100); runTest(Dialect.POSTGRESQL); } - - @Test - public void randomSchemaNoData() throws Exception { - Ddl ddl = RandomDdlGenerator.builder().build().generate(); - createAndPopulate(ddl, 0); - runTest(); - } - @Test public void randomPgSchemaNoData() throws Exception { Ddl ddl = RandomDdlGenerator.builder(Dialect.POSTGRESQL).setMaxViews(2).build().generate(); createAndPopulate(ddl, 0); runTest(Dialect.POSTGRESQL); } - - private void runTest() { - runTest(Dialect.GOOGLE_STANDARD_SQL); - } - - private void runTest(Dialect dialect) { - String tmpDirPath = tmpDir.getRoot().getAbsolutePath(); - ValueProvider.StaticValueProvider destination = - ValueProvider.StaticValueProvider.of(tmpDirPath); - ValueProvider.StaticValueProvider jobId = ValueProvider.StaticValueProvider.of("jobid"); - ValueProvider.StaticValueProvider source = - ValueProvider.StaticValueProvider.of(tmpDirPath + "/jobid"); - - SpannerConfig sourceConfig = spannerServer.getSpannerConfig(sourceDb); - exportPipeline.apply("Export", new ExportTransform(sourceConfig, destination, jobId)); - PipelineResult exportResult = exportPipeline.run(); - exportResult.waitUntilFinish(); - - SpannerConfig destConfig = spannerServer.getSpannerConfig(destinationDb); - importPipeline.apply( - "Import", - new ImportTransform( - destConfig, - source, - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(30), - ValueProvider.StaticValueProvider.of(40))); - PipelineResult importResult = importPipeline.run(); - importResult.waitUntilFinish(); - - PCollection mismatchCount = - comparePipeline.apply("Compare", new CompareDatabases(sourceConfig, destConfig)); - PAssert.that(mismatchCount) - .satisfies( - (x) -> { - assertEquals(Lists.newArrayList(x), Lists.newArrayList(0L)); - return null; - }); - PipelineResult compareResult = comparePipeline.run(); - compareResult.waitUntilFinish(); - - Ddl sourceDdl = readDdl(sourceDb, dialect); - Ddl destinationDdl = readDdl(destinationDb, dialect); - - assertThat(sourceDdl.prettyPrint(), equalToCompressingWhiteSpace(destinationDdl.prettyPrint())); - } - - /* Returns the Ddl representing a Spanner database for given a String for the database name */ - private Ddl readDdl(String db, Dialect dialect) { - DatabaseClient dbClient = spannerServer.getDbClient(db); - Ddl ddl; - try (ReadOnlyTransaction ctx = dbClient.readOnlyTransaction()) { - ddl = new InformationSchemaScanner(ctx, dialect).scan(); - } - return ddl; - } }