Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestCheckpoint(t *testing.T) {
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
if err := d.Compact(nil, []byte("\xff"), false, 7 /* maxLevel */); err != nil {
return err.Error()
}
return memLog.String()
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestCheckpointCompaction(t *testing.T) {
defer cancel()
defer wg.Done()
for ctx.Err() == nil {
if err := d.Compact([]byte("key"), []byte("key999999"), false); err != nil {
if err := d.Compact([]byte("key"), []byte("key999999"), false, 7 /* maxLevel */); err != nil {
t.Error(err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestCleaner(t *testing.T) {
return "compact <db>"
}
d := dbs[td.CmdArgs[0].String()]
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
if err := d.Compact(nil, []byte("\xff"), false, 7 /* maxLevel */); err != nil {
return err.Error()
}
return memLog.String()
Expand Down
16 changes: 8 additions & 8 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func TestCompactionCPUGranter(t *testing.T) {
defer d.Close()

d.Set([]byte{'a'}, []byte{'a'}, nil)
err = d.Compact([]byte{'a'}, []byte{'b'}, true)
err = d.Compact([]byte{'a'}, []byte{'b'}, true, 7 /* maxLevel */)
if err != nil {
t.Fatalf("Compact: %v", err)
}
Expand All @@ -951,7 +951,7 @@ func TestCompactionCPUGranterDefault(t *testing.T) {
defer d.Close()

d.Set([]byte{'a'}, []byte{'a'}, nil)
err = d.Compact([]byte{'a'}, []byte{'b'}, true)
err = d.Compact([]byte{'a'}, []byte{'b'}, true, 7 /* maxLevel */)
if err != nil {
t.Fatalf("Compact: %v", err)
}
Expand Down Expand Up @@ -2953,7 +2953,7 @@ func TestCompactionErrorCleanup(t *testing.T) {
d.mu.Lock()
initialSetupDone = true
d.mu.Unlock()
err = d.Compact([]byte("a"), []byte("d"), false)
err = d.Compact([]byte("a"), []byte("d"), false, 7 /* maxLevel */)
require.Error(t, err, "injected error")

d.mu.Lock()
Expand Down Expand Up @@ -3285,7 +3285,7 @@ func TestCompactFlushQueuedMemTableAndFlushMetrics(t *testing.T) {
}
}

require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false, 7 /* maxLevel */))
d.mu.Lock()
require.Equal(t, 1, len(d.mu.mem.queue))
d.mu.Unlock()
Expand Down Expand Up @@ -3340,7 +3340,7 @@ func TestCompactFlushQueuedLargeBatch(t *testing.T) {
require.Greater(t, len(d.mu.mem.queue), 1)
d.mu.Unlock()

require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false, 7 /* maxLevel */))
d.mu.Lock()
require.Equal(t, 1, len(d.mu.mem.queue))
d.mu.Unlock()
Expand Down Expand Up @@ -3464,9 +3464,9 @@ func TestCompactionInvalidBounds(t *testing.T) {
}).WithFSDefaults())
require.NoError(t, err)
defer db.Close()
require.NoError(t, db.Compact([]byte("a"), []byte("b"), false))
require.Error(t, db.Compact([]byte("a"), []byte("a"), false))
require.Error(t, db.Compact([]byte("b"), []byte("a"), false))
require.NoError(t, db.Compact([]byte("a"), []byte("b"), false, 7 /* maxLevel */))
require.Error(t, db.Compact([]byte("a"), []byte("a"), false, 7 /* maxLevel */))
require.Error(t, db.Compact([]byte("b"), []byte("a"), false, 7 /* maxLevel */))
}

func Test_calculateInuseKeyRanges(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func runCompactCmd(td *datadriven.TestData, d *DB) error {
}
return d.manualCompact(iStart.UserKey, iEnd.UserKey, level, parallelize)
}
return d.Compact([]byte(parts[0]), []byte(parts[1]), parallelize)
return d.Compact([]byte(parts[0]), []byte(parts[1]), parallelize, 7 /* maxLevel */)
}

// runDBDefineCmd prepares a database state, returning the opened
Expand Down
11 changes: 8 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1523,8 +1523,10 @@ func (d *DB) Close() error {
return err
}

// Compact the specified range of keys in the database.
func (d *DB) Compact(start, end []byte, parallelize bool) error {
// Compact the specified range of keys in the database. maxLevel is an exclusive
// paramter. If maxLevel is 4, then compactions may be performed from
// L3 -> L4, but not L4 -> L5.
func (d *DB) Compact(start, end []byte, parallelize bool, maxLevel int) error {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand Down Expand Up @@ -1591,7 +1593,10 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
<-mem.flushed
}

for level := 0; level < maxLevelWithFiles; {
if maxLevel > maxLevelWithFiles {
maxLevel = maxLevelWithFiles
}
for level := 0; level < maxLevel; {
if err := d.manualCompact(
iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func TestCacheEvict(t *testing.T) {
require.NoError(t, d.Delete(key, nil))
}

require.NoError(t, d.Compact([]byte("0"), []byte("1"), false))
require.NoError(t, d.Compact([]byte("0"), []byte("1"), false, 7 /* maxLevel */))

require.NoError(t, d.Close())

Expand Down Expand Up @@ -1051,7 +1051,7 @@ func TestRollManifest(t *testing.T) {
// adds 9 files in edits. We still need 6 more files in edits based on the
// last snapshot. But the current version has only 9 L0 files and 1 L6 file,
// for a total of 10 files. So 1 flush should push us over that threshold.
d.Compact([]byte("c"), []byte("d"), false)
d.Compact([]byte("c"), []byte("d"), false, 7 /* maxLevel */)
lastSnapshotCount, editsSinceSnapshotCount = sizeRolloverState()
require.EqualValues(t, 16, lastSnapshotCount)
require.EqualValues(t, 10, editsSinceSnapshotCount)
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func TestDBClosed(t *testing.T) {

require.True(t, errors.Is(catch(func() { _ = d.Close() }), ErrClosed))

require.True(t, errors.Is(catch(func() { _ = d.Compact(nil, nil, false) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Compact(nil, nil, false, 7 /* maxLevel */) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Flush() }), ErrClosed))
require.True(t, errors.Is(catch(func() { _, _ = d.AsyncFlush() }), ErrClosed))

Expand Down Expand Up @@ -1124,7 +1124,7 @@ func TestDBConcurrentCommitCompactFlush(t *testing.T) {
var err error
switch i % 3 {
case 0:
err = d.Compact(nil, []byte("\xff"), false)
err = d.Compact(nil, []byte("\xff"), false, 7 /* maxLevel */)
case 1:
err = d.Flush()
case 2:
Expand Down Expand Up @@ -1228,7 +1228,7 @@ func TestCloseCleanerRace(t *testing.T) {
it := db.NewIter(nil)
require.NotNil(t, it)
require.NoError(t, db.DeleteRange([]byte("a"), []byte("b"), Sync))
require.NoError(t, db.Compact([]byte("a"), []byte("b"), false))
require.NoError(t, db.Compact([]byte("a"), []byte("b"), false, 7 /* maxLevel */))
// Only the iterator is keeping the sstables alive.
files, err := mem.List("/")
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestErrors(t *testing.T) {
if err := d.Flush(); err != nil {
return err
}
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
if err := d.Compact(nil, []byte("\xff"), false, 7 /* maxLevel */); err != nil {
return err
}

Expand Down Expand Up @@ -181,7 +181,7 @@ func TestRequireReadError(t *testing.T) {
require.NoError(t, d.Set(key1, value, nil))
require.NoError(t, d.Set(key2, value, nil))
require.NoError(t, d.Flush())
require.NoError(t, d.Compact(key1, key2, false))
require.NoError(t, d.Compact(key1, key2, false, 7 /* maxLevel */))
require.NoError(t, d.DeleteRange(key1, key2, nil))
require.NoError(t, d.Set(key1, value, nil))
require.NoError(t, d.Flush())
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestCorruptReadError(t *testing.T) {
require.NoError(t, d.Set(key1, value, nil))
require.NoError(t, d.Set(key2, value, nil))
require.NoError(t, d.Flush())
require.NoError(t, d.Compact(key1, key2, false))
require.NoError(t, d.Compact(key1, key2, false, 7 /* maxLevel */))
require.NoError(t, d.DeleteRange(key1, key2, nil))
require.NoError(t, d.Set(key1, value, nil))
require.NoError(t, d.Flush())
Expand Down
2 changes: 1 addition & 1 deletion event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestEventListener(t *testing.T) {
if err := d.Set([]byte("a"), nil, nil); err != nil {
return err.Error()
}
if err := d.Compact([]byte("a"), []byte("b"), false); err != nil {
if err := d.Compact([]byte("a"), []byte("b"), false, 7 /* maxLevel */); err != nil {
return err.Error()
}
return memLog.String()
Expand Down
2 changes: 1 addition & 1 deletion format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func testBasicDB(d *DB) error {
if err := d.Flush(); err != nil {
return err
}
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
if err := d.Compact(nil, []byte("\xff"), false, 7 /* maxLevel */); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestIngest(t *testing.T) {
}
l := td.CmdArgs[0].Key
r := td.CmdArgs[1].Key
err := d.Compact([]byte(l), []byte(r), false)
err := d.Compact([]byte(l), []byte(r), false, 7 /* maxLevel */)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func TestConcurrentIngestCompact(t *testing.T) {

compact := func(start, end string) {
t.Helper()
require.NoError(t, d.Compact([]byte(start), []byte(end), false))
require.NoError(t, d.Compact([]byte(start), []byte(end), false, 7 /* maxLevel */))
}

lsm := func() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type compactOp struct {

func (o *compactOp) run(t *test, h historyRecorder) {
err := withRetries(func() error {
return t.db.Compact(o.start, o.end, o.parallelize)
return t.db.Compact(o.start, o.end, o.parallelize, 7 /* maxLevel */)
})
h.Recordf("%s // %v", o, err)
}
Expand Down
2 changes: 1 addition & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2359,7 +2359,7 @@ func BenchmarkBlockPropertyFilter(b *testing.B) {
}
require.NoError(b, batch.Commit(nil))
require.NoError(b, d.Flush())
require.NoError(b, d.Compact(nil, []byte{0xFF}, false))
require.NoError(b, d.Compact(nil, []byte{0xFF}, false, 7 /* maxLevel */))

for _, filter := range []bool{false, true} {
b.Run(fmt.Sprintf("filter=%t", filter), func(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestMetricsWAmpDisableWAL(t *testing.T) {
require.NoError(t, d.Set(testkeys.Key(ks, j), v, &wo))
}
require.NoError(t, d.Flush())
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */))
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */, 7 /* maxLevel */))
}
m := d.Metrics()
tot := m.Total()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ func TestTracing(t *testing.T) {
require.NoError(t, d.Flush())
require.NoError(t, d.Set([]byte("c"), []byte("ccc"), nil))
require.NoError(t, d.Flush())
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */))
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */, 7 /* maxLevel */))
require.NoError(t, d.Set([]byte("b"), []byte("bbb2"), nil))
require.NoError(t, d.Set([]byte("c"), []byte("ccc2"), nil))
require.NoError(t, d.Set([]byte("d"), []byte("ddd"), nil))
require.NoError(t, d.Flush())
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */))
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */, 7 /* maxLevel */))
require.NoError(t, d.Close())

collectEvents := func() []Event {
Expand Down
6 changes: 3 additions & 3 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestErrorIfNotPristine(t *testing.T) {
opts.ErrorIfNotPristine = false
d2, err := Open("", opts)
require.NoError(t, err)
require.NoError(t, d2.Compact([]byte("a"), []byte("z"), false /* parallelize */))
require.NoError(t, d2.Compact([]byte("a"), []byte("z"), false /* parallelize */, 7 /* maxLevel */))
require.NoError(t, d2.Close())

opts.ErrorIfNotPristine = true
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestOpenReadOnly(t *testing.T) {
require.NoError(t, err)

// Verify various write operations fail in read-only mode.
require.EqualValues(t, ErrReadOnly, d.Compact(nil, []byte("\xff"), false))
require.EqualValues(t, ErrReadOnly, d.Compact(nil, []byte("\xff"), false, 7 /* maxLevel */))
require.EqualValues(t, ErrReadOnly, d.Flush())
require.EqualValues(t, ErrReadOnly, func() error { _, err := d.AsyncFlush(); return err }())

Expand Down Expand Up @@ -964,7 +964,7 @@ func TestOpenWALReplayReadOnlySeqNums(t *testing.T) {
// written to the MANIFEST. This produces a MANIFEST where the `logSeqNum`
// is greater than the sequence numbers contained in the
// `minUnflushedLogNum` log file
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false, 7 /* maxLevel */))
d.mu.Lock()
for d.mu.compact.compactingCount > 0 {
d.mu.compact.cond.Wait()
Expand Down
20 changes: 10 additions & 10 deletions range_del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ func TestRangeDelCompactionTruncation(t *testing.T) {
require.NoError(t, d.DeleteRange([]byte("a"), []byte("d"), nil))

// Compact to produce the L1 tables.
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false, 7 /* maxLevel */))
expectLSM(`
1:
000008:[a#12,RANGEDEL-b#inf,RANGEDEL]
000009:[b#12,RANGEDEL-d#inf,RANGEDEL]
`)

// Compact again to move one of the tables to L2.
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false, 7 /* maxLevel */))
expectLSM(`
1:
000008:[a#12,RANGEDEL-b#inf,RANGEDEL]
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestRangeDelCompactionTruncation(t *testing.T) {
// containing "c" will be compacted again with the L2 table creating two
// tables in L2. Lastly, the L2 table containing "c" will be compacted
// creating the L3 table.
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false, 7 /* maxLevel */))
if formatVersion < FormatSetWithDelete {
expectLSM(`
1:
Expand Down Expand Up @@ -461,14 +461,14 @@ func TestRangeDelCompactionTruncation2(t *testing.T) {
require.NoError(t, d.DeleteRange([]byte("a"), []byte("d"), nil))

// Compact to produce the L1 tables.
require.NoError(t, d.Compact([]byte("b"), []byte("b\x00"), false))
require.NoError(t, d.Compact([]byte("b"), []byte("b\x00"), false, 7 /* maxLevel */))
expectLSM(`
6:
000009:[a#12,RANGEDEL-d#inf,RANGEDEL]
`)

require.NoError(t, d.Set([]byte("c"), bytes.Repeat([]byte("d"), 100), nil))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false, 7 /* maxLevel */))
expectLSM(`
6:
000012:[a#12,RANGEDEL-c#inf,RANGEDEL]
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestRangeDelCompactionTruncation3(t *testing.T) {

// Compact a few times to move the tables down to L3.
for i := 0; i < 3; i++ {
require.NoError(t, d.Compact([]byte("b"), []byte("b\x00"), false))
require.NoError(t, d.Compact([]byte("b"), []byte("b\x00"), false, 7 /* maxLevel */))
}
expectLSM(`
3:
Expand All @@ -543,15 +543,15 @@ func TestRangeDelCompactionTruncation3(t *testing.T) {

require.NoError(t, d.Set([]byte("c"), bytes.Repeat([]byte("d"), 100), nil))

require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false, 7 /* maxLevel */))
expectLSM(`
3:
000013:[a#12,RANGEDEL-c#inf,RANGEDEL]
4:
000014:[c#13,SET-d#inf,RANGEDEL]
`)

require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false))
require.NoError(t, d.Compact([]byte("c"), []byte("c\x00"), false, 7 /* maxLevel */))
expectLSM(`
3:
000013:[a#12,RANGEDEL-c#inf,RANGEDEL]
Expand All @@ -563,7 +563,7 @@ func TestRangeDelCompactionTruncation3(t *testing.T) {
t.Fatalf("expected not found, but found %v", err)
}

require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false))
require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false, 7 /* maxLevel */))
expectLSM(`
4:
000013:[a#12,RANGEDEL-c#inf,RANGEDEL]
Expand Down Expand Up @@ -649,7 +649,7 @@ func benchmarkRangeDelIterate(b *testing.B, entries, deleted int, snapshotCompac
}

if snapshotCompact {
require.NoError(b, d.Compact(makeKey(0), makeKey(entries), false))
require.NoError(b, d.Compact(makeKey(0), makeKey(entries), false, 7 /* maxLevel */))
}

b.ResetTimer()
Expand Down
2 changes: 1 addition & 1 deletion snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestSnapshot(t *testing.T) {
if len(keys) != 2 {
return fmt.Sprintf("malformed key range: %s", parts[1])
}
err = d.Compact([]byte(keys[0]), []byte(keys[1]), false)
err = d.Compact([]byte(keys[0]), []byte(keys[1]), false, 7 /* maxLevel */)
default:
return fmt.Sprintf("unknown op: %s", parts[0])
}
Expand Down
Loading