diff --git a/pkg/fileservice/s3_fs_test.go b/pkg/fileservice/s3_fs_test.go index 2e65d4402738b..c2c4bf496e9ec 100644 --- a/pkg/fileservice/s3_fs_test.go +++ b/pkg/fileservice/s3_fs_test.go @@ -874,6 +874,90 @@ func TestS3FSRangeReadSkipsFullObjectIOMergeBeforeDiskCacheUpdate(t *testing.T) } } +func TestS3FSRangeReadSkipsPrefetchFullObjectIOMerge(t *testing.T) { + ctx := context.Background() + fs, err := NewS3FS( + ctx, + ObjectStorageArguments{ + Name: "s3", + Endpoint: "disk", + Bucket: t.TempDir(), + KeyPrefix: time.Now().Format("2006-01-02.15:04:05.000000"), + }, + CacheConfig{ + DiskPath: ptrTo(t.TempDir()), + DiskCapacity: ptrTo[toml.ByteSize](1 << 30), + }, + nil, + false, + false, + ) + assert.Nil(t, err) + defer fs.Close(ctx) + + data := bytes.Repeat([]byte("abcd"), 1<<10) + err = fs.Write(ctx, IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Size: int64(len(data)), + Data: data, + }, + }, + Policy: SkipDiskCache | SkipMemoryCache, + }) + assert.Nil(t, err) + + doneMerge, waitMerge := fs.ioMerger.Merge(IOMergeKey{ + Path: "foo/bar", + FullObject: true, + }, maxIOWaitDuration) + assert.NotNil(t, doneMerge) + assert.Nil(t, waitMerge) + releasedMerge := false + releaseMerge := func() { + if !releasedMerge { + doneMerge() + releasedMerge = true + } + } + defer releaseMerge() + + type readResult struct { + data []byte + err error + } + readDone := make(chan readResult, 1) + go func() { + vec := &IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Offset: 5, + Size: 9, + }, + }, + } + err := fs.Read(ctx, vec) + defer vec.Release() + if err != nil { + readDone <- readResult{err: err} + return + } + readDone <- readResult{data: append([]byte(nil), vec.Entries[0].Data...)} + }() + + select { + case result := <-readDone: + assert.Nil(t, result.err) + assert.Equal(t, data[5:14], result.data) + case <-time.After(2 * time.Second): + releaseMerge() + result := <-readDone + t.Fatalf("range read waited for prefetch full-object io merge: %v", result.err) + } +} + func TestS3FSReadFullObjectToDiskCacheStreamingDoesNotOpenReaderWhenCacheExists(t *testing.T) { ctx := context.Background() cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<20), nil, false, nil, "") @@ -952,6 +1036,118 @@ func TestS3FSReadFullObjectToDiskCacheStreamingReturnsReaderError(t *testing.T) assert.Equal(t, int64(3), vector.Entries[0].Size) } +func TestS3FSReadFullObjectToDiskCacheStreamingReadToEnd(t *testing.T) { + ctx := context.Background() + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<20), nil, false, nil, "") + assert.Nil(t, err) + defer cache.Close(ctx) + + fs := &S3FS{ + diskCache: cache, + } + vector := &IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Offset: 6, + Size: -1, + }, + }, + } + + done, err := fs.readFullObjectToDiskCacheStreaming( + ctx, + vector, + "foo/bar", + func(context.Context, *int64, *int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader([]byte("hello world"))), nil + }, + ) + + assert.True(t, done) + assert.Nil(t, err) + assert.False(t, vector.Entries[0].done) + assert.Equal(t, []byte("world"), vector.Entries[0].Data) + assert.Equal(t, int64(5), vector.Entries[0].Size) +} + +func TestS3FSReadFullObjectToDiskCacheStreamingUnexpectedEOF(t *testing.T) { + ctx := context.Background() + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<20), nil, false, nil, "") + assert.Nil(t, err) + defer cache.Close(ctx) + + fs := &S3FS{ + diskCache: cache, + } + vector := &IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Offset: 6, + Size: 20, + }, + }, + } + + done, err := fs.readFullObjectToDiskCacheStreaming( + ctx, + vector, + "foo/bar", + func(context.Context, *int64, *int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader([]byte("hello world"))), nil + }, + ) + + assert.True(t, done) + assert.True(t, moerr.IsMoErrCode(err, moerr.ErrUnexpectedEOF)) + assert.False(t, vector.Entries[0].done) + assert.Nil(t, vector.Entries[0].Data) + assert.Equal(t, int64(20), vector.Entries[0].Size) +} + +func TestS3FSShouldStreamFullObjectToDiskCacheExclusions(t *testing.T) { + ctx := context.Background() + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<20), nil, false, nil, "") + assert.Nil(t, err) + defer cache.Close(ctx) + + fs := &S3FS{ + diskCache: cache, + } + + assert.False(t, fs.shouldStreamFullObjectToDiskCache(&IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Size: 0, + }, + }, + })) + + buf := new(bytes.Buffer) + assert.False(t, fs.shouldStreamFullObjectToDiskCache(&IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Size: 3, + WriterForRead: buf, + }, + }, + })) + + var reader io.ReadCloser + assert.False(t, fs.shouldStreamFullObjectToDiskCache(&IOVector{ + FilePath: "foo/bar", + Entries: []IOEntry{ + { + Size: 3, + ReadCloserForRead: &reader, + }, + }, + })) +} + type errorReadCloser struct { reader *bytes.Reader err error