Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sql/plan/bind_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (builder *QueryBuilder) bindExternalScan(

externalScanNode := &plan.Node{
NodeType: plan.Node_EXTERNAL_SCAN,
Stats: makeLoadExternalStats(stmt.Param, offset),
Stats: makeLoadExternalStats(stmt.Param, tableDef, offset),
ObjRef: objRef,
TableDef: tableDef,
ExternScan: &plan.ExternScan{
Expand Down
55 changes: 44 additions & 11 deletions pkg/sql/plan/build_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bufio"
"encoding/json"
"io"
"math"
"strings"
"time"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/sql/util/csvparser"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
)

const (
Expand Down Expand Up @@ -285,13 +287,11 @@ func hasLoadUserVariable(cols []tree.LoadColumn) bool {
return false
}

func makeLoadExternalStats(param *tree.ExternParam, offset int64) *plan.Stats {
func makeLoadExternalStats(param *tree.ExternParam, tableDef *TableDef, offset int64) *plan.Stats {
// LOAD external scan parallelism is currently sized by
// getParallelSizeForExternalScan as Cost*Rowsize/WriteS3Threshold.
// Use input bytes as Cost and keep Rowsize=1 to provide a byte-size hint
// without changing that compile-time formula. This stats shape is valid
// only for LOAD external scan sizing; do not reuse it for query planning
// paths where Cost is interpreted as optimizer cost or row cardinality.
// Keep Cost*Rowsize close to input bytes, but express Cost/Outcnt/BlockNum
// as row/cardinality estimates so large LOAD keeps the expected AP path.
stats := &plan.Stats{Rowsize: 1}
if param == nil {
return stats
Expand All @@ -305,15 +305,48 @@ func makeLoadExternalStats(param *tree.ExternParam, offset int64) *plan.Stats {
if inputSize < 0 {
inputSize = 0
}
stats.Cost = float64(inputSize)
if inputSize > 0 {
stats.BlockNum = 1
stats.TableCnt = 1
stats.Outcnt = 1
if inputSize == 0 {
return stats
}

rowSize := estimateLoadRowsize(param, tableDef, inputSize)
rowCount := math.Ceil(float64(inputSize) / rowSize)
if rowCount < 1 {
rowCount = 1
}
stats.Cost = rowCount
stats.Outcnt = rowCount
stats.TableCnt = rowCount
stats.Rowsize = rowSize
stats.Selectivity = 1
stats.BlockNum = int32(rowCount/float64(options.DefaultBlockMaxRows)) + 1
return stats
}

func estimateLoadRowsize(param *tree.ExternParam, tableDef *TableDef, inputSize int64) float64 {
if param != nil && param.ScanType == tree.INLINE && param.Format == tree.CSV {
if idx := strings.Index(param.Data, "\n"); idx > 0 {
return clampLoadRowsize(float64(idx), inputSize)
}
}
if tableDef != nil {
if rowSize := GetRowSizeFromTableDef(tableDef, true) * 0.8; rowSize > 0 {
return clampLoadRowsize(rowSize, inputSize)
}
}
return clampLoadRowsize(1, inputSize)
}

func clampLoadRowsize(rowSize float64, inputSize int64) float64 {
if rowSize < 1 {
return 1
}
if inputSize > 0 && rowSize > float64(inputSize) {
return float64(inputSize)
}
return rowSize
}

func loadParquetMayListFiles(param *tree.ExternParam) bool {
return param != nil &&
param.Format == tree.PARQUET &&
Expand Down Expand Up @@ -420,7 +453,7 @@ func buildLoad(stmt *tree.Load, ctx CompilerContext, isPrepareStmt bool) (*Plan,

externalScanNode := &plan.Node{
NodeType: plan.Node_EXTERNAL_SCAN,
Stats: makeLoadExternalStats(stmt.Param, offset),
Stats: makeLoadExternalStats(stmt.Param, tableDef, offset),
ProjectList: externalProject,
ObjRef: objRef,
TableDef: tableDef,
Expand Down
59 changes: 49 additions & 10 deletions pkg/sql/plan/build_load_parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"testing"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
pbplan "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -281,30 +284,66 @@ func TestValidateLoadParquetOptionsIgnoresNonParquet(t *testing.T) {
}

func TestMakeLoadExternalStatsUsesInputBytes(t *testing.T) {
tableDef := &TableDef{
Cols: []*ColDef{
{Name: "a", Typ: Type{Id: int32(types.T_int64), Width: 64}},
{Name: "b", Typ: Type{Id: int32(types.T_char), Width: 1}},
},
}
stats := makeLoadExternalStats(&tree.ExternParam{
ExParamConst: tree.ExParamConst{FileSize: 100},
}, 25)
require.Equal(t, float64(75), stats.Cost)
require.Equal(t, float64(1), stats.Rowsize)
require.Equal(t, int32(1), stats.BlockNum)
require.Equal(t, float64(1), stats.TableCnt)
require.Equal(t, float64(1), stats.Outcnt)
}, tableDef, 25)
require.GreaterOrEqual(t, stats.Cost, float64(1))
require.GreaterOrEqual(t, stats.Rowsize, float64(1))
requireLoadByteHint(t, stats, 75)
require.Equal(t, stats.Cost, stats.TableCnt)
require.Equal(t, stats.Cost, stats.Outcnt)

stats = makeLoadExternalStats(&tree.ExternParam{
ExParamConst: tree.ExParamConst{FileSize: 10},
}, 20)
}, tableDef, 20)
require.Equal(t, float64(0), stats.Cost)
require.Equal(t, float64(1), stats.Rowsize)
require.Equal(t, int32(0), stats.BlockNum)

stats = makeLoadExternalStats(&tree.ExternParam{
ExParamConst: tree.ExParamConst{
ScanType: tree.INLINE,
Format: tree.CSV,
Data: "1,2\n3,4\n",
},
}, 0)
require.Equal(t, float64(8), stats.Cost)
require.Equal(t, float64(1), stats.Rowsize)
}, tableDef, 0)
require.Equal(t, float64(3), stats.Rowsize)
requireLoadByteHint(t, stats, 8)
}

func TestMakeLoadExternalStatsKeepsLargeLoadMultiCN(t *testing.T) {
tableDef := &TableDef{
Cols: []*ColDef{
{Name: "a", Typ: Type{Id: int32(types.T_int64), Width: 64}},
{Name: "b", Typ: Type{Id: int32(types.T_char), Width: 1}},
},
}
inputSize := int64(float64(options.DefaultBlockMaxRows) * GetRowSizeFromTableDef(tableDef, true) * 0.8 * float64(BlockThresholdForOneCN+1))
stats := makeLoadExternalStats(&tree.ExternParam{
ExParamConst: tree.ExParamConst{FileSize: inputSize},
}, tableDef, 0)
require.Greater(t, stats.BlockNum, int32(BlockThresholdForOneCN))
require.Greater(t, stats.Cost, float64(costThresholdForOneCN))
require.Equal(t, ExecTypeAP_MULTICN, GetExecType(&Query{
Nodes: []*Node{{
NodeType: pbplan.Node_EXTERNAL_SCAN,
Stats: stats,
}},
Steps: []int32{0},
}, false, false))
}

func requireLoadByteHint(t *testing.T, stats *Stats, inputSize int64) {
t.Helper()
estimatedBytes := stats.Cost * stats.Rowsize
require.GreaterOrEqual(t, estimatedBytes, float64(inputSize))
require.Less(t, estimatedBytes, float64(inputSize)+stats.Rowsize)
}

func TestLoadParquetMayListFiles(t *testing.T) {
Expand Down
Loading