Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sql/plan/bind_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (builder *QueryBuilder) bindExternalScan(

externalScanNode := &plan.Node{
NodeType: plan.Node_EXTERNAL_SCAN,
Stats: makeLoadExternalStats(stmt.Param, offset),
Stats: makeLoadExternalStats(stmt.Param, tableDef, offset, ctx.GetContext()),
ObjRef: objRef,
TableDef: tableDef,
ExternScan: &plan.ExternScan{
Expand Down
146 changes: 135 additions & 11 deletions pkg/sql/plan/build_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package plan

import (
"bufio"
"context"
"encoding/json"
"io"
"math"
"strings"
"time"

Expand All @@ -30,6 +32,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/sql/util/csvparser"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
)

const (
Expand Down Expand Up @@ -285,13 +288,11 @@ func hasLoadUserVariable(cols []tree.LoadColumn) bool {
return false
}

func makeLoadExternalStats(param *tree.ExternParam, offset int64) *plan.Stats {
func makeLoadExternalStats(param *tree.ExternParam, tableDef *TableDef, offset int64, ctx context.Context) *plan.Stats {
// LOAD external scan parallelism is currently sized by
// getParallelSizeForExternalScan as Cost*Rowsize/WriteS3Threshold.
// Use input bytes as Cost and keep Rowsize=1 to provide a byte-size hint
// without changing that compile-time formula. This stats shape is valid
// only for LOAD external scan sizing; do not reuse it for query planning
// paths where Cost is interpreted as optimizer cost or row cardinality.
// Keep Cost*Rowsize close to input bytes, but express Cost/Outcnt/BlockNum
// as row/cardinality estimates so large LOAD keeps the expected AP path.
stats := &plan.Stats{Rowsize: 1}
if param == nil {
return stats
Expand All @@ -305,15 +306,138 @@ func makeLoadExternalStats(param *tree.ExternParam, offset int64) *plan.Stats {
if inputSize < 0 {
inputSize = 0
}
stats.Cost = float64(inputSize)
if inputSize > 0 {
stats.BlockNum = 1
stats.TableCnt = 1
stats.Outcnt = 1
if inputSize == 0 {
return stats
}

rowSize := estimateLoadRowsize(param, tableDef, inputSize, offset, ctx)
rowCount := math.Ceil(float64(inputSize) / rowSize)
if rowCount < 1 {
rowCount = 1
}
stats.Cost = rowCount
stats.Outcnt = rowCount
stats.TableCnt = rowCount
stats.Rowsize = rowSize
stats.Selectivity = 1
stats.BlockNum = int32(math.Ceil(rowCount / float64(options.DefaultBlockMaxRows)))
return stats
}

func estimateLoadRowsize(param *tree.ExternParam, tableDef *TableDef, inputSize int64, offset int64, ctx context.Context) float64 {
if param != nil && param.ScanType == tree.INLINE && param.Format == tree.CSV {
if rowSize := inlineCSVRowsize(param.Data, loadLinesTerminatedBy(param)); rowSize > 0 {
return clampLoadRowsize(rowSize, inputSize)
}
}
if rowSize := estimateLoadRowsizeFromFirstLine(param, inputSize, offset, ctx); rowSize > 0 {
return rowSize
}
if tableDef != nil {
if rowSize := GetRowSizeFromTableDef(tableDef, true) * 0.8; rowSize > 0 {
return clampLoadRowsize(rowSize, inputSize)
}
}
return clampLoadRowsize(1, inputSize)
}

func inlineCSVRowsize(data string, terminatedBy string) float64 {
if terminatedBy == "" {
terminatedBy = "\n"
}
if idx := strings.Index(data, terminatedBy); idx >= 0 {
return float64(idx + len(terminatedBy))
}
return float64(len(data))
}

func loadLinesTerminatedBy(param *tree.ExternParam) string {
if param != nil && param.Tail != nil && param.Tail.Lines != nil {
if terminated := param.Tail.Lines.TerminatedBy; terminated != nil && terminated.Value != "" {
return terminated.Value
}
}
return "\n"
}

func estimateLoadRowsizeFromFirstLine(param *tree.ExternParam, inputSize int64, offset int64, ctx context.Context) float64 {
lineTerminator := loadLinesTerminatedBy(param)
if param == nil ||
param.ScanType == tree.INLINE ||
param.Local ||
param.Format == tree.PARQUET ||
getCompressType(param, param.Filepath) != tree.NOCOMPRESS ||
(lineTerminator != "\n" && lineTerminator != "\r\n") ||
strings.HasPrefix(param.Filepath, "SHARED:/query_result/") {
return 0
}

if size := readExternalFirstLineSize(param, offset, ctx); size > 0 {
return clampLoadRowsize(float64(size), inputSize)
}
return 0
}

func readExternalFirstLineSize(param *tree.ExternParam, offset int64, ctx context.Context) int {
if param == nil {
return 0
}
if ctx == nil {
ctx = param.Ctx
}
if ctx == nil {
return 0
}

fs, readPath, err := GetForETLWithType(param, param.Filepath)
if err != nil {
return 0
}
var r io.ReadCloser
vec := fileservice.IOVector{
FilePath: readPath,
Entries: []fileservice.IOEntry{
0: {
Offset: offset,
Size: -1,
ReadCloserForRead: &r,
},
},
}
if err = fs.Read(ctx, &vec); err != nil {
return 0
}
if r == nil {
return 0
}
defer r.Close()

reader := bufio.NewReader(r)
if offset == 0 && param.Tail != nil {
for i := uint64(0); i < param.Tail.IgnoredLines; i++ {
if _, err := reader.ReadString('\n'); err != nil {
return 0
}
}
}

line, err := reader.ReadString('\n')
if len(line) == 0 && err != nil {
return 0
}
return len(line)
}

func clampLoadRowsize(rowSize float64, inputSize int64) float64 {
if rowSize < 1 {
return 1
}
if inputSize > 0 && rowSize > float64(inputSize) {
return float64(inputSize)
}
return rowSize
}

func loadParquetMayListFiles(param *tree.ExternParam) bool {
return param != nil &&
param.Format == tree.PARQUET &&
Expand Down Expand Up @@ -420,7 +544,7 @@ func buildLoad(stmt *tree.Load, ctx CompilerContext, isPrepareStmt bool) (*Plan,

externalScanNode := &plan.Node{
NodeType: plan.Node_EXTERNAL_SCAN,
Stats: makeLoadExternalStats(stmt.Param, offset),
Stats: makeLoadExternalStats(stmt.Param, tableDef, offset, ctx.GetContext()),
ProjectList: externalProject,
ObjRef: objRef,
TableDef: tableDef,
Expand Down
Loading
Loading