Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- master
- crl-release-*
- disagg
pull_request:
branches:
- master
Expand Down Expand Up @@ -32,19 +33,6 @@ jobs:

- run: make test generate

linux-crossversion:
name: go-linux-crossversion
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: "1.18"

- run: make crossversion-meta

linux-race:
name: go-linux-race
runs-on: ubuntu-latest
Expand Down
11 changes: 1 addition & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ GOFLAGS :=
STRESSFLAGS :=
TAGS := invariants
TESTS := .
LATEST_RELEASE := $(shell git fetch origin && git branch -r --list '*/crl-release-*' | grep -o 'crl-release-.*$$' | sort | tail -1)

.PHONY: all
all:
Expand All @@ -24,7 +23,7 @@ test:
${GO} test -mod=vendor -tags '$(TAGS)' ${testflags} -run ${TESTS} ${PKG}

.PHONY: testrace
testrace: testflags += -race -timeout 20m
testrace: testflags += -v -race -timeout 20m
testrace: test

.PHONY: stress stressrace
Expand All @@ -38,14 +37,6 @@ stressmeta: override STRESSFLAGS += -p 1
stressmeta: override TESTS = TestMeta$$
stressmeta: stress

.PHONY: crossversion-meta
crossversion-meta:
git checkout ${LATEST_RELEASE}; \
${GO} test -c ./internal/metamorphic -o './internal/metamorphic/crossversion/${LATEST_RELEASE}.test'; \
git checkout -; \
${GO} test -c ./internal/metamorphic -o './internal/metamorphic/crossversion/head.test'; \
${GO} test -tags '$(TAGS)' ${testflags} -v -run 'TestMetaCrossVersion' ./internal/metamorphic/crossversion --version '${LATEST_RELEASE},${LATEST_RELEASE},${LATEST_RELEASE}.test' --version 'HEAD,HEAD,./head.test'

.PHONY: generate
generate:
${GO} generate -mod=vendor ${PKG}
Expand Down
159 changes: 140 additions & 19 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"runtime/pprof"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -28,6 +29,8 @@ import (
"github.com/cockroachdb/pebble/vfs"
)

// sharedLevel is the threshold output level for shared sstables: when a
// shared file system is configured, runCompaction moves every output table
// whose output level is sharedLevel or greater to that file system.
const sharedLevel = 5

var errEmptyTable = errors.New("pebble: empty table")
var errFlushInvariant = errors.New("pebble: flush next log number is unset")

Expand Down Expand Up @@ -2149,6 +2152,43 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
return err
}

// moveFileToSharedFS moves the file at filepath on fs to sharedPath on
// sharedFS by copying its contents and then removing the original.
//
// The source is removed only after the copy has been synced and the
// destination has been closed without error, so a failed or partial copy
// never costs the caller its only complete copy of the data. On failure the
// first error encountered is returned and the source file is left in place; a
// partially written destination may remain on sharedFS.
func moveFileToSharedFS(filepath string, fs vfs.FS, sharedPath string, sharedFS vfs.FS) error {
	src, err := fs.Open(filepath, vfs.SequentialReadsOption)
	if err != nil {
		return err
	}
	dst, err := sharedFS.Create(sharedPath)
	if err != nil {
		// Best effort: the Create failure is the error worth reporting.
		_ = src.Close()
		return err
	}

	_, err = io.Copy(dst, src)
	if err == nil {
		// Make the copy durable before the original is deleted.
		err = dst.Sync()
	}
	// Close both files exactly once, regardless of earlier failures, while
	// keeping the first error. A Close error on the destination can signal
	// that buffered data was never written, so it must not be ignored.
	if closeErr := dst.Close(); err == nil {
		err = closeErr
	}
	if closeErr := src.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		return err
	}
	return fs.Remove(filepath)
}

// setSharedSSTMetadata populates the shared-sstable fields of meta for a
// table created by the instance identified by creatorID. It is a variable
// rather than a plain function so that tests can inject their own
// implementation. The default, installed by this file's init function, simply
// copies the table's overall key bounds. It is nil until init has run.
var setSharedSSTMetadata func(meta *manifest.FileMetadata, creatorID uint32)

// runCompactions runs a compaction that produces new on-disk tables from
// memtables or old on-disk tables.
//
Expand Down Expand Up @@ -2251,6 +2291,7 @@ func (d *DB) runCompaction(
var (
filenames []string
tw *sstable.Writer
movers sync.WaitGroup
)
defer func() {
if iter != nil {
Expand Down Expand Up @@ -2534,6 +2575,26 @@ func (d *DB) runCompaction(
meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey)
}

// If the output level is sharedLevel or deeper, the output sstable is moved
// to the shared file system asynchronously.
if d.opts.SharedFS != nil && c.outputLevel.level >= sharedLevel {
if writerMeta.HasRangeKeys {
panic("runCompaction: shared sst does not support range keys")
}

setSharedSSTMetadata(meta, d.opts.UniqueID)

oldFilename := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, meta.FileNum)
sharedFilename := base.MakeSharedSSTPath(d.opts.SharedFS, d.opts.SharedDir, meta.CreatorUniqueID, meta.PhysicalFileNum)
movers.Add(1)
go func() {
defer movers.Done()
if err := moveFileToSharedFS(oldFilename, d.opts.FS, sharedFilename, d.opts.SharedFS); err == nil {
meta.IsShared = true
}
}()
}

// Verify that the sstable bounds fall within the compaction input
// bounds. This is a sanity check that we don't have a logic error
// elsewhere that causes the sstable bounds to accidentally expand past the
Expand Down Expand Up @@ -2723,6 +2784,7 @@ func (d *DB) runCompaction(
}] = f
}
}
movers.Wait()

if err := d.dataDir.Sync(); err != nil {
return nil, pendingOutputs, err
Expand Down Expand Up @@ -2918,15 +2980,22 @@ func (d *DB) deleteObsoleteFiles(jobID int, waitForOngoing bool) {

// obsoleteFile holds information about a file that needs to be deleted soon.
type obsoleteFile struct {
	// dir is the directory used to build the file's local path.
	dir string
	// fileNum is the file number used to build the local path and passed to
	// deleteObsoleteFile.
	fileNum  base.FileNum
	fileType fileType
	// fileSize is used for deletion pacing and for the obsolete-table size
	// metric.
	fileSize uint64
	// skipMetrics, when set, leaves the obsolete-table count and size metrics
	// untouched when this table is deleted.
	skipMetrics bool
	// isShared reports that the table lives on the shared file system; its
	// path is then derived from creatorUniqueID and physicalFileNum instead
	// of dir and fileNum.
	isShared        bool
	creatorUniqueID uint32
	physicalFileNum base.FileNum
}

// fileInfo describes a file that is a candidate for obsolete-file deletion.
type fileInfo struct {
	fileNum  FileNum
	fileSize uint64
	// isShared, creatorUniqueID and physicalFileNum are copied into the
	// obsoleteFile built from this entry. For tables that are not shared,
	// doDeleteObsoleteFiles fills creatorUniqueID with the local instance's
	// ID and physicalFileNum with the table's own file number.
	isShared        bool
	creatorUniqueID uint32
	physicalFileNum base.FileNum
}

// d.mu must be held when calling this, but the mutex may be dropped and
Expand Down Expand Up @@ -2955,9 +3024,21 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
}

for _, table := range d.mu.versions.obsoleteTables {
// By default, assume the file was created locally.
physicalFileNum := table.FileNum
creatorUniqueID := d.opts.UniqueID
if table.IsShared {
// If the file is shared, use its own metadata regardless of whether
// it was created locally or by a foreign instance.
physicalFileNum = table.PhysicalFileNum
creatorUniqueID = table.CreatorUniqueID
}
obsoleteTables = append(obsoleteTables, fileInfo{
fileNum: table.FileNum,
fileSize: table.Size,
fileNum: table.FileNum,
fileSize: table.Size,
isShared: table.IsShared,
creatorUniqueID: creatorUniqueID,
physicalFileNum: physicalFileNum,
})
}
d.mu.versions.obsoleteTables = nil
Expand Down Expand Up @@ -3017,10 +3098,13 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
}

filesToDelete = append(filesToDelete, obsoleteFile{
dir: dir,
fileNum: fi.fileNum,
fileType: f.fileType,
fileSize: fi.fileSize,
dir: dir,
fileNum: fi.fileNum,
fileType: f.fileType,
fileSize: fi.fileSize,
isShared: fi.isShared,
creatorUniqueID: fi.creatorUniqueID,
physicalFileNum: fi.physicalFileNum,
})
}
}
Expand All @@ -3047,13 +3131,21 @@ func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) {
for _, of := range files {
path := base.MakeFilepath(d.opts.FS, of.dir, of.fileType, of.fileNum)
if of.fileType == fileTypeTable {
if of.isShared {
path = base.MakeSharedSSTPath(d.opts.SharedFS, d.opts.SharedDir, of.creatorUniqueID, of.physicalFileNum)
if d.persistentCache != nil {
d.persistentCache.MarkDeleted(of.fileNum)
}
}
_ = pacer.maybeThrottle(of.fileSize)
d.mu.Lock()
d.mu.versions.metrics.Table.ObsoleteCount--
d.mu.versions.metrics.Table.ObsoleteSize -= of.fileSize
d.mu.Unlock()
if !of.skipMetrics {
d.mu.Lock()
d.mu.versions.metrics.Table.ObsoleteCount--
d.mu.versions.metrics.Table.ObsoleteSize -= of.fileSize
d.mu.Unlock()
}
}
d.deleteObsoleteFile(of.fileType, jobID, path, of.fileNum)
d.deleteObsoleteFile(of.fileType, jobID, path, of.fileNum, of.isShared)
}
}

Expand Down Expand Up @@ -3082,10 +3174,21 @@ func (d *DB) maybeScheduleObsoleteTableDeletion() {
}

// deleteObsoleteFile deletes file that is no longer needed.
func (d *DB) deleteObsoleteFile(fileType fileType, jobID int, path string, fileNum FileNum) {
func (d *DB) deleteObsoleteFile(
fileType fileType, jobID int, path string, fileNum FileNum, isShared bool,
) {
// TODO(peter): need to handle this error, probably by re-adding the
// file that couldn't be deleted to one of the obsolete slices map.
err := d.opts.Cleaner.Clean(d.opts.FS, fileType, path)
fs := d.opts.FS
if isShared {
fs = d.opts.SharedFS
}
var err error
err = nil
//TODO(chen): really delete obsolete files in shared fs after refcnt is implemented
if !isShared {
err = d.opts.Cleaner.Clean(fs, fileType, path)
}
if oserror.IsNotExist(err) {
return
}
Expand Down Expand Up @@ -3154,3 +3257,21 @@ func mergeFileMetas(a, b []*fileMetadata) []*fileMetadata {
}
return a[:n]
}

// init installs the default setSharedSSTMetadata implementation.
func init() {
	setSharedSSTMetadata = func(meta *manifest.FileMetadata, creatorUniqueID uint32) {
		// Record the table's current overall bounds as its file bounds.
		meta.FileSmallest = meta.Smallest.Clone()
		meta.FileLargest = meta.Largest.Clone()

		// Replace the overall bounds and the point-key bounds with one fresh
		// pair of clones; both sets of fields receive the same cloned keys.
		smallest := meta.Smallest.Clone()
		largest := meta.Largest.Clone()
		meta.Smallest, meta.SmallestPointKey = smallest, smallest
		meta.Largest, meta.LargestPointKey = largest, largest

		// The table was written by this instance, so its physical file number
		// is simply its own file number.
		meta.PhysicalFileNum = meta.FileNum
		meta.CreatorUniqueID = creatorUniqueID
	}
}
4 changes: 3 additions & 1 deletion compaction_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/sstable"
)

// compactionIter provides a forward-only iterator that encapsulates the logic
Expand Down Expand Up @@ -886,5 +887,6 @@ func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
// This is not the last snapshot
return
}
i.key.SetSeqNum(0)
// Rather than literally zeroing the sequence number, set it to
// sstable.SeqNumZero, the smallest possible one.
i.key.SetSeqNum(sstable.SeqNumZero)
}
6 changes: 4 additions & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,7 @@ func TestCompactionDeleteOnlyHints(t *testing.T) {
if err != nil {
return err.Error()
}
seqNum += sstable.SeqNumZero
d.mu.Lock()
var s *Snapshot
l := &d.mu.snapshots
Expand Down Expand Up @@ -2150,6 +2151,7 @@ func TestCompactionTombstones(t *testing.T) {
if err != nil {
return err.Error()
}
seqNum += sstable.SeqNumZero
d.mu.Lock()
var s *Snapshot
l := &d.mu.snapshots
Expand Down Expand Up @@ -2802,7 +2804,7 @@ func TestCompactionErrorCleanup(t *testing.T) {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{"ext"}))
require.NoError(t, d.Ingest([]string{"ext"}, nil))
}
ingest("a", "c")
ingest("b")
Expand Down Expand Up @@ -3713,7 +3715,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
require.NoError(t, w.Set(key, nil))
require.NoError(t, w.Close())
// Ingest the SST.
return db.Ingest([]string{fName})
return db.Ingest([]string{fName}, nil)
}

testCases := []struct {
Expand Down
Loading