Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion pkg/frontend/data_branch_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ func containsDataBranchTempTableName(sqlLower string) bool {
containsTempTableMarker(sqlLower, "__mo_diff_ins_")
}

func dataBranchTempSQLNeedsBackExec(sqlLower string) bool {
if !containsDataBranchTempTableName(sqlLower) {
return false
}

switch {
case strings.HasPrefix(sqlLower, "drop table"):
return true
case strings.HasPrefix(sqlLower, "create table"):
return true
case strings.HasPrefix(sqlLower, "truncate table"):
return true
case strings.HasPrefix(sqlLower, "insert into "):
return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("insert into "):]))
case strings.HasPrefix(sqlLower, "replace into "):
return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("replace into "):]))
case strings.HasPrefix(sqlLower, "delete from "):
return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("delete from "):]))
case strings.HasPrefix(sqlLower, "update "):
return containsDataBranchTempTableName(firstSQLTableToken(sqlLower[len("update "):]))
default:
return true
}
}

func firstSQLTableToken(sqlLower string) string {
sqlLower = strings.TrimSpace(sqlLower)
for i := 0; i < len(sqlLower); i++ {
switch sqlLower[i] {
case ' ', '\t', '\n', '\r', '(':
return sqlLower[:i]
}
}
return sqlLower
}

func containsTempTableMarker(sqlLower, marker string) bool {
searchFrom := 0
for {
Expand Down Expand Up @@ -125,7 +161,7 @@ func runSql(
if strings.HasPrefix(trimmedLower, "drop database") {
// Internal executor does not support DROP DATABASE (IsPublishing panics).
useBackExec = true
} else if containsDataBranchTempTableName(trimmedLower) {
} else if dataBranchTempSQLNeedsBackExec(trimmedLower) {
// Branch diff/merge/pick temp tables do repeated DDL/DML in one shared txn.
// The internal SQL fast path skips per-statement workspace increments and can
// hit ErrTxnNeedRetryWithDefChanged in RC mode while these temp definitions churn.
Expand Down
85 changes: 85 additions & 0 deletions pkg/frontend/data_branch_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package frontend
import (
"bytes"
"context"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -83,6 +84,61 @@ func TestContainsDataBranchTempTableName(t *testing.T) {
require.False(t, containsDataBranchTempTableName("select '__mo_diff_del_merge_1'"))
}

func TestDataBranchTempSQLNeedsBackExec(t *testing.T) {
tests := []struct {
name string
sql string
want bool
}{
{
name: "drop temp table",
sql: "drop table if exists test.__mo_diff_del_merge_1",
want: true,
},
{
name: "create temp table",
sql: "create table test.__mo_diff_ins_merge_1 as select id from test.t where 1 = 0",
want: true,
},
{
name: "insert into temp table",
sql: "insert into test.__mo_diff_del_merge_1 values (1)",
want: true,
},
{
name: "delete from temp table",
sql: "delete from test.__mo_diff_ins_merge_1",
want: true,
},
{
name: "main table delete reads temp table",
sql: "delete from test.orders where id in (select id from test.__mo_diff_del_merge_1)",
want: false,
},
{
name: "main table insert reads temp table",
sql: "insert into test.orders (id, name) select id, name from test.__mo_diff_ins_merge_1",
want: false,
},
{
name: "unknown temp table statement stays conservative",
sql: "select * from test.__mo_diff_ins_merge_1",
want: true,
},
{
name: "ordinary statement",
sql: "delete from test.orders where id = 1",
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.want, dataBranchTempSQLNeedsBackExec(strings.ToLower(tt.sql)))
})
}
}

func TestRunSQL_BackgroundExecPaths(t *testing.T) {
ses := newValidateSession(t)

Expand Down Expand Up @@ -137,6 +193,35 @@ func TestRunSQL_DataBranchTempTablesUseBackgroundExec(t *testing.T) {
require.Empty(t, spyExec.sql)
}

func TestRunSQL_DataBranchMainTableDMLUsesInternalExec(t *testing.T) {
tests := []struct {
name string
sql string
}{
{
name: "delete main table using diff delete table",
sql: "delete from test.orders where id in (select id from test.__mo_diff_del_merge_1)",
},
{
name: "insert main table using diff insert table",
sql: "insert into test.orders (id, name) select id, name from test.__mo_diff_ins_merge_1",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ses := newValidateSession(t)
spyExec := &pickStreamingExecutor{}
bh := newPickStreamingBackExecForTest(t, ses, spyExec)

ret, err := runSql(context.Background(), ses, bh, tt.sql, nil, nil)
require.NoError(t, err)
ret.Close()
require.Equal(t, tt.sql, spyExec.sql)
})
}
}

func TestScanSnapshotRelationByID_EarlyAndErrorPaths(t *testing.T) {
ses := newValidateSession(t)

Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/data_branch_pick.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func materializeSubqueryUnified(
// Compose the subquery SQL: wrap the user's SELECT with ORDER BY for
// streaming sorted results. For composite PKs we ORDER BY all component
// columns so that serial()-encoded keys arrive in ascending byte order.
fmtCtx := tree.NewFmtCtx(dialect.MYSQL)
fmtCtx := tree.NewFmtCtx(dialect.MYSQL, tree.WithSingleQuoteString())
stmt.Keys.Select.Format(fmtCtx)
subquerySQL := fmtCtx.String()

Expand Down
65 changes: 65 additions & 0 deletions pkg/frontend/data_branch_pick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,71 @@ func TestMaterializeSubqueryUnified_SinglePKSuccessBuildsFilterAndHashmap(t *tes
}
}

func TestMaterializeSubqueryUnified_PreservesStringLiteralQuotes(t *testing.T) {
ses := newValidateSession(t)
stmtNode, err := parsers.ParseOne(
context.Background(),
dialect.MYSQL,
"data branch pick orders_fix into orders keys(select order_id from orders_fix where customer = 'Grace') when conflict accept",
1,
)
require.NoError(t, err)

stmt, ok := stmtNode.(*tree.DataBranchPick)
require.True(t, ok)

oldBuildPlanWithAuthorization := buildPlanWithAuthorization
defer func() {
buildPlanWithAuthorization = oldBuildPlanWithAuthorization
}()
buildPlanWithAuthorization = func(
reqCtx context.Context,
ses FeSession,
ctx plan2.CompilerContext,
stmt tree.Statement,
) (*plan2.Plan, error) {
return nil, nil
}

tblStuff := tableStuff{}
tblStuff.def.colTypes = []types.Type{types.T_int64.ToType()}
tblStuff.def.pkColIdx = 0
tblStuff.def.pkColIdxes = []int{0}

hm, err := databranchutils.NewBranchHashmap(databranchutils.WithBranchHashmapShardCount(1))
require.NoError(t, err)
defer func() {
require.NoError(t, hm.Close())
}()

exec := &pickStreamingExecutor{
result: executor.Result{
Batches: []*batch.Batch{buildPickStreamingBatch(
t,
ses.proc.Mp(),
[]types.Type{types.T_int64.ToType()},
nil,
)},
Mp: ses.proc.Mp(),
},
}
bh := newPickStreamingBackExecForTest(t, ses, exec)

pkFilter, err := materializeSubqueryUnified(
context.Background(),
ses,
bh,
stmt,
tblStuff,
false,
hm,
)
require.NoError(t, err)
require.Nil(t, pkFilter)
require.Contains(t, exec.sql, "customer = 'Grace'")
require.NotContains(t, exec.sql, "customer = Grace")
}

func TestMaterializeSubqueryUnified_CompositePKOrdersAllColumns(t *testing.T) {
ses := newValidateSession(t)
stmtNode, err := parsers.ParseOne(
Expand Down
17 changes: 17 additions & 0 deletions test/distributed/cases/git4data/branch/pick/pick_5.result
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ select * from t1 order by a asc;
drop table pick_keys;
drop table t1;
drop table t2;
create table orders (order_id int primary key, customer varchar(20), amount int);
insert into orders values (1,'Alice',10),(2,'Bob',20),(3,'Carol',30);
data branch create table orders_fix from orders;
insert into orders_fix values (4,'Grace',40),(5,'Heidi',50),(6,'Grace',60);
data branch pick orders_fix into orders keys(select order_id from orders_fix where customer = 'Grace') when conflict accept;
select * from orders order by order_id asc;
➤ order_id[4,32,0] ¦ customer[12,-1,0] ¦ amount[4,32,0] 𝄀
1 ¦ Alice ¦ 10 𝄀
2 ¦ Bob ¦ 20 𝄀
3 ¦ Carol ¦ 30 𝄀
4 ¦ Grace ¦ 40 𝄀
6 ¦ Grace ¦ 60
data branch diff orders_fix against orders;
➤ diff orders_fix against orders[12,0,0] ¦ flag[12,0,0] ¦ order_id[4,0,0] ¦ customer[12,0,0] ¦ amount[4,0,0] 𝄀
orders_fix ¦ INSERT ¦ 5 ¦ Heidi ¦ 50
drop table orders;
drop table orders_fix;
create table t0 (a int, b int, primary key(a));
insert into t0 values (1,1),(2,2),(3,3);
data branch create table t1 from t0;
Expand Down
27 changes: 23 additions & 4 deletions test/distributed/cases/git4data/branch/pick/pick_5.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,26 @@ drop table t1;
drop table t2;

-- ----------------------------------------------------------------
-- case 3: empty subquery result — should be no-op
-- case 3: subquery with string literal predicate
-- ----------------------------------------------------------------

create table orders (order_id int primary key, customer varchar(20), amount int);
insert into orders values (1,'Alice',10),(2,'Bob',20),(3,'Carol',30);

data branch create table orders_fix from orders;
insert into orders_fix values (4,'Grace',40),(5,'Heidi',50),(6,'Grace',60);

data branch pick orders_fix into orders keys(select order_id from orders_fix where customer = 'Grace') when conflict accept;
select * from orders order by order_id asc;

-- verify: non-Grace row is still in diff
data branch diff orders_fix against orders;

drop table orders;
drop table orders_fix;

-- ----------------------------------------------------------------
-- case 4: empty subquery result — should be no-op
-- ----------------------------------------------------------------

create table t0 (a int, b int, primary key(a));
Expand All @@ -70,7 +89,7 @@ drop table t1;
drop table t2;

-- ----------------------------------------------------------------
-- case 4: large subquery — pick 25 out of 100 new rows
-- case 5: large subquery — pick 25 out of 100 new rows
-- ----------------------------------------------------------------

create table t1 (a int, b varchar(20), primary key(a));
Expand All @@ -91,7 +110,7 @@ drop table t1;
drop table t2;

-- ----------------------------------------------------------------
-- case 5: subquery with DISTINCT and ORDER BY
-- case 6: subquery with DISTINCT and ORDER BY
-- ----------------------------------------------------------------

create table t0 (a int, b int, primary key(a));
Expand All @@ -116,7 +135,7 @@ drop table t1;
drop table t2;

-- ----------------------------------------------------------------
-- case 6: subquery keys must not contain NULL
-- case 7: subquery keys must not contain NULL
-- ----------------------------------------------------------------

create table t1 (a int, b int, primary key(a));
Expand Down
Loading