diff --git a/pkg/sql/plan/bind_load.go b/pkg/sql/plan/bind_load.go index 6e0303a4ea6c6..6150bd80d4946 100644 --- a/pkg/sql/plan/bind_load.go +++ b/pkg/sql/plan/bind_load.go @@ -184,7 +184,7 @@ func (builder *QueryBuilder) bindExternalScan( externalScanNode := &plan.Node{ NodeType: plan.Node_EXTERNAL_SCAN, - Stats: makeLoadExternalStats(stmt.Param, offset), + Stats: makeLoadExternalStats(stmt.Param, tableDef, offset), ObjRef: objRef, TableDef: tableDef, ExternScan: &plan.ExternScan{ diff --git a/pkg/sql/plan/build_load.go b/pkg/sql/plan/build_load.go index b8d6cb3761a13..11f751b61c279 100644 --- a/pkg/sql/plan/build_load.go +++ b/pkg/sql/plan/build_load.go @@ -18,6 +18,7 @@ import ( "bufio" "encoding/json" "io" + "math" "strings" "time" @@ -30,6 +31,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/sql/util/csvparser" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" ) const ( @@ -285,13 +287,11 @@ func hasLoadUserVariable(cols []tree.LoadColumn) bool { return false } -func makeLoadExternalStats(param *tree.ExternParam, offset int64) *plan.Stats { +func makeLoadExternalStats(param *tree.ExternParam, tableDef *TableDef, offset int64) *plan.Stats { // LOAD external scan parallelism is currently sized by // getParallelSizeForExternalScan as Cost*Rowsize/WriteS3Threshold. - // Use input bytes as Cost and keep Rowsize=1 to provide a byte-size hint - // without changing that compile-time formula. This stats shape is valid - // only for LOAD external scan sizing; do not reuse it for query planning - // paths where Cost is interpreted as optimizer cost or row cardinality. + // Keep Cost*Rowsize close to input bytes, but express Cost/Outcnt/BlockNum + // as row/cardinality estimates so large LOAD keeps the expected AP path. stats := &plan.Stats{Rowsize: 1} if param == nil { return stats @@ -305,15 +305,48 @@ func makeLoadExternalStats(param *tree.ExternParam, offset int64) *plan.Stats { if inputSize < 0 { inputSize = 0 } - stats.Cost = float64(inputSize) - if inputSize > 0 { - stats.BlockNum = 1 - stats.TableCnt = 1 - stats.Outcnt = 1 + if inputSize == 0 { + return stats + } + + rowSize := estimateLoadRowsize(param, tableDef, inputSize) + rowCount := math.Ceil(float64(inputSize) / rowSize) + if rowCount < 1 { + rowCount = 1 } + stats.Cost = rowCount + stats.Outcnt = rowCount + stats.TableCnt = rowCount + stats.Rowsize = rowSize + stats.Selectivity = 1 + stats.BlockNum = int32(rowCount/float64(options.DefaultBlockMaxRows)) + 1 return stats } +func estimateLoadRowsize(param *tree.ExternParam, tableDef *TableDef, inputSize int64) float64 { + if param != nil && param.ScanType == tree.INLINE && param.Format == tree.CSV { + if idx := strings.Index(param.Data, "\n"); idx > 0 { + return clampLoadRowsize(float64(idx), inputSize) + } + } + if tableDef != nil { + if rowSize := GetRowSizeFromTableDef(tableDef, true) * 0.8; rowSize > 0 { + return clampLoadRowsize(rowSize, inputSize) + } + } + return clampLoadRowsize(1, inputSize) +} + +func clampLoadRowsize(rowSize float64, inputSize int64) float64 { + if rowSize < 1 { + return 1 + } + if inputSize > 0 && rowSize > float64(inputSize) { + return float64(inputSize) + } + return rowSize +} + func loadParquetMayListFiles(param *tree.ExternParam) bool { return param != nil && param.Format == tree.PARQUET && @@ -420,7 +453,7 @@ func buildLoad(stmt *tree.Load, ctx CompilerContext, isPrepareStmt bool) (*Plan, externalScanNode := &plan.Node{ NodeType: plan.Node_EXTERNAL_SCAN, - Stats: makeLoadExternalStats(stmt.Param, offset), + Stats: makeLoadExternalStats(stmt.Param, tableDef, offset), ProjectList: externalProject, ObjRef: objRef, TableDef: tableDef, diff --git a/pkg/sql/plan/build_load_parquet_test.go b/pkg/sql/plan/build_load_parquet_test.go index 6714e8e60f8d7..f0bc86406f9a8 100644 --- a/pkg/sql/plan/build_load_parquet_test.go +++ b/pkg/sql/plan/build_load_parquet_test.go @@ -19,7 +19,10 @@ import ( "testing" "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/types" + pbplan "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/stretchr/testify/require" ) @@ -281,18 +284,24 @@ func TestValidateLoadParquetOptionsIgnoresNonParquet(t *testing.T) { } func TestMakeLoadExternalStatsUsesInputBytes(t *testing.T) { + tableDef := &TableDef{ + Cols: []*ColDef{ + {Name: "a", Typ: Type{Id: int32(types.T_int64), Width: 64}}, + {Name: "b", Typ: Type{Id: int32(types.T_char), Width: 1}}, + }, + } stats := makeLoadExternalStats(&tree.ExternParam{ ExParamConst: tree.ExParamConst{FileSize: 100}, - }, 25) - require.Equal(t, float64(75), stats.Cost) - require.Equal(t, float64(1), stats.Rowsize) - require.Equal(t, int32(1), stats.BlockNum) - require.Equal(t, float64(1), stats.TableCnt) - require.Equal(t, float64(1), stats.Outcnt) + }, tableDef, 25) + require.GreaterOrEqual(t, stats.Cost, float64(1)) + require.GreaterOrEqual(t, stats.Rowsize, float64(1)) + requireLoadByteHint(t, stats, 75) + require.Equal(t, stats.Cost, stats.TableCnt) + require.Equal(t, stats.Cost, stats.Outcnt) stats = makeLoadExternalStats(&tree.ExternParam{ ExParamConst: tree.ExParamConst{FileSize: 10}, - }, 20) + }, tableDef, 20) require.Equal(t, float64(0), stats.Cost) require.Equal(t, float64(1), stats.Rowsize) require.Equal(t, int32(0), stats.BlockNum) @@ -300,11 +309,41 @@ func TestMakeLoadExternalStatsUsesInputBytes(t *testing.T) { stats = makeLoadExternalStats(&tree.ExternParam{ ExParamConst: tree.ExParamConst{ ScanType: tree.INLINE, + Format: tree.CSV, Data: "1,2\n3,4\n", }, - }, 0) - require.Equal(t, float64(8), stats.Cost) - require.Equal(t, float64(1), stats.Rowsize) + }, tableDef, 0) + require.Equal(t, float64(3), stats.Rowsize) + requireLoadByteHint(t, stats, 8) +} + +func TestMakeLoadExternalStatsKeepsLargeLoadMultiCN(t *testing.T) { + tableDef := &TableDef{ + Cols: []*ColDef{ + {Name: "a", Typ: Type{Id: int32(types.T_int64), Width: 64}}, + {Name: "b", Typ: Type{Id: int32(types.T_char), Width: 1}}, + }, + } + inputSize := int64(float64(options.DefaultBlockMaxRows) * GetRowSizeFromTableDef(tableDef, true) * 0.8 * float64(BlockThresholdForOneCN+1)) + stats := makeLoadExternalStats(&tree.ExternParam{ + ExParamConst: tree.ExParamConst{FileSize: inputSize}, + }, tableDef, 0) + require.Greater(t, stats.BlockNum, int32(BlockThresholdForOneCN)) + require.Greater(t, stats.Cost, float64(costThresholdForOneCN)) + require.Equal(t, ExecTypeAP_MULTICN, GetExecType(&Query{ + Nodes: []*Node{{ + NodeType: pbplan.Node_EXTERNAL_SCAN, + Stats: stats, + }}, + Steps: []int32{0}, + }, false, false)) +} + +func requireLoadByteHint(t *testing.T, stats *Stats, inputSize int64) { + t.Helper() + estimatedBytes := stats.Cost * stats.Rowsize + require.GreaterOrEqual(t, estimatedBytes, float64(inputSize)) + require.Less(t, estimatedBytes, float64(inputSize)+stats.Rowsize) } func TestLoadParquetMayListFiles(t *testing.T) {