diff --git a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go index 0d99979b279e0..4a242494a91a9 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go +++ b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go @@ -859,10 +859,10 @@ func (task *flushTableTailTask) flushAObjsForSnapshot(ctx context.Context, isTom dataVer.Batch, task.Name(), ) + subtasks[i] = aobjectTask if err = task.rt.Scheduler.Schedule(aobjectTask); err != nil { return } - subtasks[i] = aobjectTask } return } @@ -884,7 +884,6 @@ func (task *flushTableTailTask) waitFlushAObjForSnapshot(ctx context.Context, su if err = subtask.WaitDone(ictx); err != nil { return moerr.AttachCause(ictx, err) } - subtask.done = true stat := subtask.stat.Clone() if err = handles[i].UpdateStats(*stat); err != nil { return diff --git a/pkg/vm/engine/tae/tables/jobs/flushobj.go b/pkg/vm/engine/tae/tables/jobs/flushobj.go index 1cd2bc2c7a7df..5754efe2d87b6 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushobj.go +++ b/pkg/vm/engine/tae/tables/jobs/flushobj.go @@ -17,9 +17,9 @@ package jobs import ( "context" "math/rand" + "sync" "time" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -46,7 +46,7 @@ type flushObjTask struct { createAt time.Time partentTask string - done bool + mu sync.Mutex } func NewFlushObjTask( @@ -86,6 +86,20 @@ func NewFlushObjTask( func (task *flushObjTask) Scope() *common.ID { return task.meta.AsCommonID() } func (task *flushObjTask) Execute(ctx context.Context) (err error) { + task.mu.Lock() + data := task.data + task.data = nil + task.mu.Unlock() + if data == nil { + // Parent aborted this task before it started. + return nil + } + defer func() { + if data != nil { + data.Close() + } + }() + if v := ctx.Value(TestFlushBailoutPos1{}); v != nil { time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond) } @@ -93,13 +107,7 @@ func (task *flushObjTask) Execute(ctx context.Context) (err error) { seg := task.meta.ID().Segment() name := objectio.BuildObjectName(seg, 0) task.name = name - cnBatch := containers.ToCNBatch(task.data) - for _, vec := range cnBatch.Vecs { - if vec == nil { - // this task has been canceled - return nil - } - } + cnBatch := containers.ToCNBatch(data) arena := objectio.GetArena(objectio.ArenaSmall) defer func() { arena.Reset() @@ -142,6 +150,8 @@ func (task *flushObjTask) Execute(ctx context.Context) (err error) { if err != nil { return err } + data.Close() + data = nil copyT := time.Since(inst) inst = time.Now() task.blocks, _, err = writer.Sync(ctx) @@ -168,17 +178,12 @@ func (task *flushObjTask) release() { if task == nil { return } - if !task.done { - ctx, cancel := context.WithTimeoutCause( - context.Background(), - 10*time.Second, - moerr.CauseReleaseFlushObjTasks, - ) - defer cancel() - task.WaitDone(ctx) - } - - if task.data != nil { - task.data.Close() + task.mu.Lock() + data := task.data + task.data = nil + task.mu.Unlock() + if data != nil { + // Only pending data can be released here; running tasks own their data. + data.Close() } }