diff --git a/entitybatch/batch.go b/entitybatch/batch.go new file mode 100644 index 00000000..1099686f --- /dev/null +++ b/entitybatch/batch.go @@ -0,0 +1,171 @@ +package db + +import ( + "github.com/scylladb/gocqlx/v2/qb" + "github.com/scylladb/gocqlx/v2/table" + "strconv" + "time" +) + +// The OperationName should be literal the public db Operation function name, such as "Create", "Read", "Update", "Delete". +// For those special operation(such as delte&create, please use the tool functions, which begins with Generate* in this package. +type Operation struct { + Stmt string + Names []string +} + +const ( + CREATE = "Create" + UPDATE = "Update" + DELETE = "Delete" + READ = "Read" + FINDBYPARTKEY = "FindByPartKey_" + FINDBYPARTKEYSANDSORTKEYS = "FindByPartKeyAndSortKey_" +) + +type SingleEntityBatch struct { + tables []*table.Table + PrebuiltOperations map[string]Operation +} + +func (this *SingleEntityBatch) SetupTables(table_metadatas []table.Metadata) { + this.tables = make([]*table.Table, len(table_metadatas)) + for i, tm := range table_metadatas { + this.tables[i] = table.New(table.Metadata{ + Name: tm.Name, + Columns: tm.Columns, + PartKey: tm.PartKey, + SortKey: tm.SortKey, + }) + } +} + +// Prebuild batch operations: +// Create, Update, Delete +// Read the entity by primary key columns from the BASE table. +func (this *SingleEntityBatch) PreBuildOperations() { + this.PrebuiltOperations = make(map[string]Operation) + + // Create entities + { + batchBuilder := qb.Batch() + for _, t := range this.tables { + stmt, names := t.Insert() + batchBuilder.AddStmtWithPrefix(t.Name(), stmt, names) + } + stmt, names := batchBuilder.ToCql() + this.PrebuiltOperations[CREATE] = Operation{stmt, names} + } + + // Update entities + { + batchBuilder := qb.Batch() + for _, t := range this.tables { + stmt, names := t.Update(getFiledsForUpdateBatch(t)...) + batchBuilder.AddStmtWithPrefix(t.Metadata().Name, stmt, names) + } + stmt, names := batchBuilder.ToCql() + this.PrebuiltOperations[UPDATE] = Operation{stmt, names} + } + + // Delete entities + { + batchBuilder := qb.Batch() + for _, t := range this.tables { + stmt, names := t.Delete() + batchBuilder.AddStmtWithPrefix(t.Metadata().Name, stmt, names) + } + stmt, names := batchBuilder.ToCql() + this.PrebuiltOperations[DELETE] = Operation{stmt, names} + } + + // Read the entity by primary key columns from the base table. + { + t := this.tables[0] + stmt, names := t.Get(t.Metadata().Columns...) + this.PrebuiltOperations[READ] = Operation{stmt, names} + } + + //Find entities from the find_by tables by partition keys. + { + for _, t := range this.tables { + stmt, names := t.Select(t.Metadata().Columns...) + this.PrebuiltOperations[GenerateFindByPartKeyOperationName(t.Metadata().Name)] = Operation{stmt, names} + } + } + + //Find entities from the find_by tables by partition keys and some of the sort keys. + { + for _, t := range this.tables { + for i := 0; i < len(t.Metadata().SortKey); i++ { + stmt, names := qb.Select(t.Metadata().Name). + Columns(t.Metadata().Columns...). + Where(t.PrimaryKeyCmp()[0 : i+1]...). + ToCql() + this.PrebuiltOperations[GenerateFindByPartKeysAndSortKeysOperationName(t.Metadata().Name, i)] = Operation{stmt, names} + } + } + } +} + +// Filters out search tables' primary keys because they cannot appear in the update batch's cql statements. +func getFiledsForUpdateBatch(t *table.Table) []string { + columns := t.Metadata().Columns + // primary columns (partition columns and sort columns) are not updatable. + primarykeys := make(map[string]struct{}) + for _, partKey := range t.Metadata().PartKey { + primarykeys[partKey] = struct{}{} + } + + for _, sortkey := range t.Metadata().SortKey { + primarykeys[sortkey] = struct{}{} + } + + var result []string + for _, col := range columns { + if _, exist := primarykeys[col]; !exist { + result = append(result, col) + } + } + return result +} + +func (this *SingleEntityBatch) Tables() []*table.Table { + return this.tables +} + + +// In order to update the partition key of the "find_by" tables, we need to delete the existing entity and create a new one in a batch cql request. +// The execution order of the statements in a batch doesn't depends on their order in the batch cql statements. +// So we need to add time stamps to the statements in the batch cql. +// DeleteAndCreateOperation CANNOT be pre-built because we need to use the latest time stamp. +func (this *SingleEntityBatch) GenerateDeleteAndCreateOperation() Operation { + // deleteAndCreate + { + batchBuilder := qb.Batch() + + // Delete the existing product + // We need the timestamp to order the cql statements execution in the batch. + for _, t := range this.tables { + primaryKeyCmp := t.PrimaryKeyCmp() + stmt, names := qb.Delete(t.Metadata().Name).Columns().Where(primaryKeyCmp...).Timestamp(time.Now()).ToCql() + batchBuilder.AddStmtWithPrefix(t.Metadata().Name+"_old", stmt, names) + } + + // Create the updated(new) product + for _, t := range this.tables { + stmt, names := qb.Insert(t.Metadata().Name).Columns(t.Metadata().Columns...).Timestamp(time.Now()).ToCql() + batchBuilder.AddStmtWithPrefix(t.Metadata().Name+"_new", stmt, names) + } + stmt, names := batchBuilder.ToCql() + return Operation{stmt, names} + } +} + +func GenerateFindByPartKeyOperationName(tableName string) string { + return FINDBYPARTKEY + tableName +} + +func GenerateFindByPartKeysAndSortKeysOperationName(tableName string, numSortCols int) string { + return FINDBYPARTKEYSANDSORTKEYS + tableName + "_" + strconv.Itoa(numSortCols) +} \ No newline at end of file diff --git a/entitybatch/batch_test.go b/entitybatch/batch_test.go new file mode 100644 index 00000000..5e5d0762 --- /dev/null +++ b/entitybatch/batch_test.go @@ -0,0 +1,230 @@ +package db + +import ( + "github.com/scylladb/gocqlx/v2/table" + "reflect" + "strconv" + "testing" +) + +// Base table is people(primary key: id) +// Search table is people_by_age(primary key: age, id) +type people struct { + Id string + FirstName string + LastName string + Age int +} + +const ( + base_table = "people" + search_table = "people_by_age" +) + +var ( + singleEntityBatch = &SingleEntityBatch{} +) + +func init() { + var metadatas []table.Metadata + metadatas = append(metadatas, table.Metadata{ + Name: base_table, + Columns: []string{"id", "first_name", "last_name", "age"}, + PartKey: []string{"id"}, + SortKey: []string{"first_name", "age"}, + }) + metadatas = append(metadatas, table.Metadata{ + Name: search_table, + Columns: []string{"id", "first_name", "last_name", "age"}, + PartKey: []string{"age"}, + SortKey: []string{"id", "first_name"}, + }) + singleEntityBatch.SetupTables(metadatas) +} + +func TestGenerateFindByPartKeyOperationName(t *testing.T) { + type args struct { + tableName string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "base_table", + args: args{ + tableName: base_table, + }, + want: FINDBYPARTKEY + base_table, + }, + { + name: "search_table", + args: args{ + tableName: search_table, + }, + want: FINDBYPARTKEY + search_table, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GenerateFindByPartKeyOperationName(tt.args.tableName); got != tt.want { + t.Errorf("GenerateFindByPartKeyOperationName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGenerateFindByPrimaryKeysOperationName(t *testing.T) { + type args struct { + tableName string + numSortCols int + } + tests := []struct { + name string + args args + want string + }{ + { + name: "base_table_0", + args: args{ + tableName: base_table, + numSortCols: 0, + }, + want: FINDBYPARTKEYSANDSORTKEYS + base_table + "_" + strconv.Itoa(0), + }, + { + name: "base_table_1", + args: args{ + tableName: base_table, + numSortCols: 1, + }, + want: FINDBYPARTKEYSANDSORTKEYS + base_table + "_" + strconv.Itoa(1), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GenerateFindByPartKeysAndSortKeysOperationName(tt.args.tableName, tt.args.numSortCols); got != tt.want { + t.Errorf("GenerateFindByPartKeysAndSortKeysOperationName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSingleEntityBatch_GenerateDeleteAndCreateOperation(t *testing.T) { + expected := Operation{ + Stmt: "BEGIN BATCH DELETE FROM people USING TIMESTAMP 1592734743192106 WHERE id=? AND first_name=? AND age=? ; DELETE FROM people_by_age USING TIMESTAMP 1592734743192115 WHERE age=? AND id=? AND first_name=? ; INSERT INTO people (id,first_name,last_name,age) VALUES (?,?,?,?) USING TIMESTAMP 1592734743192119 ; INSERT INTO people_by_age (id,first_name,last_name,age) VALUES (?,?,?,?) USING TIMESTAMP 1592734743192125 ; APPLY BATCH ", + Names: []string{"people_old.id", "people_old.first_name", "people_old.age", "people_by_age_old.age", "people_by_age_old.id", "people_by_age_old.first_name", "people_new.id", "people_new.first_name", "people_new.last_name", "people_new.age", "people_by_age_new.id", "people_by_age_new.first_name", "people_by_age_new.last_name", "people_by_age_new.age"}, + } + got := singleEntityBatch.GenerateDeleteAndCreateOperation() + + // TODO: Pass the two time stamps as parameters into make SingleEntityBatch.GenerateDeleteAndCreateOperation testable. + // The stmt contains the latest time stamp, so it cannot be tested for now. + // Stmt looks like: + // BEGIN BATCH DELETE FROM people USING TIMESTAMP 1592735095355312 WHERE id=? AND first_name=? AND age=? ; DELETE FROM people_by_age USING TIMESTAMP 1592735095355321 WHERE age=? AND id=? AND first_name=? ; INSERT INTO people (id,first_name,last_name,age) VALUES (?,?,?,?) USING TIMESTAMP 1592735095355325 ; INSERT INTO people_by_age (id,first_name,last_name,age) VALUES (?,?,?,?) USING TIMESTAMP 1592735095355331 ; APPLY BATCH + //if got.Stmt != expected.Stmt { + // t.Errorf("\n expected Stmt: %s, \n but got Stmt: %s", expected.Stmt, got.Stmt) + //} + + if !reflect.DeepEqual(got.Names, expected.Names) { + t.Errorf("expected Names: %s, got Names: %s", expected.Names, got.Names) + } +} + +func TestSingleEntityBatch_PreBuildOperations(t *testing.T) { + expected := make(map[string]Operation) + + expected["Create"] = Operation{ + Stmt: "BEGIN BATCH INSERT INTO people (id,first_name,last_name,age) VALUES (?,?,?,?) ; INSERT INTO people_by_age (id,first_name,last_name,age) VALUES (?,?,?,?) ; APPLY BATCH ", + Names: []string{"people.id", "people.first_name", "people.last_name", "people.age", "people_by_age.id", "people_by_age.first_name", "people_by_age.last_name", "people_by_age.age"}, + } + + expected["Read"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people WHERE id=? AND first_name=? AND age=? ", + Names: []string{"id", "first_name", "age"}, + } + + expected["Update"] = Operation{ + Stmt: "BEGIN BATCH UPDATE people SET last_name=? WHERE id=? AND first_name=? AND age=? ; UPDATE people_by_age SET last_name=? WHERE age=? AND id=? AND first_name=? ; APPLY BATCH ", + Names: []string{"people.last_name", "people.id", "people.first_name", "people.age", "people_by_age.last_name", "people_by_age.age", "people_by_age.id", "people_by_age.first_name"}, + } + + expected["Delete"] = Operation{ + Stmt: "BEGIN BATCH DELETE FROM people WHERE id=? AND first_name=? AND age=? ; DELETE FROM people_by_age WHERE age=? AND id=? AND first_name=? ; APPLY BATCH ", + Names: []string{"people.id", "people.first_name", "people.age", "people_by_age.age", "people_by_age.id", "people_by_age.first_name"}, + } + + expected["FindByPartKeyAndSortKey_people_0"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people WHERE id=? ", + Names: []string{"id"}, + } + + expected["FindByPartKeyAndSortKey_people_1"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people WHERE id=? AND first_name=? ", + Names: []string{"id", "first_name"}, + } + + expected["FindByPartKeyAndSortKey_people_by_age_0"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people_by_age WHERE age=? ", + Names: []string{"age"}, + } + + expected["FindByPartKeyAndSortKey_people_by_age_1"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people_by_age WHERE age=? AND id=? ", + Names: []string{"age", "id"}, + } + + expected["FindByPartKey_people"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people WHERE id=? ", + Names: []string{"id"}, + } + + expected["FindByPartKey_people_by_age"] = Operation{ + Stmt: "SELECT id,first_name,last_name,age FROM people_by_age WHERE age=? ", + Names: []string{"age"}, + } + singleEntityBatch.PreBuildOperations() + + for k, v := range singleEntityBatch.PrebuiltOperations { + if !reflect.DeepEqual(expected[k].Names, v.Names) { + t.Errorf("\n expected: %+v, \n but got: %+v", expected[k].Names, v.Names) + } + + if expected[k].Stmt != v.Stmt { + t.Errorf("\n expected: %+v \n but got: %+v", expected[k].Stmt, v.Stmt) + } + } +} + +func Test_getFiledsForUpdateBatch(t *testing.T) { + type args struct { + t *table.Table + } + tests := []struct { + name string + args args + want []string + }{ + { + name: "base_table", + args: args{ + t: singleEntityBatch.tables[0], + }, + want: []string{"last_name"}, + }, + { + name: "search_table", + args: args{ + t: singleEntityBatch.tables[1], + }, + want: []string{"last_name"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getFiledsForUpdateBatch(tt.args.t); !reflect.DeepEqual(got, tt.want) { + t.Errorf("getFiledsForUpdateBatch() = %v, want %v", got, tt.want) + } + }) + } +} \ No newline at end of file