diff --git a/pkg/frontend/data_branch_helpers.go b/pkg/frontend/data_branch_helpers.go index bb9425b33f83a..02c5fc893737b 100644 --- a/pkg/frontend/data_branch_helpers.go +++ b/pkg/frontend/data_branch_helpers.go @@ -43,6 +43,42 @@ func containsDataBranchTempTableName(sqlLower string) bool { containsTempTableMarker(sqlLower, "__mo_diff_ins_") } +func dataBranchTempSQLNeedsBackExec(sqlLower string) bool { + if !containsDataBranchTempTableName(sqlLower) { + return false + } + + switch { + case strings.HasPrefix(sqlLower, "drop table"): + return true + case strings.HasPrefix(sqlLower, "create table"): + return true + case strings.HasPrefix(sqlLower, "truncate table"): + return true + case strings.HasPrefix(sqlLower, "insert into "): + return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("insert into "):])) + case strings.HasPrefix(sqlLower, "replace into "): + return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("replace into "):])) + case strings.HasPrefix(sqlLower, "delete from "): + return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("delete from "):])) + case strings.HasPrefix(sqlLower, "update "): + return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("update "):])) + default: + return true + } +} + +func firstSQLTableToken(sqlLower string) string { + sqlLower = strings.TrimSpace(sqlLower) + for i := 0; i < len(sqlLower); i++ { + switch sqlLower[i] { + case ' ', '\t', '\n', '\r', '(': + return sqlLower[:i] + } + } + return sqlLower +} + func containsTempTableMarker(sqlLower, marker string) bool { searchFrom := 0 for { @@ -125,7 +161,7 @@ func runSql( if strings.HasPrefix(trimmedLower, "drop database") { // Internal executor does not support DROP DATABASE (IsPublishing panics). useBackExec = true - } else if containsDataBranchTempTableName(trimmedLower) { + } else if dataBranchTempSQLNeedsBackExec(trimmedLower) { // Branch diff/merge/pick temp tables do repeated DDL/DML in one shared txn. // The internal SQL fast path skips per-statement workspace increments and can // hit ErrTxnNeedRetryWithDefChanged in RC mode while these temp definitions churn. diff --git a/pkg/frontend/data_branch_helpers_test.go b/pkg/frontend/data_branch_helpers_test.go index e25e388ce3ef0..0e2e3489ca464 100644 --- a/pkg/frontend/data_branch_helpers_test.go +++ b/pkg/frontend/data_branch_helpers_test.go @@ -17,6 +17,7 @@ package frontend import ( "bytes" "context" + "strings" "sync" "testing" @@ -83,6 +84,61 @@ func TestContainsDataBranchTempTableName(t *testing.T) { require.False(t, containsDataBranchTempTableName("select '__mo_diff_del_merge_1'")) } +func TestDataBranchTempSQLNeedsBackExec(t *testing.T) { + tests := []struct { + name string + sql string + want bool + }{ + { + name: "drop temp table", + sql: "drop table if exists test.__mo_diff_del_merge_1", + want: true, + }, + { + name: "create temp table", + sql: "create table test.__mo_diff_ins_merge_1 as select id from test.t where 1 = 0", + want: true, + }, + { + name: "insert into temp table", + sql: "insert into test.__mo_diff_del_merge_1 values (1)", + want: true, + }, + { + name: "delete from temp table", + sql: "delete from test.__mo_diff_ins_merge_1", + want: true, + }, + { + name: "main table delete reads temp table", + sql: "delete from test.orders where id in (select id from test.__mo_diff_del_merge_1)", + want: false, + }, + { + name: "main table insert reads temp table", + sql: "insert into test.orders (id, name) select id, name from test.__mo_diff_ins_merge_1", + want: false, + }, + { + name: "unknown temp table statement stays conservative", + sql: "select * from test.__mo_diff_ins_merge_1", + want: true, + }, + { + name: "ordinary statement", + sql: "delete from test.orders where id = 1", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.want, dataBranchTempSQLNeedsBackExec(strings.ToLower(tt.sql))) + }) + } +} + func TestRunSQL_BackgroundExecPaths(t *testing.T) { ses := newValidateSession(t) @@ -137,6 +193,35 @@ func TestRunSQL_DataBranchTempTablesUseBackgroundExec(t *testing.T) { require.Empty(t, spyExec.sql) } +func TestRunSQL_DataBranchMainTableDMLUsesInternalExec(t *testing.T) { + tests := []struct { + name string + sql string + }{ + { + name: "delete main table using diff delete table", + sql: "delete from test.orders where id in (select id from test.__mo_diff_del_merge_1)", + }, + { + name: "insert main table using diff insert table", + sql: "insert into test.orders (id, name) select id, name from test.__mo_diff_ins_merge_1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ses := newValidateSession(t) + spyExec := &pickStreamingExecutor{} + bh := newPickStreamingBackExecForTest(t, ses, spyExec) + + ret, err := runSql(context.Background(), ses, bh, tt.sql, nil, nil) + require.NoError(t, err) + ret.Close() + require.Equal(t, tt.sql, spyExec.sql) + }) + } +} + func TestScanSnapshotRelationByID_EarlyAndErrorPaths(t *testing.T) { ses := newValidateSession(t) diff --git a/pkg/frontend/data_branch_pick.go b/pkg/frontend/data_branch_pick.go index b366f2af8f12e..ea381b87f24d1 100644 --- a/pkg/frontend/data_branch_pick.go +++ b/pkg/frontend/data_branch_pick.go @@ -816,7 +816,7 @@ func materializeSubqueryUnified( // Compose the subquery SQL: wrap the user's SELECT with ORDER BY for // streaming sorted results. For composite PKs we ORDER BY all component // columns so that serial()-encoded keys arrive in ascending byte order. - fmtCtx := tree.NewFmtCtx(dialect.MYSQL) + fmtCtx := tree.NewFmtCtx(dialect.MYSQL, tree.WithSingleQuoteString()) stmt.Keys.Select.Format(fmtCtx) subquerySQL := fmtCtx.String() diff --git a/pkg/frontend/data_branch_pick_test.go b/pkg/frontend/data_branch_pick_test.go index 5ccc752026bc2..dc310b75317ff 100644 --- a/pkg/frontend/data_branch_pick_test.go +++ b/pkg/frontend/data_branch_pick_test.go @@ -1179,6 +1179,71 @@ func TestMaterializeSubqueryUnified_SinglePKSuccessBuildsFilterAndHashmap(t *tes } } +func TestMaterializeSubqueryUnified_PreservesStringLiteralQuotes(t *testing.T) { + ses := newValidateSession(t) + stmtNode, err := parsers.ParseOne( + context.Background(), + dialect.MYSQL, + "data branch pick orders_fix into orders keys(select order_id from orders_fix where customer = 'Grace') when conflict accept", + 1, + ) + require.NoError(t, err) + + stmt, ok := stmtNode.(*tree.DataBranchPick) + require.True(t, ok) + + oldBuildPlanWithAuthorization := buildPlanWithAuthorization + defer func() { + buildPlanWithAuthorization = oldBuildPlanWithAuthorization + }() + buildPlanWithAuthorization = func( + reqCtx context.Context, + ses FeSession, + ctx plan2.CompilerContext, + stmt tree.Statement, + ) (*plan2.Plan, error) { + return nil, nil + } + + tblStuff := tableStuff{} + tblStuff.def.colTypes = []types.Type{types.T_int64.ToType()} + tblStuff.def.pkColIdx = 0 + tblStuff.def.pkColIdxes = []int{0} + + hm, err := databranchutils.NewBranchHashmap(databranchutils.WithBranchHashmapShardCount(1)) + require.NoError(t, err) + defer func() { + require.NoError(t, hm.Close()) + }() + + exec := &pickStreamingExecutor{ + result: executor.Result{ + Batches: []*batch.Batch{buildPickStreamingBatch( + t, + ses.proc.Mp(), + []types.Type{types.T_int64.ToType()}, + nil, + )}, + Mp: ses.proc.Mp(), + }, + } + bh := newPickStreamingBackExecForTest(t, ses, exec) + + pkFilter, err := materializeSubqueryUnified( + context.Background(), + ses, + bh, + stmt, + tblStuff, + false, + hm, + ) + require.NoError(t, err) + require.Nil(t, pkFilter) + require.Contains(t, exec.sql, "customer = 'Grace'") + require.NotContains(t, exec.sql, "customer = Grace") +} + func TestMaterializeSubqueryUnified_CompositePKOrdersAllColumns(t *testing.T) { ses := newValidateSession(t) stmtNode, err := parsers.ParseOne( diff --git a/test/distributed/cases/git4data/branch/pick/pick_5.result b/test/distributed/cases/git4data/branch/pick/pick_5.result index e8d5c8cbd8b36..9837ee059c8a3 100644 --- a/test/distributed/cases/git4data/branch/pick/pick_5.result +++ b/test/distributed/cases/git4data/branch/pick/pick_5.result @@ -46,6 +46,23 @@ select * from t1 order by a asc; drop table pick_keys; drop table t1; drop table t2; +create table orders (order_id int primary key, customer varchar(20), amount int); +insert into orders values (1,'Alice',10),(2,'Bob',20),(3,'Carol',30); +data branch create table orders_fix from orders; +insert into orders_fix values (4,'Grace',40),(5,'Heidi',50),(6,'Grace',60); +data branch pick orders_fix into orders keys(select order_id from orders_fix where customer = 'Grace') when conflict accept; +select * from orders order by order_id asc; +➤ order_id[4,32,0] ¦ customer[12,-1,0] ¦ amount[4,32,0] 𝄀 +1 ¦ Alice ¦ 10 𝄀 +2 ¦ Bob ¦ 20 𝄀 +3 ¦ Carol ¦ 30 𝄀 +4 ¦ Grace ¦ 40 𝄀 +6 ¦ Grace ¦ 60 +data branch diff orders_fix against orders; +➤ diff orders_fix against orders[12,0,0] ¦ flag[12,0,0] ¦ order_id[4,0,0] ¦ customer[12,0,0] ¦ amount[4,0,0] 𝄀 +orders_fix ¦ INSERT ¦ 5 ¦ Heidi ¦ 50 +drop table orders; +drop table orders_fix; create table t0 (a int, b int, primary key(a)); insert into t0 values (1,1),(2,2),(3,3); data branch create table t1 from t0; diff --git a/test/distributed/cases/git4data/branch/pick/pick_5.sql b/test/distributed/cases/git4data/branch/pick/pick_5.sql index b29909469a20c..c27e8b0e672c1 100644 --- a/test/distributed/cases/git4data/branch/pick/pick_5.sql +++ b/test/distributed/cases/git4data/branch/pick/pick_5.sql @@ -49,7 +49,26 @@ drop table t1; drop table t2; -- ---------------------------------------------------------------- --- case 3: empty subquery result — should be no-op +-- case 3: subquery with string literal predicate +-- ---------------------------------------------------------------- + +create table orders (order_id int primary key, customer varchar(20), amount int); +insert into orders values (1,'Alice',10),(2,'Bob',20),(3,'Carol',30); + +data branch create table orders_fix from orders; +insert into orders_fix values (4,'Grace',40),(5,'Heidi',50),(6,'Grace',60); + +data branch pick orders_fix into orders keys(select order_id from orders_fix where customer = 'Grace') when conflict accept; +select * from orders order by order_id asc; + +-- verify: non-Grace row is still in diff +data branch diff orders_fix against orders; + +drop table orders; +drop table orders_fix; + +-- ---------------------------------------------------------------- +-- case 4: empty subquery result — should be no-op -- ---------------------------------------------------------------- create table t0 (a int, b int, primary key(a)); @@ -70,7 +89,7 @@ drop table t1; drop table t2; -- ---------------------------------------------------------------- --- case 4: large subquery — pick 25 out of 100 new rows +-- case 5: large subquery — pick 25 out of 100 new rows -- ---------------------------------------------------------------- create table t1 (a int, b varchar(20), primary key(a)); @@ -91,7 +110,7 @@ drop table t1; drop table t2; -- ---------------------------------------------------------------- --- case 5: subquery with DISTINCT and ORDER BY +-- case 6: subquery with DISTINCT and ORDER BY -- ---------------------------------------------------------------- create table t0 (a int, b int, primary key(a)); @@ -116,7 +135,7 @@ drop table t1; drop table t2; -- ---------------------------------------------------------------- --- case 6: subquery keys must not contain NULL +-- case 7: subquery keys must not contain NULL -- ---------------------------------------------------------------- create table t1 (a int, b int, primary key(a));