Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/vm/engine/tae/tables/jobs/flushTableTail.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,10 +859,10 @@ func (task *flushTableTailTask) flushAObjsForSnapshot(ctx context.Context, isTom
dataVer.Batch,
task.Name(),
)
subtasks[i] = aobjectTask
if err = task.rt.Scheduler.Schedule(aobjectTask); err != nil {
return
}
subtasks[i] = aobjectTask
}
return
}
Expand All @@ -884,7 +884,6 @@ func (task *flushTableTailTask) waitFlushAObjForSnapshot(ctx context.Context, su
if err = subtask.WaitDone(ictx); err != nil {
return moerr.AttachCause(ictx, err)
}
subtask.done = true
stat := subtask.stat.Clone()
if err = handles[i].UpdateStats(*stat); err != nil {
return
Expand Down
47 changes: 26 additions & 21 deletions pkg/vm/engine/tae/tables/jobs/flushobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package jobs
import (
"context"
"math/rand"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
Expand All @@ -46,7 +46,7 @@ type flushObjTask struct {
createAt time.Time
partentTask string

done bool
mu sync.Mutex
}

func NewFlushObjTask(
Expand Down Expand Up @@ -86,20 +86,28 @@ func NewFlushObjTask(
func (task *flushObjTask) Scope() *common.ID { return task.meta.AsCommonID() }

func (task *flushObjTask) Execute(ctx context.Context) (err error) {
task.mu.Lock()
data := task.data
task.data = nil
task.mu.Unlock()
if data == nil {
// Parent aborted this task before it started.
return nil
}
defer func() {
if data != nil {
data.Close()
}
}()

if v := ctx.Value(TestFlushBailoutPos1{}); v != nil {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
}
waitT := time.Since(task.createAt)
seg := task.meta.ID().Segment()
name := objectio.BuildObjectName(seg, 0)
task.name = name
cnBatch := containers.ToCNBatch(task.data)
for _, vec := range cnBatch.Vecs {
if vec == nil {
// this task has been canceled
return nil
}
}
cnBatch := containers.ToCNBatch(data)
arena := objectio.GetArena(objectio.ArenaSmall)
defer func() {
arena.Reset()
Expand Down Expand Up @@ -142,6 +150,8 @@ func (task *flushObjTask) Execute(ctx context.Context) (err error) {
if err != nil {
return err
}
data.Close()
data = nil
copyT := time.Since(inst)
inst = time.Now()
task.blocks, _, err = writer.Sync(ctx)
Expand All @@ -168,17 +178,12 @@ func (task *flushObjTask) release() {
if task == nil {
return
}
if !task.done {
ctx, cancel := context.WithTimeoutCause(
context.Background(),
10*time.Second,
moerr.CauseReleaseFlushObjTasks,
)
defer cancel()
task.WaitDone(ctx)
}

if task.data != nil {
task.data.Close()
task.mu.Lock()
data := task.data
task.data = nil
task.mu.Unlock()
if data != nil {
// Only pending data can be released here; running tasks own their data.
data.Close()
}
}
Loading