Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,208 changes: 791 additions & 417 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pkg/sql/colexec/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,15 @@ func (external *External) Call(proc *process.Process) (vm.CallResult, error) {
t1 := time.Now()

analyzer := external.OpAnalyzer
param := external.Es
defer func() {
analyzer.AddScanTime(t1)
param.flushParquetProfile(analyzer)
span.End()
v2.TxnStatementExternalScanDurationHistogram.Observe(time.Since(t).Seconds())
}()

result := vm.NewCallResult()
param := external.Es
if param.Fileparam.End {
result.Status = vm.ExecStop
return result, nil
Expand Down Expand Up @@ -262,6 +263,7 @@ func (external *External) Call(proc *process.Process) (vm.CallResult, error) {
result.Batch = external.ctr.buf
if external.ctr.buf != nil {
external.ctr.maxAllocSize = max(external.ctr.maxAllocSize, external.ctr.buf.Size())
param.addParquetProfile(process.ParquetProfileStats{PeakBatchBytes: int64(external.ctr.buf.Size())})
result.Batch.ShuffleIDX = int32(param.Idx)
}

Expand Down
150 changes: 122 additions & 28 deletions pkg/sql/colexec/external/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func newParquetHandler(param *ExternalParam) (*ParquetHandler, error) {
if err != nil {
return nil, err
}
if err := h.initRowGroupSelection(param); err != nil {
return nil, err
}

// Empty file handling (0 rows): only check column count, skip column name and type checks.
if h.file.NumRows() == 0 {
Expand All @@ -77,6 +80,9 @@ func newParquetHandler(param *ExternalParam) (*ParquetHandler, error) {
// Caller treats (nil, nil) as "empty file, advance to next".
return nil, nil
}
if h.rowGroupRows == 0 {
return nil, nil
}

err = h.prepare(param)
if err != nil {
Expand All @@ -86,6 +92,39 @@ func newParquetHandler(param *ExternalParam) (*ParquetHandler, error) {
return &h, nil
}

func (h *ParquetHandler) initRowGroupSelection(param *ExternalParam) error {
all := h.file.RowGroups()
if len(param.ParquetRowGroupShards) == 0 {
h.rowGroups = all
} else {
currentFileIndex := int32(0)
if param.Fileparam != nil && param.Fileparam.FileIndex > 0 {
currentFileIndex = int32(param.Fileparam.FileIndex - 1)
}
for _, shard := range param.ParquetRowGroupShards {
if shard.FileIndex != currentFileIndex {
continue
}
start := int(shard.RowGroupStart)
end := int(shard.RowGroupEnd)
if start < 0 || end <= start || end > len(all) {
return moerr.NewInvalidInputf(param.Ctx,
"invalid parquet row group shard [%d,%d) for file index %d with %d row groups",
start, end, currentFileIndex, len(all))
}
h.rowGroups = append(h.rowGroups, all[start:end]...)
}
}
if len(h.rowGroups) == 0 {
h.rowGroup = parquet.MultiRowGroup()
h.rowGroupRows = 0
return nil
}
h.rowGroup = parquet.MultiRowGroup(h.rowGroups...)
h.rowGroupRows = h.rowGroup.NumRows()
return nil
}

func hasPhysicalParquetAttrs(param *ExternalParam) bool {
for _, attr := range param.Attrs {
colIdx := int(attr.ColIndex)
Expand Down Expand Up @@ -114,6 +153,7 @@ func (h *ParquetHandler) openFile(param *ExternalParam, prefetchS3 bool) error {
data := util.UnsafeStringToBytes(param.Extern.Data)
r = bytes.NewReader(data)
fileSize = int64(len(data))
param.addParquetProfile(process.ParquetProfileStats{BytesRead: fileSize})
case param.Extern.Local:
return moerr.NewNYI(param.Ctx, "load parquet local")
default:
Expand All @@ -128,7 +168,7 @@ func (h *ParquetHandler) openFile(param *ExternalParam, prefetchS3 bool) error {
}
fileSize = param.FileSize[param.Fileparam.FileIndex-1]

if shouldPrefetchS3Parquet(param.Extern.ScanType, prefetchS3, fileSize) {
if shouldPrefetchS3Parquet(param.Extern.ScanType, prefetchS3, fileSize, len(param.ParquetRowGroupShards) > 0) {
data := make([]byte, int(fileSize))
vec := fileservice.IOVector{
FilePath: readPath,
Expand All @@ -143,12 +183,17 @@ func (h *ParquetHandler) openFile(param *ExternalParam, prefetchS3 bool) error {
if err := fs.Read(param.Ctx, &vec); err != nil {
return err
}
param.addParquetProfile(process.ParquetProfileStats{
BytesRead: fileSize,
PrefetchBytes: fileSize,
})
r = bytes.NewReader(data)
} else {
r = &fsReaderAt{
fs: fs,
readPath: readPath,
ctx: param.Ctx,
param: param,
}
}
}
Expand All @@ -157,30 +202,37 @@ func (h *ParquetHandler) openFile(param *ExternalParam, prefetchS3 bool) error {
return moerr.ConvertGoError(param.Ctx, err)
}

func shouldPrefetchS3Parquet(scanType int, prefetchS3 bool, fileSize int64) bool {
return scanType == tree.S3 && prefetchS3 && fileSize >= 0 && fileSize <= maxParquetS3PrefetchSize
func shouldPrefetchS3Parquet(scanType int, prefetchS3 bool, fileSize int64, hasRowGroupShards bool) bool {
return scanType == tree.S3 &&
prefetchS3 &&
!hasRowGroupShards &&
fileSize >= 0 &&
fileSize <= maxParquetS3PrefetchSize
}

// findColumnIgnoreCase finds a column in the Parquet schema with case-insensitive matching.
// It first tries exact match for performance, then falls back to case-insensitive match.
// Returns error if multiple columns match case-insensitively (ambiguous), even if one is an exact match.
func (h *ParquetHandler) findColumnIgnoreCase(ctx context.Context, name string) (*parquet.Column, error) {
root := h.file.Root()
nameLower := strings.ToLower(name)

// Single pass: find all columns that match case-insensitively
var exactMatch *parquet.Column
var caseInsensitiveMatches []*parquet.Column
type parquetColumnLookup struct {
exact map[string]*parquet.Column
folded map[string][]*parquet.Column
}

func newParquetColumnLookup(root *parquet.Column) parquetColumnLookup {
lookup := parquetColumnLookup{
exact: make(map[string]*parquet.Column),
folded: make(map[string][]*parquet.Column),
}
for _, col := range root.Columns() {
if col.Name() == name {
exactMatch = col
caseInsensitiveMatches = append(caseInsensitiveMatches, col)
} else if strings.ToLower(col.Name()) == nameLower {
caseInsensitiveMatches = append(caseInsensitiveMatches, col)
}
lookup.exact[col.Name()] = col
nameLower := strings.ToLower(col.Name())
lookup.folded[nameLower] = append(lookup.folded[nameLower], col)
}
return lookup
}

// find finds a column in the Parquet schema with case-insensitive matching.
// It returns an ambiguity error if multiple columns match case-insensitively,
// even when one of them is an exact match.
func (lookup parquetColumnLookup) find(ctx context.Context, name string) (*parquet.Column, error) {
caseInsensitiveMatches := lookup.folded[strings.ToLower(name)]
// Check for ambiguity: multiple columns match case-insensitively
if len(caseInsensitiveMatches) > 1 {
return nil, moerr.NewInvalidInputf(ctx,
Expand All @@ -189,7 +241,7 @@ func (h *ParquetHandler) findColumnIgnoreCase(ctx context.Context, name string)
}

// Return exact match if found, otherwise the single case-insensitive match
if exactMatch != nil {
if exactMatch := lookup.exact[name]; exactMatch != nil {
return exactMatch, nil
}
if len(caseInsensitiveMatches) == 1 {
Expand All @@ -199,12 +251,33 @@ func (h *ParquetHandler) findColumnIgnoreCase(ctx context.Context, name string)
return nil, nil
}

// findColumnIgnoreCase is kept for direct unit tests; prepare() builds the
// lookup once and uses it for all target columns.
func (h *ParquetHandler) findColumnIgnoreCase(ctx context.Context, name string) (*parquet.Column, error) {
return newParquetColumnLookup(h.file.Root()).find(ctx, name)
}

func (h *ParquetHandler) prepare(param *ExternalParam) error {
if h.rowGroup == nil && h.file != nil {
if len(h.rowGroups) == 0 {
h.rowGroups = h.file.RowGroups()
}
if len(h.rowGroups) > 0 {
h.rowGroup = parquet.MultiRowGroup(h.rowGroups...)
h.rowGroupRows = h.rowGroup.NumRows()
}
}

h.cols = make([]*parquet.Column, len(param.Cols))
h.mappers = make([]*columnMapper, len(param.Cols))
h.pages = make([]parquet.Pages, len(param.Cols))
h.currentPage = make([]parquet.Page, len(param.Cols))
h.pageOffset = make([]int64, len(param.Cols))
columnLookup := newParquetColumnLookup(h.file.Root())
var rowGroupChunks []parquet.ColumnChunk
if h.rowGroup != nil {
rowGroupChunks = h.rowGroup.ColumnChunks()
}
for _, attr := range param.Attrs {
colIdx := int(attr.ColIndex)
if colIdx < 0 || colIdx >= len(param.Cols) {
Expand All @@ -228,7 +301,7 @@ func (h *ParquetHandler) prepare(param *ExternalParam) error {
h.hasPhysicalCol = true

// Use case-insensitive column lookup (fix for issue #15621)
col, err := h.findColumnIgnoreCase(param.Ctx, attr.ColName)
col, err := columnLookup.find(param.Ctx, attr.ColName)
if err != nil {
return err
}
Expand Down Expand Up @@ -272,7 +345,12 @@ func (h *ParquetHandler) prepare(param *ExternalParam) error {
h.cols[colIdx] = physicalCol
h.mappers[colIdx] = fn
if physicalCol.Leaf() {
h.pages[colIdx] = physicalCol.Pages()
leafIdx := int(physicalCol.Index())
if leafIdx < 0 || leafIdx >= len(rowGroupChunks) {
return moerr.NewInvalidInputf(param.Ctx,
"invalid parquet leaf column index %d for column %s", leafIdx, attr.ColName)
}
h.pages[colIdx] = rowGroupChunks[leafIdx].Pages()
}
}

Expand All @@ -282,7 +360,7 @@ func (h *ParquetHandler) prepare(param *ExternalParam) error {

// init row reader if has nested columns
if h.hasNestedCols {
h.rowReader = parquet.NewReader(h.file)
h.rowReader = h.rowGroup.Rows()
}

return nil
Expand Down Expand Up @@ -2493,6 +2571,10 @@ func (h *ParquetHandler) getData(bat *batch.Batch, param *ExternalParam, proc *p
return h.getDataByPage(bat, param, proc)
}

func (h *ParquetHandler) isFinished() bool {
return h == nil || h.offset >= h.rowGroupRows
}

func (h *ParquetHandler) closePages(ctx context.Context) error {
var firstErr error
for i, pages := range h.pages {
Expand Down Expand Up @@ -2523,12 +2605,11 @@ func (h *ParquetHandler) getDataRowCountOnly(bat *batch.Batch) error {
rowCount = min(h.rowCountRemaining, batchLimit)
h.rowCountRemaining -= rowCount
} else {
rgs := h.file.RowGroups()
if h.currentRowGroup >= len(rgs) {
if h.currentRowGroup >= len(h.rowGroups) {
bat.SetRowCount(0)
return nil
}
total := int(rgs[h.currentRowGroup].NumRows())
total := int(h.rowGroups[h.currentRowGroup].NumRows())
h.currentRowGroup++
rowCount = min(total, batchLimit)
h.rowCountRemaining = total - rowCount
Expand Down Expand Up @@ -2564,7 +2645,11 @@ func (h *ParquetHandler) getDataByPage(bat *batch.Batch, param *ExternalParam, p
page := h.currentPage[colIdx]
if page == nil {
var err error
readStart := time.Now()
page, err = pages.ReadPage()
param.addParquetProfile(process.ParquetProfileStats{
ReadPageTime: time.Since(readStart).Nanoseconds(),
})
switch {
case errors.Is(err, io.EOF):
finish = true
Expand Down Expand Up @@ -2607,7 +2692,11 @@ func (h *ParquetHandler) getDataByPage(bat *batch.Batch, param *ExternalParam, p
h.pageOffset[colIdx] = 0
}

mapStart := time.Now()
err := h.mappers[colIdx].mapping(slicedPage, proc, vec)
param.addParquetProfile(process.ParquetProfileStats{
MapTime: time.Since(mapStart).Nanoseconds(),
})
if err != nil {
return h.closePagesOnError(param.Ctx, err)
}
Expand All @@ -2618,7 +2707,7 @@ func (h *ParquetHandler) getDataByPage(bat *batch.Batch, param *ExternalParam, p
bat.SetRowCount(length)

h.offset += int64(length)
if h.file != nil && h.offset >= h.file.NumRows() {
if h.isFinished() {
finish = true
}

Expand All @@ -2640,6 +2729,7 @@ type fsReaderAt struct {
fs fileservice.ETLFileService
readPath string
ctx context.Context
param *ExternalParam
}

func (r *fsReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
Expand All @@ -2659,7 +2749,11 @@ func (r *fsReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
if err != nil {
return 0, err
}
return int(vec.Entries[0].Size), nil
n = int(vec.Entries[0].Size)
if n > 0 {
r.param.addParquetProfile(process.ParquetProfileStats{BytesRead: int64(n)})
}
return n, nil
}

// parseStringToDecimal64 converts a string to DECIMAL64 with given precision and scale.
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/colexec/external/parquet_nested.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
"errors"
"fmt"
"io"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/bytejson"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/parquet-go/parquet-go"
)
Expand All @@ -52,6 +54,16 @@ func (h *ParquetHandler) getNestedMapper(col *parquet.Column, dt plan.Type) *col

// getDataByRow reads data row by row (used when has nested columns)
func (h *ParquetHandler) getDataByRow(bat *batch.Batch, param *ExternalParam, proc *process.Process) error {
_, span := trace.Start(proc.Ctx, "ParquetHandler.getDataByRow")
defer span.End()

rowModeStart := time.Now()
defer func() {
param.addParquetProfile(process.ParquetProfileStats{
RowModeTime: time.Since(rowModeStart).Nanoseconds(),
})
}()

if h.offset > 0 {
if err := h.rowReader.SeekToRow(h.offset); err != nil {
return moerr.ConvertGoError(param.Ctx, err)
Expand All @@ -74,7 +86,7 @@ func (h *ParquetHandler) getDataByRow(bat *batch.Batch, param *ExternalParam, pr
bat.SetRowCount(n)
h.offset += int64(n)

finish := n == 0 || h.offset >= h.file.NumRows()
finish := n == 0 || h.isFinished()
if finish {
h.cleanup()
// File completion (FileFin/End) is now handled by Call's finishCurrentFile
Expand Down
Loading
Loading