diff --git a/compaction.go b/compaction.go index 025e72dfd62..8cd9bdfe3bc 100644 --- a/compaction.go +++ b/compaction.go @@ -152,6 +152,7 @@ const ( compactionKindRewrite compactionKindIngestedFlushable compactionKindBlobFileRewrite + compactionKindPolicyEnforcement // compactionKindVirtualRewrite must be the last compactionKind. // If a new kind has to be added after VirtualRewrite, // update AllCompactionKindStrings() accordingly. @@ -182,6 +183,8 @@ func (k compactionKind) String() string { return "copy" case compactionKindBlobFileRewrite: return "blob-file-rewrite" + case compactionKindPolicyEnforcement: + return "policy-enforcement" case compactionKindVirtualRewrite: return "virtual-sst-rewrite" } @@ -2048,6 +2051,7 @@ func (d *DB) makeCompactionEnvLocked() *compactionEnv { flushing: d.mu.compact.flushing || d.passedFlushThreshold(), rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, }, + policyEnforcementFiles: &d.mu.compact.spanPolicyEnforcementFiles, } if !d.problemSpans.IsEmpty() { env.problemSpans = &d.problemSpans diff --git a/compaction_picker.go b/compaction_picker.go index adb1ea70840..dd4fef7e932 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -42,6 +42,9 @@ type compactionEnv struct { earliestSnapshotSeqNum base.SeqNum inProgressCompactions []compactionInfo readCompactionEnv readCompactionEnv + // policyEnforcementFiles contains files marked for policy enforcement + // compaction by the background policy enforcer. + policyEnforcementFiles *manifest.MarkedForCompactionSet // problemSpans is checked by the compaction picker to avoid compactions that // overlap an active "problem span". It can be nil when there are no problem // spans. @@ -1562,6 +1565,13 @@ func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc picked } } + // Check for files that violate span policies (e.g., compression settings). + if env.policyEnforcementFiles != nil && env.policyEnforcementFiles.Count() > 0 { + if pc := p.pickPolicyEnforcementCompaction(env); pc != nil { + return pc + } + } + return nil } @@ -1712,6 +1722,28 @@ func (p *compactionPickerByScore) pickRewriteCompaction( return nil } +// pickPolicyEnforcementCompaction attempts to construct a compaction that +// rewrites a file marked for policy enforcement. This handles files that +// violate new span policies such as compression settings. The compaction +// outputs files to the same level as the input level. +func (p *compactionPickerByScore) pickPolicyEnforcementCompaction( + env compactionEnv, +) (pc *pickedTableCompaction) { + for candidate, level := range env.policyEnforcementFiles.Ascending() { + if !p.vers.Contains(level, candidate) { + env.policyEnforcementFiles.Delete(candidate, level) + continue + } + if pc := p.pickedCompactionFromCandidateFile(candidate, env, level, level, compactionKindPolicyEnforcement); pc != nil { + // Remove the file from the set since it's now being compacted. + // This prevents picking the same file again. + env.policyEnforcementFiles.Delete(candidate, level) + return pc + } + } + return nil +} + // pickVirtualRewriteCompaction looks for backing tables that have a low percentage // of referenced data and materializes their virtual sstables. func (p *compactionPickerByScore) pickVirtualRewriteCompaction( diff --git a/compaction_scheduler.go b/compaction_scheduler.go index 3abffc234e7..6f8399da2b1 100644 --- a/compaction_scheduler.go +++ b/compaction_scheduler.go @@ -150,6 +150,8 @@ func init() { compactionOptionalAndPriority{optional: true, priority: 20} scheduledCompactionMap[compactionKindRewrite] = compactionOptionalAndPriority{optional: true, priority: 10} + scheduledCompactionMap[compactionKindPolicyEnforcement] = + compactionOptionalAndPriority{optional: true, priority: 5} } // noopGrantHandle is used in cases that don't interact with a CompactionScheduler. diff --git a/compaction_test.go b/compaction_test.go index a2c660a5d77..c94942794c3 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -1507,6 +1507,31 @@ func runCompactionTest( s := blobRewriteLog.String() return s + case "scan-policy-violations": + // Run the span policy enforcer's scan to detect violations and mark files. + if d.opts.Experimental.SpanPolicyFunc == nil { + return "no span policy configured" + } + // Wait for table stats to be loaded so that table properties + // are available for violation detection. + d.waitTableStats() + + enforcer := newSpanPolicyEnforcer(d, SpanPolicyEnforcerOptions{}) + enforcer.scanAll() + return "" + + case "pending-policy-enforcement": + // Show files pending policy enforcement compaction. + d.mu.Lock() + count := d.mu.compact.spanPolicyEnforcementFiles.Count() + var buf strings.Builder + fmt.Fprintf(&buf, "pending: %d\n", count) + for f, level := range d.mu.compact.spanPolicyEnforcementFiles.Ascending() { + fmt.Fprintf(&buf, " L%d: %s\n", level, f.TableNum) + } + d.mu.Unlock() + return buf.String() + case "set-span-policies": var spanPolicies []SpanPolicy for line := range crstrings.LinesSeq(td.Input) { @@ -1553,6 +1578,11 @@ func runCompactionTest( td.Fatalf(t, "parsing minimum-mvcc-garbage-size: %s", err) } policy.ValueStoragePolicy.MinimumMVCCGarbageSize = int(size) + case "prefer-fast-compression": + if len(parts) != 1 { + td.Fatalf(t, "expected prefer-fast-compression with no value, got: %s", arg) + } + policy.PreferFastCompression = true default: td.Fatalf(t, "unknown span policy arg: %s", arg) } @@ -1658,6 +1688,11 @@ func TestCompaction(t *testing.T) { maxVersion: FormatNewest, verbose: true, }, + "policy_enforcement": { + minVersion: FormatNewest, + maxVersion: FormatNewest, + cmp: DefaultComparer, + }, } datadriven.Walk(t, "testdata/compaction", func(t *testing.T, path string) { filename := filepath.Base(path) diff --git a/data_test.go b/data_test.go index cbb5a0c0d37..1d67fd0410f 100644 --- a/data_test.go +++ b/data_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/blobtest" + "github.com/cockroachdb/pebble/internal/compression" "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" @@ -36,6 +37,7 @@ import ( "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/objstorage/remote" "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/sstable/block" "github.com/cockroachdb/pebble/sstable/block/blockkind" "github.com/cockroachdb/pebble/sstable/tablefilters/bloom" "github.com/cockroachdb/pebble/valsep" @@ -1895,6 +1897,32 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error { Secondary: wal.Dir{FS: opts.FS, Dirname: cmdArg.Vals[0]}, } opts.WALFailover.EnsureDefaults() + case "compression": + var profile block.CompressionProfile + switch cmdArg.Vals[0] { + case "zstd": + profile = *block.ZstdCompression + case "snappy": + profile = *block.SnappyCompression + case "none": + profile = *block.NoCompression + case "zstd-force": + // For testing: Zstd with MinReductionPercent=0 so even small + // values are stored compressed. + profile = block.CompressionProfile{ + Name: "test-zstd-force", + DataBlocks: block.SimpleCompressionSetting(compression.ZstdLevel3), + ValueBlocks: block.SimpleCompressionSetting(compression.ZstdLevel3), + OtherBlocks: compression.ZstdLevel3, + MinReductionPercent: 0, + } + default: + return errors.Newf("unrecognized compression %q", cmdArg.Vals[0]) + } + for i := range opts.Levels { + p := profile + opts.Levels[i].Compression = func() *block.CompressionProfile { return &p } + } } } if len(spanPolicies) > 0 { diff --git a/db.go b/db.go index 359357d44ae..dd261151966 100644 --- a/db.go +++ b/db.go @@ -316,6 +316,10 @@ type DB struct { compactionScheduler CompactionScheduler + // spanPolicyEnforcer is the background goroutine that scans the LSM for tables + // that violate the current span policy and marks those files for compaction. + spanPolicyEnforcer *spanPolicyEnforcer + // During an iterator close, we may asynchronously schedule read compactions. // We want to wait for those goroutines to finish, before closing the DB. // compactionShedulers.Wait() should not be called while the DB.mu is held. @@ -467,6 +471,12 @@ type DB struct { // compactions which we might have to perform. readCompactions readCompactionQueue + // spanPolicyEnforcementFiles contains files that have been marked for + // span policy enforcement compaction by the background policy enforcer. + // Unlike MarkedForCompaction, this is not persisted to the manifest + // since the policy enforcer will re-scan on restart. + // TODO(xinhaoz): Create new compaction that will utilize this set. + spanPolicyEnforcementFiles manifest.MarkedForCompactionSet // The cumulative duration of all completed compactions since Open. // Does not include flushes. duration time.Duration @@ -1531,6 +1541,10 @@ func (d *DB) Close() error { // CompactionScheduler will never again call a method on the DB. Note that // this must be called without holding d.mu. d.compactionScheduler.Unregister() + // Stop the background policy enforcer if it was started. + if d.spanPolicyEnforcer != nil { + d.spanPolicyEnforcer.Stop() + } // Lock the commit pipeline for the duration of Close. This prevents a race // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires // dropping d.mu several times for I/O. If Close only holds d.mu, an diff --git a/internal/manifest/scan_cursor.go b/internal/manifest/scan_cursor.go index 30e28d7e4a3..ede5d2bf63a 100644 --- a/internal/manifest/scan_cursor.go +++ b/internal/manifest/scan_cursor.go @@ -72,7 +72,7 @@ func MakeScanCursor(f *TableMetadata, level int) ScanCursor { // been processed. // // The cursor is positioned such that the file would be considered "before" the -// cursor (i.e., cursor.Compare(MakeScanCursorAtFile(f)) > 0). +// cursor (i.e., cursor.Compare(MakeScanCursor(f, level)) > 0). func MakeScanCursorAfterFile(f *TableMetadata, level int) ScanCursor { return ScanCursor{ Level: level, @@ -81,10 +81,10 @@ func MakeScanCursorAfterFile(f *TableMetadata, level int) ScanCursor { } } -// FileIsAfterCursor returns true if the given file is strictly after the cursor +// FileIsAfterCursor returns true if the given file is at or after the cursor // position. This is useful for skipping files that have already been processed. func (c *ScanCursor) FileIsAfterCursor(cmp base.Compare, f *TableMetadata, level int) bool { - return c.Compare(cmp, MakeScanCursorAfterFile(f, level)) < 0 + return c.Compare(cmp, MakeScanCursor(f, level)) <= 0 } // NextExternalFile returns the first external file after the cursor, returning @@ -131,6 +131,21 @@ func (c *ScanCursor) NextExternalFileOnLevel( return first } +// firstFileInLevelIter returns the first file at or after the cursor position +// in the given level iterator. It is assumed that the iterator corresponds to +// c.Level. +func (c *ScanCursor) firstFileInLevelIter(cmp base.Compare, it *LevelIterator) *TableMetadata { + if len(c.Key) == 0 { + return it.First() + } + f := it.SeekGE(cmp, c.Key) + // Skip files that are before the cursor position. + for f != nil && !c.FileIsAfterCursor(cmp, f, c.Level) { + f = it.Next() + } + return f +} + // FirstExternalFileInLevelIter finds the first external file after the cursor // but which starts before the endBound. It is assumed that the iterator // corresponds to cursor.Level. @@ -140,12 +155,7 @@ func (c *ScanCursor) FirstExternalFileInLevelIter( it LevelIterator, endBound base.UserKeyBoundary, ) *TableMetadata { - f := it.SeekGE(cmp, c.Key) - // Skip the file if it starts before cursor.Key or is at that same key with lower - // sequence number. - for f != nil && !c.FileIsAfterCursor(cmp, f, c.Level) { - f = it.Next() - } + f := c.firstFileInLevelIter(cmp, &it) for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest().UserKey); f = it.Next() { if f.Virtual && objstorage.IsExternalTable(objProvider, f.TableBacking.DiskFileNum) { return f @@ -153,3 +163,43 @@ func (c *ScanCursor) FirstExternalFileInLevelIter( } return nil } + +// NextFile returns the first file at or after the cursor, returning the file and the +// level. If no such file exists, returns nil. +func (c *ScanCursor) NextFile(cmp base.Compare, v *Version) (_ *TableMetadata, level int) { + for !c.AtEnd() { + if f := c.nextFileOnLevel(cmp, v); f != nil { + return f, c.Level + } + // Go to the next level. + c.Key = nil + c.SeqNum = 0 + c.Level++ + } + return nil, NumLevels +} + +// nextFileOnLevel returns the first file on c.Level which is at or after the +// cursor position. +func (c *ScanCursor) nextFileOnLevel(cmp base.Compare, v *Version) *TableMetadata { + if c.Level > 0 { + it := v.Levels[c.Level].Iter() + return c.firstFileInLevelIter(cmp, &it) + } + // For L0, we look at all sublevel iterators and take the first file + // (ordered by Smallest.UserKey, then by SeqNums.High). + var first *TableMetadata + var firstCursor ScanCursor + for _, sublevel := range v.L0SublevelFiles { + it := sublevel.Iter() + f := c.firstFileInLevelIter(cmp, &it) + if f != nil { + fc := MakeScanCursor(f, c.Level) + if first == nil || fc.Compare(cmp, firstCursor) < 0 { + first = f + firstCursor = fc + } + } + } + return first +} diff --git a/internal/manifest/scan_cursor_test.go b/internal/manifest/scan_cursor_test.go index fe5d7526828..3859988b3f6 100644 --- a/internal/manifest/scan_cursor_test.go +++ b/internal/manifest/scan_cursor_test.go @@ -78,6 +78,15 @@ func TestScanCursor(t *testing.T) { } fmt.Fprintf(&buf, " %s\n", &cursor) + case "reset": + // Reset cursor to a specific level without bounds. + level := 0 + if len(fields) > 1 { + fmt.Sscanf(fields[1], "level=%d", &level) + } + cursor = ScanCursor{Level: level} + fmt.Fprintf(&buf, " %s\n", &cursor) + case "next-external-file": f, level := cursor.NextExternalFile(cmp, objProvider, bounds, vers) if f != nil { @@ -104,6 +113,32 @@ func TestScanCursor(t *testing.T) { cursor = MakeScanCursorAfterFile(f, level) } + case "next-file": + f, level := cursor.NextFile(cmp, vers) + if f != nil { + // Verify that cursor still points to this file. + f2, level2 := cursor.NextFile(cmp, vers) + if f != f2 { + td.Fatalf(t, "NextFile returned different file") + } + if level != level2 { + td.Fatalf(t, "NextFile returned different level") + } + cursor = MakeScanCursorAfterFile(f, level) + } + fmt.Fprintf(&buf, " file: %v level: %d\n", f, level) + + case "iterate-files": + for { + f, level := cursor.NextFile(cmp, vers) + if f == nil { + fmt.Fprintf(&buf, " no more files\n") + break + } + fmt.Fprintf(&buf, " file: %v level: %d\n", f, level) + cursor = MakeScanCursorAfterFile(f, level) + } + default: td.Fatalf(t, "unknown cursor command %q", cmd) } diff --git a/internal/manifest/testdata/scan_cursor b/internal/manifest/testdata/scan_cursor index a9c20376994..2fe3a2b567f 100644 --- a/internal/manifest/testdata/scan_cursor +++ b/internal/manifest/testdata/scan_cursor @@ -46,6 +46,22 @@ iterate-external-files: file: 000005:[b#1,SET-c#1,SET] level: 2 no more files +# Test NextFile with multiple L0 sublevels (reuses the version above). +# Files are ordered by (Smallest.UserKey, SeqNums.High). +cursor lower=a upper=z +reset level=0 +iterate-files +---- +reset: + level=0 key="" seqNum=0 +iterate-files: + file: 000004:[a#1,SET-c#1,SET] level: 0 + file: 000001:[a#1,SET-a#1,SET] level: 0 + file: 000002:[b#1,SET-d#1,SET] level: 0 + file: 000003:[e#1,SET-g#1,SET] level: 0 + file: 000005:[b#1,SET-c#1,SET] level: 2 + no more files + # Verify that non-external files are skipped. define L0: @@ -142,3 +158,78 @@ next-external-file: file: 000005:[d1#1,SET-g#1,SET] level: 2 next-external-file: file: level: 7 + +# Test NextFile: iterate through all files (not just external). +# L0 files are ordered by (Smallest.UserKey, SeqNums.High). +# Note: L0 files must be defined in decreasing seqnum order in input. +define +L0: + 2:[c#1,SET-d#1,SET] seqnums:[#90-#100] + 1:[a#1,SET-b#1,SET] seqnums:[#40-#50] +L1: + 3:[e#1,SET-f#1,SET] + 4:[g#1,SET-h#1,SET] +---- +L0.0: + 000001:[a#1,SET-b#1,SET] seqnums:[#40-#50] points:[a#1,SET-b#1,SET] + 000002:[c#1,SET-d#1,SET] seqnums:[#90-#100] points:[c#1,SET-d#1,SET] +L1: + 000003:[e#1,SET-f#1,SET] seqnums:[#0-#0] points:[e#1,SET-f#1,SET] + 000004:[g#1,SET-h#1,SET] seqnums:[#0-#0] points:[g#1,SET-h#1,SET] + +# Iterate all files: L0 by (Smallest.UserKey, SeqNums.High), then L1 by key. +cursor lower=a upper=z +reset level=0 +iterate-files +---- +reset: + level=0 key="" seqNum=0 +iterate-files: + file: 000001:[a#1,SET-b#1,SET] level: 0 + file: 000002:[c#1,SET-d#1,SET] level: 0 + file: 000003:[e#1,SET-f#1,SET] level: 1 + file: 000004:[g#1,SET-h#1,SET] level: 1 + no more files + +# Test NextFile: L1 and L2 only (no L0). +define +L1: + 1:[a#1,SET-b#1,SET] + 2:[c#1,SET-d#1,SET] +L2: + 3:[e#1,SET-f#1,SET] +---- +L1: + 000001:[a#1,SET-b#1,SET] seqnums:[#0-#0] points:[a#1,SET-b#1,SET] + 000002:[c#1,SET-d#1,SET] seqnums:[#0-#0] points:[c#1,SET-d#1,SET] +L2: + 000003:[e#1,SET-f#1,SET] seqnums:[#0-#0] points:[e#1,SET-f#1,SET] + +cursor lower=a upper=z +reset level=0 +iterate-files +---- +reset: + level=0 key="" seqNum=0 +iterate-files: + file: 000001:[a#1,SET-b#1,SET] level: 1 + file: 000002:[c#1,SET-d#1,SET] level: 1 + file: 000003:[e#1,SET-f#1,SET] level: 2 + no more files + +# Test NextFile: skip empty levels. +define +L2: + 3:[e#1,SET-f#1,SET] +---- +L2: + 000003:[e#1,SET-f#1,SET] seqnums:[#0-#0] points:[e#1,SET-f#1,SET] + +cursor lower=a upper=z +reset level=1 +next-file +---- +reset: + level=1 key="" seqNum=0 +next-file: + file: 000003:[e#1,SET-f#1,SET] level: 2 diff --git a/metrics.go b/metrics.go index b3c820b5503..41533d6ce4d 100644 --- a/metrics.go +++ b/metrics.go @@ -385,18 +385,19 @@ type Metrics struct { // CompactMetrics contains metric related to compaction activity. type CompactMetrics struct { // The total number of compactions, and per-compaction type counts. - Count int64 - DefaultCount int64 - DeleteOnlyCount int64 - ElisionOnlyCount int64 - CopyCount int64 - MoveCount int64 - ReadCount int64 - TombstoneDensityCount int64 - RewriteCount int64 - MultiLevelCount int64 - BlobFileRewriteCount int64 - VirtualRewriteCount int64 + Count int64 + DefaultCount int64 + DeleteOnlyCount int64 + ElisionOnlyCount int64 + CopyCount int64 + MoveCount int64 + ReadCount int64 + TombstoneDensityCount int64 + RewriteCount int64 + MultiLevelCount int64 + BlobFileRewriteCount int64 + VirtualRewriteCount int64 + PolicyEnforcementCount int64 // An estimate of the number of bytes that need to be compacted for the LSM // to reach a stable state. EstimatedDebt uint64 diff --git a/open.go b/open.go index 97e9049432a..4971e3051db 100644 --- a/open.go +++ b/open.go @@ -481,6 +481,12 @@ func Open(dirname string, opts *Options) (db *DB, err error) { d.maybeScheduleFlush() d.maybeScheduleCompaction() + // Start the background policy enforcer if enabled. + if opts.Experimental.SpanPolicyEnforcerOptions != nil { + d.spanPolicyEnforcer = newSpanPolicyEnforcer(d, *opts.Experimental.SpanPolicyEnforcerOptions) + d.spanPolicyEnforcer.Start() + } + // Locks in RecoveryDirLocks can be closed as soon as we've finished opening // the database. if err = rs.dirs.RecoveryDirLocks.Close(); err != nil { diff --git a/options.go b/options.go index d54fdc7a807..6498acf2535 100644 --- a/options.go +++ b/options.go @@ -789,6 +789,13 @@ type Options struct { // SpanPolicyFunc is used to determine the SpanPolicy for a key region. SpanPolicyFunc SpanPolicyFunc + // SpanPolicyEnforcerOptions configures the background policy enforcer that + // scans the LSM for span policy violations. If nil, the policy enforcer + // is disabled. + // + // Default is nil (disabled). + SpanPolicyEnforcerOptions *SpanPolicyEnforcerOptions + // VirtualTableRewriteUnreferencedFraction configures the minimum fraction of // unreferenced data in a backing table required to trigger a virtual table // rewrite compaction. This is calculated as the ratio of unreferenced diff --git a/span_policy_enforcer.go b/span_policy_enforcer.go new file mode 100644 index 00000000000..15e367d8138 --- /dev/null +++ b/span_policy_enforcer.go @@ -0,0 +1,292 @@ +// Copyright 2026 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package pebble + +import ( + "sync" + "time" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/compression" + "github.com/cockroachdb/pebble/internal/manifest" +) + +// spanPolicyEnforcer is a background process that periodically scans the LSM to +// detect span policy violations and marks those files for compaction. +// +// Scan rate is determined by checkInterval. Scanning pauses when there are +// pending files violating policies marked for compaction. +type spanPolicyEnforcer struct { + db *DB + cmp base.Compare + + // checkInterval is the time between file checks. + checkInterval time.Duration + + // cursor tracks the current position in the LSM scan. + cursor manifest.ScanCursor + + // stopCh is closed to signal the background goroutine to stop. + stopCh chan struct{} + // wg is used to wait for the background goroutine to finish. + wg sync.WaitGroup +} + +// SpanPolicyEnforcerOptions contains options for the policy enforcer. +type SpanPolicyEnforcerOptions struct { + // CheckInterval is the time between file checks. + // + // Default: 1 second. + CheckInterval time.Duration +} + +// DefaultPolicyEnforcerOptions returns the default options. +func DefaultPolicyEnforcerOptions() SpanPolicyEnforcerOptions { + return SpanPolicyEnforcerOptions{ + CheckInterval: time.Second, + } +} + +// newSpanPolicyEnforcer creates a new policy enforcer. +func newSpanPolicyEnforcer(db *DB, opts SpanPolicyEnforcerOptions) *spanPolicyEnforcer { + if opts.CheckInterval <= 0 { + opts.CheckInterval = time.Second + } + enforcer := &spanPolicyEnforcer{ + db: db, + cmp: db.cmp, + checkInterval: opts.CheckInterval, + cursor: manifest.ScanCursor{Level: 0}, + stopCh: make(chan struct{}), + } + return enforcer +} + +// Start begins the background policy enforcement goroutine. +func (s *spanPolicyEnforcer) Start() { + s.wg.Add(1) + go s.run() +} + +// Stop stops the background policy enforcement goroutine and waits for it to +// finish. +func (s *spanPolicyEnforcer) Stop() { + close(s.stopCh) + // Broadcast in case the goroutine is waiting on db.mu.compact.cond. + s.db.mu.Lock() + s.db.mu.compact.cond.Broadcast() + s.db.mu.Unlock() + s.wg.Wait() +} + +// isStopped returns true if Stop has been called. +func (s *spanPolicyEnforcer) isStopped() bool { + select { + case <-s.stopCh: + return true + default: + return false + } +} + +// run is the main loop for the policy enforcer. +func (s *spanPolicyEnforcer) run() { + defer s.wg.Done() + timer := time.NewTimer(0) + defer timer.Stop() + + for { + // Sleep to maintain the target rate. + if !s.waitForInterval(timer, s.checkInterval) { + return + } + + nextFile, level, endOfScan := s.getNextFile(true /* waitForPendingWork */) + if endOfScan { + continue + } + if nextFile == nil { + // File was compacting, skip it. + continue + } + + // Check if this file violates any policy. + if s.checkPolicyViolation(nextFile) { + s.markForEnforcement(nextFile, level) + } + } +} + +// scanAll performs a single full scan of the LSM, checking all files for +// policy violations. +func (s *spanPolicyEnforcer) scanAll() { + for { + nextFile, level, endOfScan := s.getNextFile(false /* waitForPendingWork */) + if endOfScan { + return + } + if nextFile == nil { + continue + } + if s.checkPolicyViolation(nextFile) { + s.markForEnforcement(nextFile, level) + } + } +} + +// getNextFile finds the file to check for policy violations, advances the cursor past +// the file, and returns the file. If waitForPendingWork is true, it blocks until there +// are no files marked for policy enforcement waiting to be compacted. +func (s *spanPolicyEnforcer) getNextFile( + waitForPendingWork bool, +) (f *manifest.TableMetadata, level int, endOfScan bool) { + s.db.mu.Lock() + defer s.db.mu.Unlock() + + if waitForPendingWork { + // Wait until there's no pending compaction work from the enforcer + // to prevent us from queuing up too much work at one time. + for s.db.mu.compact.spanPolicyEnforcementFiles.Count() > 0 && !s.isStopped() { + s.db.mu.compact.cond.Wait() + } + // Check if we're shutting down. + if s.isStopped() { + return nil, 0, true + } + } + + vers := s.db.mu.versions.currentVersion() + nextFile, level := s.cursor.NextFile(s.cmp, vers) + if nextFile == nil { + // Reached end of scan. Reset cursor. + s.cursor = manifest.ScanCursor{Level: 0} + return nil, 0, true + } + + // Advance cursor past this file. + s.cursor = manifest.MakeScanCursorAfterFile(nextFile, level) + + // If the file is already compacting, skip it. + if nextFile.IsCompacting() { + return nil, 0, false + } + + return nextFile, level, false +} + +// waitForInterval waits for the interval or returns false if stopCh is closed. +func (s *spanPolicyEnforcer) waitForInterval(timer *time.Timer, interval time.Duration) bool { + timer.Reset(interval) + select { + case <-s.stopCh: + return false + case <-timer.C: + return true + } +} + +// checkPolicyViolation checks if a file violates any span policies across its +// key range. +func (s *spanPolicyEnforcer) checkPolicyViolation(f *manifest.TableMetadata) bool { + // SpanPolicyFunc may be nil if not configured. + if s.db.opts.Experimental.SpanPolicyFunc == nil { + return false + } + + props, ok := f.TableBacking.Properties() + if !ok { + // Properties not yet populated; skip this file for now. + return false + } + + // Get the file's key bounds and iterate over all policies that apply. + // In practice, there should only be one policy that applies to a file. + bounds := f.UserKeyBounds() + for { + policy, err := s.db.opts.Experimental.SpanPolicyFunc(bounds) + if err != nil { + // On error, skip further policy checks for this file. + s.db.opts.Logger.Errorf( + "policy enforcer: span policy lookup failed for %s: %v", + f.TableNum, + err, + ) + break + } + + // Check compression policy for this span. + if s.checkCompressionViolation(props, policy) { + return true + } + + // TODO(xinhaoz): Check other policy violations here (tiering, file size). + + if len(policy.KeyRange.End) == 0 { + // Policy extends to the end of the keyspace. + break + } + cmpResult := s.cmp(bounds.End.Key, policy.KeyRange.End) + if cmpResult < 0 || (cmpResult == 0 && bounds.End.Kind == base.Exclusive) { + // Policy covers the key range. + break + } + bounds.Start = policy.KeyRange.End + } + + return false +} + +// checkCompressionViolation checks if a file violates the compression policy. +// When PreferFastCompression is true, the file should only use the designated +// fast compression algorithms (Snappy or MinLZ) or no compression. Using Zstd +// or other slower compression algorithms is a violation. +func (s *spanPolicyEnforcer) checkCompressionViolation( + props *manifest.TableBackingProperties, policy base.SpanPolicy, +) bool { + if !policy.PreferFastCompression { + return false + } + + // If using PreferFastCompression, the file should only use fast compression + // for all blocks. + // Check compression settings used in this file. A violation occurs if + // any blocks use a non-fast compression algorithm. Fast compression means + // Snappy or MinLZ; Zstd is considered slow. + for setting := range props.CompressionStats.All() { + switch setting.Algorithm { + case compression.NoAlgorithm, compression.Snappy, compression.MinLZ: + continue + default: + // Unknown or other algorithms are also considered violations. + return true + } + } + return false +} + +// markForEnforcement marks a file for policy enforcement compaction. +func (s *spanPolicyEnforcer) markForEnforcement(f *manifest.TableMetadata, level int) { + s.db.mu.Lock() + defer s.db.mu.Unlock() + + vers := s.db.mu.versions.currentVersion() + + // Verify the file still exists at this level in the current version. + // The file may have been compacted away between detection and marking. + if !vers.Contains(level, f) { + return + } + + // Check if file is already marked. + if s.db.mu.compact.spanPolicyEnforcementFiles.Contains(f, level) { + return + } + + // Mark the file for policy enforcement. + s.db.mu.compact.spanPolicyEnforcementFiles.Insert(f, level) + + // Trigger compaction scheduling. + s.db.maybeScheduleCompaction() +} diff --git a/span_policy_enforcer_test.go b/span_policy_enforcer_test.go new file mode 100644 index 00000000000..9f46c5182be --- /dev/null +++ b/span_policy_enforcer_test.go @@ -0,0 +1,87 @@ +// Copyright 2026 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package pebble + +import ( + "testing" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/sstable/block" + "github.com/stretchr/testify/require" +) + +func makeTestTableMeta(tableNum int, smallest, largest string) *manifest.TableMetadata { + meta := &manifest.TableMetadata{ + TableNum: base.TableNum(tableNum), + Size: 1, + } + meta.SeqNums.Low = base.SeqNum(tableNum) + meta.SeqNums.High = base.SeqNum(tableNum) + meta.LargestSeqNumAbsolute = base.SeqNum(tableNum) + smallestKey := base.MakeInternalKey([]byte(smallest), meta.SeqNums.Low, base.InternalKeyKindSet) + largestKey := base.MakeInternalKey([]byte(largest), meta.SeqNums.High, base.InternalKeyKindSet) + meta.ExtendPointKeyBounds(base.DefaultComparer.Compare, smallestKey, largestKey) + meta.InitPhysicalBacking() + return meta +} + +func TestSpanPolicyEnforcerCompressionViolation(t *testing.T) { + enforcer := &spanPolicyEnforcer{} + fastPolicy := base.SpanPolicy{PreferFastCompression: true} + noPolicy := base.SpanPolicy{} + + for _, tc := range []struct { + stats string + policy base.SpanPolicy + violation bool + }{ + {"Zstd1:100/200", fastPolicy, true}, + {"Zstd1:100/200", noPolicy, false}, + {"Snappy:100/200", fastPolicy, false}, + {"MinLZ1:100/200", fastPolicy, false}, + {"None:100", fastPolicy, false}, + {"Snappy:50/100,Zstd1:50/100", fastPolicy, true}, + } { + compressionStats, err := block.ParseCompressionStats(tc.stats) + if err != nil { + t.Fatalf("error parsing compression stats: %v", err) + } + backingProps := &manifest.TableBackingProperties{CompressionStats: compressionStats} + require.Equal(t, tc.violation, enforcer.checkCompressionViolation(backingProps, tc.policy), + "stats=%s", tc.stats) + } +} + +func TestSpanPolicyEnforcerCheckPolicyViolation(t *testing.T) { + opts := &Options{Comparer: base.DefaultComparer} + opts.Experimental.SpanPolicyFunc = func(bounds base.UserKeyBounds) (base.SpanPolicy, error) { + // Policy changes at "m": no preference before, fast preference after. + if string(bounds.Start) < "m" { + return base.SpanPolicy{KeyRange: base.KeyRange{Start: []byte("a"), End: []byte("m")}}, nil + } + return base.SpanPolicy{KeyRange: base.KeyRange{Start: []byte("m"), End: []byte("z")}, PreferFastCompression: true}, nil + } + db := &DB{opts: opts, cmp: base.DefaultComparer.Compare} + enforcer := &spanPolicyEnforcer{db: db, cmp: base.DefaultComparer.Compare} + + testCases := []struct { + tableMeta *manifest.TableMetadata + compressionStats string + expectedViolation bool + }{ + // File in first span (no compression preference) - no violation. + {makeTestTableMeta(1, "a", "l"), "Zstd1:100/200", false}, + // File in second span (fast compression) - violation. + {makeTestTableMeta(2, "n", "z"), "Zstd1:100/200", true}, + // File spanning both policies - violation in second policy. + {makeTestTableMeta(3, "a", "z"), "Zstd1:100/200", true}, + } + for _, tc := range testCases { + tc.tableMeta.TableBacking.PopulateProperties(&sstable.Properties{CompressionStats: tc.compressionStats}) + require.Equal(t, tc.expectedViolation, enforcer.checkPolicyViolation(tc.tableMeta)) + } +} diff --git a/testdata/compaction/policy_enforcement b/testdata/compaction/policy_enforcement new file mode 100644 index 00000000000..e269f8420a7 --- /dev/null +++ b/testdata/compaction/policy_enforcement @@ -0,0 +1,140 @@ +# Test span policy enforcer detecting compression violations and triggering +# policy enforcement compactions. + +# Create files with Zstd compression using zstd-force (MinReductionPercent=0). +# The span policy requires fast compression for keys >= "m". +# We use long repetitive values to ensure they compress with Zstd. + +define compression=zstd-force +L1 + a#10,SET:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa b#10,SET:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb +L1 + m#10,SET:mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm n#10,SET:nnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnn +L2 + x#5,SET:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx z#5,SET:zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz +---- +L1: + 000004:[a#10,SET-b#10,SET] + 000005:[m#10,SET-n#10,SET] +L2: + 000006:[x#5,SET-z#5,SET] + +# Configure span policy: keys >= "m" require fast compression. +set-span-policies +m,zzz prefer-fast-compression +---- + +# Initially no pending enforcement files. +pending-policy-enforcement +---- +pending: 0 + +# Run the enforcer scan to detect violations and mark files. +# Files 000005 (m-n) and 000006 (x-z) are in the policy span and use Zstd. +scan-policy-violations +---- + +# Verify files are now pending enforcement. The enforcer's run loop would pause +# here until these are processed. +pending-policy-enforcement +---- +pending: 2 + L2: 000006 + L1: 000005 + +# Run compaction. The scheduler should pick up policy enforcement compactions. +auto-compact +---- +L1: + 000004:[a#10,SET-b#10,SET] + 000005:[m#10,SET-n#10,SET] +L2: + 000007:[x#0,SET-z#0,SET] + +# After auto-compact processes enforcement compactions, pending should be cleared. +# (auto-compact may process one or both files depending on scheduling) +pending-policy-enforcement +---- +pending: 0 + +# Scan again to check for any remaining violations. +scan-policy-violations +---- + +# One file may still violate policy if not recompacted in first pass. +pending-policy-enforcement +---- +pending: 1 + L1: 000005 + +auto-compact +---- +L1: + 000004:[a#10,SET-b#10,SET] + 000008:[m#0,SET-n#0,SET] +L2: + 000007:[x#0,SET-z#0,SET] + +# All enforcement compactions complete. +pending-policy-enforcement +---- +pending: 0 + +# Test interaction with manual compaction: a file marked for enforcement +# gets moved by a manual compaction. Since the move keeps the same file +# reference, the enforcement mark remains valid. + +define compression=zstd-force +L1 + a#10,SET:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa b#10,SET:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb +L2 + m#10,SET:mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm n#10,SET:nnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnn +L3 + x#5,SET:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx z#5,SET:zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz +---- +L1: + 000004:[a#10,SET-b#10,SET] +L2: + 000005:[m#10,SET-n#10,SET] +L3: + 000006:[x#5,SET-z#5,SET] + +# Set policy and scan to mark files. +set-span-policies +m,zzz prefer-fast-compression +---- + +scan-policy-violations +---- + +# Manually compact the L2 file. This moves 000005 to L3 but keeps the same +# file reference, so the enforcement mark remains valid. +compact m-o L2 +---- +L1: + 000004:[a#10,SET-b#10,SET] +L3: + 000005:[m#10,SET-n#10,SET] + 000006:[x#5,SET-z#5,SET] + +# Auto-compact picks up the enforcement compaction for one of the marked files. +# File 000006 is outside the policy span (x-z < m), so only 000005 was marked. +auto-compact +---- +L1: + 000004:[a#10,SET-b#10,SET] +L3: + 000005:[m#10,SET-n#10,SET] + 000007:[x#0,SET-z#0,SET] + +# Scan and compact again. This should compact file 000005. +scan-policy-violations +---- + +auto-compact +---- +L1: + 000004:[a#10,SET-b#10,SET] +L3: + 000008:[m#0,SET-n#0,SET] + 000007:[x#0,SET-z#0,SET] diff --git a/tool/logs/compaction.go b/tool/logs/compaction.go index 141d2dbbba9..0faefec74ce 100644 --- a/tool/logs/compaction.go +++ b/tool/logs/compaction.go @@ -321,6 +321,7 @@ const ( compactionTypeRewrite compactionTypeBlobRewrite compactionTypeVirtualRewrite + compactionTypePolicyEnforcement ) // String implements fmt.Stringer. @@ -346,6 +347,8 @@ func (c compactionType) String() string { return "blob-rewrite" case compactionTypeVirtualRewrite: return "virtual-sst-rewrite" + case compactionTypePolicyEnforcement: + return "policy-enforcement" default: panic(errors.Newf("unknown compaction type: %s", c)) } @@ -375,6 +378,8 @@ func parseCompactionType(s string) (t compactionType, err error) { t = compactionTypeBlobRewrite case "virtual-sst-rewrite": t = compactionTypeVirtualRewrite + case "policy-enforcement": + t = compactionTypePolicyEnforcement default: err = errors.Newf("unknown compaction type: %s", s) } diff --git a/version_set.go b/version_set.go index 50e8f0e62c9..f752b8e4e24 100644 --- a/version_set.go +++ b/version_set.go @@ -783,6 +783,9 @@ func (vs *versionSet) incrementCompactions( case compactionKindVirtualRewrite: vs.metrics.Compact.VirtualRewriteCount++ + case compactionKindPolicyEnforcement: + vs.metrics.Compact.PolicyEnforcementCount++ + default: if invariants.Enabled { panic("unhandled compaction kind")