Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,6 +1755,13 @@ This can help reduce effects of shard movement.`,
false,
`EnableHostLevelEventsCache controls if the events cache is host level. Requires service restart to take effect.`,
)
EventsCacheBackgroundEvict = NewGlobalTypedSetting(
"history.eventsCacheBackgroundEvict",
DefaultEventsCacheBackgroundEvictSettings,
`EventsCacheBackgroundEvict configures background processing to purge expired entries from the events cache.
Requires service restart to take effect.`,
)

AcquireShardInterval = NewGlobalDurationSetting(
"history.acquireShardInterval",
time.Minute,
Expand Down
6 changes: 6 additions & 0 deletions common/dynamicconfig/shared_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ var DefaultHistoryCacheBackgroundEvictSettings = CacheBackgroundEvictSettings{
MaxEntryPerCall: 1024,
}

var DefaultEventsCacheBackgroundEvictSettings = CacheBackgroundEvictSettings{
Enabled: false,
LoopInterval: 1 * time.Minute,
MaxEntryPerCall: 1024,
}

type PartitionScaleAllowedDrift struct {
// Delta and Ratio controls how far off client counts can be before we reject an RPC.
// If the client count is within the delta, it's allowed. Also, if the ratio of
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
// Change of these configs require service restart
EnableHostLevelEventsCache dynamicconfig.BoolPropertyFn
EventsHostLevelCacheMaxSizeBytes dynamicconfig.IntPropertyFn
EventsCacheBackgroundEvict dynamicconfig.TypedPropertyFn[dynamicconfig.CacheBackgroundEvictSettings]

// ShardController settings
RangeSizeBits uint
Expand Down Expand Up @@ -507,6 +508,7 @@ func NewConfig(
EventsHostLevelCacheMaxSizeBytes: dynamicconfig.EventsHostLevelCacheMaxSizeBytes.Get(dc), // 256MB
EventsCacheTTL: dynamicconfig.EventsCacheTTL.Get(dc),
EnableHostLevelEventsCache: dynamicconfig.EnableHostLevelEventsCache.Get(dc),
EventsCacheBackgroundEvict: dynamicconfig.EventsCacheBackgroundEvict.Get(dc),

RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range

Expand Down
7 changes: 5 additions & 2 deletions service/history/events/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewHostLevelEventsCache(
logger log.Logger,
disabled bool,
) Cache {
return newEventsCache(executionManager, handler, logger, config.EventsHostLevelCacheMaxSizeBytes(), config.EventsCacheTTL(), disabled)
return newEventsCache(executionManager, handler, logger, config.EventsHostLevelCacheMaxSizeBytes(), config.EventsCacheTTL(), config.EventsCacheBackgroundEvict, disabled)
}

func NewShardLevelEventsCache(
Expand All @@ -70,7 +71,7 @@ func NewShardLevelEventsCache(
logger log.Logger,
disabled bool,
) Cache {
return newEventsCache(executionManager, handler, logger, config.EventsShardLevelCacheMaxSizeBytes(), config.EventsCacheTTL(), disabled)
return newEventsCache(executionManager, handler, logger, config.EventsShardLevelCacheMaxSizeBytes(), config.EventsCacheTTL(), config.EventsCacheBackgroundEvict, disabled)
}

func newEventsCache(
Expand All @@ -79,10 +80,12 @@ func newEventsCache(
logger log.Logger,
maxSize int,
ttl time.Duration,
backgroundEvict dynamicconfig.TypedPropertyFn[dynamicconfig.CacheBackgroundEvictSettings],
disabled bool,
) *CacheImpl {
opts := &cache.Options{}
opts.TTL = ttl
opts.BackgroundEvict = backgroundEvict

taggedMetricHandler := metricsHandler.WithTags(metrics.CacheTypeTag(metrics.EventsCacheTypeTagValue))
return &CacheImpl{
Expand Down
4 changes: 4 additions & 0 deletions service/history/events/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -66,6 +67,9 @@ func (s *eventsCacheSuite) newTestEventsCache() *CacheImpl {
s.logger,
32,
time.Minute,
func() dynamicconfig.CacheBackgroundEvictSettings {
return dynamicconfig.DefaultEventsCacheBackgroundEvictSettings
},
false)
}

Expand Down