Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/backup-manager/app/compact/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func (cm *Manager) buildCompactArgs(base64Storage string) []string {
"-N",
strconv.FormatUint(cm.options.Concurrency, 10),
}
if cm.options.Name != "" {
args = append(args, "--name", cm.options.Name)
}

if cm.options.Sharded {
return cm.buildShardedCompactArgs(args)
Expand All @@ -208,17 +211,16 @@ func (cm *Manager) buildShardedCompactArgs(args []string) []string {
args = append(args,
"--cal-shift-ts",
"--physical-file-cache-capacity",
"150G",
cm.options.PhysicalFileCacheCapacity,
)

// When the CR sets EndTs explicitly, honor it as a hard upper bound via
// --until. Otherwise let tikv-ctl resolve until-ts from the replication
// checkpoint stored under the fixed crr-checkpoint sub-prefix.
// Always pass the checkpoint prefix in sharded mode so tikv-ctl can locate
// the replication checkpoint state. When EndTs is set, --until still acts
// as the explicit upper bound.
if cm.options.UntilTS != 0 {
args = append(args, "--until", strconv.FormatUint(cm.options.UntilTS, 10))
} else {
args = append(args, "--crr-checkpoint-prefix", crrCheckpointPrefix)
}
args = append(args, "--crr-checkpoint-prefix", crrCheckpointPrefix)

// --shard tells tikv-ctl this pod's slice of the keyspace partition.
// --minimal-compaction-size=0 disables the small-segment skip so each
Expand Down
67 changes: 44 additions & 23 deletions cmd/backup-manager/app/compact/manager_sharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ func TestBuildCompactArgsShardedMode(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
PhysicalFileCacheCapacity: "200G",
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
},
}

Expand All @@ -74,8 +75,9 @@ func TestBuildCompactArgsShardedMode(t *testing.T) {
"--from", "11",
"-N", "4",
"--cal-shift-ts",
"--physical-file-cache-capacity", "150G",
"--physical-file-cache-capacity", "200G",
"--until", "22",
"--crr-checkpoint-prefix", "crr-checkpoint",
"--shard", "2/3",
"--minimal-compaction-size", "0",
}
Expand All @@ -87,12 +89,13 @@ func TestBuildCompactArgsCRRModeShardedUsesCheckpointPrefix(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 0,
Concurrency: 4,
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
FromTS: 11,
UntilTS: 0,
Concurrency: 4,
PhysicalFileCacheCapacity: "200G",
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
},
}

Expand All @@ -105,7 +108,7 @@ func TestBuildCompactArgsCRRModeShardedUsesCheckpointPrefix(t *testing.T) {
"--from", "11",
"-N", "4",
"--cal-shift-ts",
"--physical-file-cache-capacity", "150G",
"--physical-file-cache-capacity", "200G",
"--crr-checkpoint-prefix", "crr-checkpoint",
"--shard", "2/3",
"--minimal-compaction-size", "0",
Expand All @@ -115,20 +118,37 @@ func TestBuildCompactArgsCRRModeShardedUsesCheckpointPrefix(t *testing.T) {
}

func TestBuildCompactArgsShardedModeConvertsKubernetesIndexToOneBasedShard(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
PhysicalFileCacheCapacity: "200G",
Sharded: true,
ShardIndex: 0,
ShardCount: 3,
},
}

args := manager.buildCompactArgs("storage-base64")
assertStringSliceContainsPair(t, args, "--shard", "1/3")
}

func TestBuildCompactArgsPassesNameWhenConfigured(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Name: "compact-task-a",
Concurrency: 4,
Sharded: true,
ShardIndex: 0,
ShardCount: 3,
},
}

args := manager.buildCompactArgs("storage-base64")
assertStringSliceContainsPair(t, args, "--shard", "1/3")

assertStringSliceContainsPair(t, args, "--name", "compact-task-a")
}

func TestSanitizeCompactCommandArgsRedactsStorageBase64Value(t *testing.T) {
Expand Down Expand Up @@ -271,11 +291,12 @@ func newShardedCompactBackupForManagerTest() *v1alpha1.CompactBackup {
Namespace: "default",
},
Spec: v1alpha1.CompactSpec{
StartTs: "400036290571534337",
EndTs: "400036290571534338",
Concurrency: 4,
Mode: v1alpha1.CompactModeSharded,
ShardCount: &shardCount,
StartTs: "400036290571534337",
EndTs: "400036290571534338",
Concurrency: 4,
Mode: v1alpha1.CompactModeSharded,
ShardCount: &shardCount,
PhysicalFileCacheCapacity: "200G",
},
}
}
Expand Down
48 changes: 33 additions & 15 deletions cmd/backup-manager/app/compact/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"math"
"os"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/apis/util/config"
"k8s.io/apimachinery/pkg/api/resource"
)

const (
Expand All @@ -30,16 +32,17 @@ const (
)

type CompactOpts struct {
FromTS uint64
UntilTS uint64
Name string
Concurrency uint64
ShardIndex int
ShardCount int
Sharded bool
Namespace string `json:"namespace"`
ResourceName string `json:"resourceName"`
TikvVersion string `json:"tikvVersion"`
FromTS uint64
UntilTS uint64
Name string
Concurrency uint64
PhysicalFileCacheCapacity string
ShardIndex int
ShardCount int
Sharded bool
Namespace string `json:"namespace"`
ResourceName string `json:"resourceName"`
TikvVersion string `json:"tikvVersion"`
}

func ParseCompactOptions(compact *v1alpha1.CompactBackup, opts *CompactOpts) error {
Expand All @@ -55,8 +58,12 @@ func ParseCompactOptions(compact *v1alpha1.CompactBackup, opts *CompactOpts) err
opts.FromTS = startTs
opts.UntilTS = endTs

opts.Name = compact.Name
opts.Name = strings.TrimSpace(compact.Spec.Name)
opts.Concurrency = uint64(compact.Spec.Concurrency)
opts.PhysicalFileCacheCapacity = strings.TrimSpace(compact.Spec.PhysicalFileCacheCapacity)
if opts.PhysicalFileCacheCapacity == "" {
opts.PhysicalFileCacheCapacity = "0"
}
opts.Sharded = compact.Spec.Mode == v1alpha1.CompactModeSharded
opts.ShardIndex = 0
opts.ShardCount = 0
Expand All @@ -81,10 +88,10 @@ func (c *CompactOpts) Verify() error {
if c.FromTS == fromTSUnset {
return errors.New("from-ts must be set")
}
// UntilTS unset is valid only in sharded CCR checkpoint mode: tikv-ctl
// reads the until-ts from the log-backup global checkpoint via
// --crr-checkpoint-prefix. Non-sharded compact keeps the existing
// requirement that EndTs/UntilTS must be set explicitly.
// UntilTS unset is valid only in sharded CCR checkpoint mode: tikv-ctl can
// read the until-ts from the log-backup global checkpoint. Non-sharded
// compact keeps the existing requirement that EndTs/UntilTS must be set
// explicitly.
if c.UntilTS == untilTSUnset && !c.Sharded {
return errors.New("until-ts must be set")
}
Expand All @@ -95,6 +102,17 @@ func (c *CompactOpts) Verify() error {
return errors.Errorf("concurrency %d must be greater than 0", c.Concurrency)
}
if c.Sharded {
c.PhysicalFileCacheCapacity = strings.TrimSpace(c.PhysicalFileCacheCapacity)
if c.PhysicalFileCacheCapacity == "" {
c.PhysicalFileCacheCapacity = "0"
}
capacity, err := resource.ParseQuantity(c.PhysicalFileCacheCapacity)
if err != nil {
return errors.Annotatef(err, "invalid physicalFileCacheCapacity %q", c.PhysicalFileCacheCapacity)
}
if capacity.Sign() < 0 {
return errors.New("physicalFileCacheCapacity must be greater than or equal to 0")
}
if c.ShardCount <= 0 {
return errors.Errorf("shard-count %d must be greater than 0", c.ShardCount)
}
Expand Down
Loading
Loading