Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ protected FailsafeElement<String, String> mutationToDlqElement(Mutation m) {
case BYTES:
val = Hex.encodeHexString(value.getBytes().toByteArray());
break;
case UUID:
val = value.getUuid().toString();
break;
case INT64:
val = value.getInt64();
break;
Expand All @@ -301,6 +304,11 @@ protected FailsafeElement<String, String> mutationToDlqElement(Mutation m) {
value.getBytesArray().stream()
.map(v -> v == null ? null : Hex.encodeHexString(v.toByteArray()))
.collect(Collectors.toList());
} else if (value.getType().getArrayElementType().getCode() == Type.Code.UUID) {
val =
value.getUuidArray().stream()
.map(v -> v == null ? null : v.toString())
.collect(Collectors.toList());
} else {
val = value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,102 @@ public void testMutationToDlqElement_WithExplicitShardIdInMutationMap() {
.contains("\"_metadata_shard_id_column_name\":\"migration_shard_id\"");
}

@Test
public void testMutationToDlqElement_UuidType() {
Ddl ddlWithUuid =
Ddl.builder()
.createTable("uuid_table")
.column("id")
.type(com.google.cloud.teleport.v2.spanner.type.Type.uuid())
.endColumn()
.column("uuid_array")
.type(
com.google.cloud.teleport.v2.spanner.type.Type.array(
com.google.cloud.teleport.v2.spanner.type.Type.uuid()))
.endColumn()
.primaryKey()
.asc("id")
.end()
.endTable()
.build();

DeadLetterQueue dlq =
DeadLetterQueue.create(
"LOG", ddlWithUuid, SQLDialect.MYSQL, getIdentityMapper(ddlWithUuid));

java.util.UUID idUuid = java.util.UUID.randomUUID();
java.util.UUID elementUuid1 = java.util.UUID.randomUUID();
java.util.UUID elementUuid2 = java.util.UUID.randomUUID();

Mutation m =
Mutation.newInsertBuilder("uuid_table")
.set("id")
.to(Value.uuid(idUuid))
.set("uuid_array")
.to(Value.uuidArray(java.util.Arrays.asList(elementUuid1, elementUuid2)))
.build();

FailsafeElement<String, String> dlqElement = dlq.mutationToDlqElement(m);
String payload = dlqElement.getPayload();

assertThat(payload).contains("\"id\":\"" + idUuid.toString() + "\"");
assertThat(payload)
.contains(
"\"uuid_array\":[\""
+ elementUuid1.toString()
+ "\",\""
+ elementUuid2.toString()
+ "\"]");
}

@Test
public void testMutationToDlqElement_PgUuidType() {
Ddl ddlWithPgUuid =
Ddl.builder(com.google.cloud.spanner.Dialect.POSTGRESQL)
.createTable("new_cart")
.column("new_user_id")
.type(com.google.cloud.teleport.v2.spanner.type.Type.pgUuid())
.endColumn()
.column("uuid_array")
.type(
com.google.cloud.teleport.v2.spanner.type.Type.pgArray(
com.google.cloud.teleport.v2.spanner.type.Type.pgUuid()))
.endColumn()
.primaryKey()
.asc("new_user_id")
.end()
.endTable()
.build();

DeadLetterQueue dlq =
DeadLetterQueue.create(
"LOG", ddlWithPgUuid, SQLDialect.MYSQL, getIdentityMapper(ddlWithPgUuid));

java.util.UUID idUuid = java.util.UUID.randomUUID();
java.util.UUID elementUuid1 = java.util.UUID.randomUUID();
java.util.UUID elementUuid2 = java.util.UUID.randomUUID();

Mutation m =
Mutation.newInsertBuilder("new_cart")
.set("new_user_id")
.to(Value.uuid(idUuid))
.set("uuid_array")
.to(Value.uuidArray(java.util.Arrays.asList(elementUuid1, elementUuid2)))
.build();

FailsafeElement<String, String> dlqElement = dlq.mutationToDlqElement(m);
String payload = dlqElement.getPayload();

assertThat(payload).contains("\"new_user_id\":\"" + idUuid.toString() + "\"");
assertThat(payload)
.contains(
"\"uuid_array\":[\""
+ elementUuid1.toString()
+ "\",\""
+ elementUuid2.toString()
+ "\"]");
}

private static ISchemaMapper getIdentityMapper(Ddl spannerDdl) {
return new IdentityMapper(spannerDdl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ private static String typeString(Type type, Integer size) {
return Type.Code.JSON.getName();
case PG_JSONB:
return Type.Code.PG_JSONB.getName();
case UUID:
return Type.Code.UUID.getName();
case PG_UUID:
return Type.Code.PG_UUID.getName();
case TOKENLIST:
return Type.Code.TOKENLIST.getName();
case ARRAY:
Expand Down Expand Up @@ -258,6 +262,10 @@ public Builder date() {
return type(Type.date());
}

public Builder uuid() {
return type(Type.uuid());
}

public Builder numeric() {
return type(Type.numeric());
}
Expand Down Expand Up @@ -298,6 +306,10 @@ public Builder pgDate() {
return type(Type.pgDate());
}

public Builder pgUuid() {
return type(Type.pgUuid());
}

public Builder pgNumeric() {
return type(Type.pgNumeric());
}
Expand Down Expand Up @@ -374,6 +386,9 @@ private static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals(Type.Code.DATE.getName())) {
return t(Type.date(), null);
}
if (spannerType.equals(Type.Code.UUID.getName())) {
return t(Type.uuid(), null);
}
if (spannerType.equals(Type.Code.NUMERIC.getName())) {
return t(Type.numeric(), null);
}
Expand Down Expand Up @@ -438,6 +453,9 @@ private static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals(Type.Code.PG_DATE.getName())) {
return t(Type.pgDate(), null);
}
if (spannerType.equals(Type.Code.PG_UUID.getName())) {
return t(Type.pgUuid(), null);
}
if (spannerType.equals(Type.Code.PG_COMMIT_TIMESTAMP.getName())) {
return t(Type.pgCommitTimestamp(), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ static Map<Type, AvroToValueFunction> getGsqlMap() {
avroArrayFieldToSpannerArray(
recordValue, fieldSchema, AvroToValueMapper::avroFieldToDate)));

gsqlFunctions.put(
Type.uuid(),
(recordValue, fieldSchema) -> Value.uuid(avroFieldToUuid(recordValue, fieldSchema)));
gsqlFunctions.put(
Type.array(Type.uuid()),
(recordValue, fieldSchema) ->
Value.uuidArray(
avroArrayFieldToSpannerArray(
recordValue, fieldSchema, AvroToValueMapper::avroFieldToUuid)));

return gsqlFunctions;
}

Expand Down Expand Up @@ -242,13 +252,63 @@ static Map<Type, AvroToValueFunction> getPgMap() {
pgFunctions.put(
Type.pgDate(),
(recordValue, fieldSchema) -> Value.date(avroFieldToDate(recordValue, fieldSchema)));
pgFunctions.put(
Type.pgUuid(),
(recordValue, fieldSchema) -> Value.uuid(avroFieldToUuid(recordValue, fieldSchema)));
pgFunctions.put(
Type.pgArray(Type.pgUuid()),
(recordValue, fieldSchema) ->
Value.uuidArray(
avroArrayFieldToSpannerArray(
recordValue, fieldSchema, AvroToValueMapper::avroFieldToUuid)));

return pgFunctions;
}

/**
* This method tries to map different kinds of source types to a boolean. This could be longs,
* string as well as booleans.
*/
static java.util.UUID avroFieldToUuid(Object recordValue, Schema fieldSchema) {
try {
if (recordValue == null) {
return null;
}
if (recordValue instanceof java.util.UUID) {
return (java.util.UUID) recordValue;
}
if (recordValue instanceof ByteBuffer) {
byte[] bytes = ((ByteBuffer) recordValue).array();
if (bytes.length == 16) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
return new java.util.UUID(bb.getLong(), bb.getLong());
} else {
throw new IllegalArgumentException(
"ByteBuffer array length must be exactly 16, but was: " + bytes.length);
}
}
if (recordValue instanceof byte[]) {
byte[] bytes = (byte[]) recordValue;
if (bytes.length == 16) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
return new java.util.UUID(bb.getLong(), bb.getLong());
} else {
throw new IllegalArgumentException(
"Byte array length must be exactly 16, but was: " + bytes.length);
}
}
return java.util.UUID.fromString(recordValue.toString());
} catch (Exception e) {
throw new AvroTypeConvertorException(
"Unable to convert "
+ fieldSchema.getType()
+ " to UUID, with value: "
+ recordValue
+ ", Exception: "
+ e.getMessage());
}
}

static Boolean avroFieldToBoolean(Object recordValue, Schema fieldSchema) {
if (recordValue == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public Map<String, Value> transformChangeEvent(GenericRecord record, String srcT
}
// If current column is migration shard id, populate value.
if (spannerColName.equals(shardIdCol)) {
result = populateShardId(result, shardIdCol);
result = populateShardId(result, shardIdCol, spannerTableName);
continue;
}

Expand All @@ -167,7 +167,14 @@ public Map<String, Value> transformChangeEvent(GenericRecord record, String srcT
// the schemaMapper returns null.
if (spannerColName.equals(
schemaMapper.getSyntheticPrimaryKeyColName(namespace, spannerTableName))) {
result.put(spannerColName, Value.string(getUUID()));
Type spannerColumnType =
schemaMapper.getSpannerColumnType(namespace, spannerTableName, spannerColName);
if (spannerColumnType.getCode() == Type.Code.UUID
|| spannerColumnType.getCode() == Type.Code.PG_UUID) {
result.put(spannerColName, Value.uuid(java.util.UUID.randomUUID()));
} else {
result.put(spannerColName, Value.string(getUUID()));
}
continue;
}

Expand Down Expand Up @@ -432,11 +439,17 @@ private MigrationTransformationResponse getCustomTransformationResponse(
return migrationTransformationResponse;
}

private Map<String, Value> populateShardId(Map<String, Value> result, String shardIdCol) {
private Map<String, Value> populateShardId(
Map<String, Value> result, String shardIdCol, String tableName) {
if (shardId == null || shardId.isBlank()) {
return result;
}
result.put(shardIdCol, Value.string(shardId));
Type spannerType = schemaMapper.getSpannerColumnType(namespace, tableName, shardIdCol);
if (spannerType.getCode() == Type.Code.UUID || spannerType.getCode() == Type.Code.PG_UUID) {
result.put(shardIdCol, Value.uuid(java.util.UUID.fromString(shardId)));
} else {
result.put(shardIdCol, Value.string(shardId));
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public static com.google.cloud.spanner.Key changeEventToPrimaryKey(
ChangeEventTypeConvertor.toString(
changeEvent, keyColName, /* requiredField= */ true));
break;
case UUID:
case PG_UUID:
pk.appendObject(
ChangeEventTypeConvertor.toUuid(
changeEvent, keyColName, /* requiredField= */ true));
break;
case NUMERIC:
case PG_NUMERIC:
pk.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,32 @@ public static Value toValue(
case ARRAY:
// TODO(b/422928714): Add support for Array types.
return null;
case UUID:
case PG_UUID:
return Value.uuid(toUuid(changeEvent, key, requiredField));
// TODO(b/179070999) - Add support for other data types.
default:
throw new IllegalArgumentException(
"Column name(" + key + ") has unsupported column type(" + columnType.getCode() + ")");
}
}

public static java.util.UUID toUuid(JsonNode changeEvent, String key, boolean requiredField)
throws ChangeEventConvertorException {
if (!containsValue(changeEvent, key, requiredField)) {
return null;
}
try {
String uuidStr = changeEvent.get(key).asText();
if (uuidStr.equalsIgnoreCase("NULL") || uuidStr.isEmpty()) {
return null;
}
return java.util.UUID.fromString(uuidStr);
} catch (Exception e) {
throw new ChangeEventConvertorException("Unable to convert field " + key + " to UUID ", e);
}
}

public static Boolean toBoolean(JsonNode changeEvent, String key, boolean requiredField)
throws ChangeEventConvertorException {

Expand Down
Loading
Loading