Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2994,6 +2994,15 @@ the CHASM implementation. When disabled, new update callbacks will not be regist
but existing callbacks will still be processed and fired.`,
)

EnableCHASMVisibilityRatio = NewNamespaceFloatSetting(
"history.enableCHASMVisibilityRatio",
0.0,
`Controls whether workflows uses CHASM Visibility component to manage custom
search attributes and memo. Set ratio=0 to disable, set ratio=1 to enable,
set 0 < ratio < 1 to enable double write for a ratio of the workflows.
Note: system.visibilityAllowList must be false to enable CHASM Visibility.`,
)

VersionMembershipCacheTTL = NewGlobalDurationSetting(
"history.versionMembershipCacheTTL",
1*time.Second,
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,9 @@ func aliasChasmSearchAttributes(
aliasName, err := mapper.Alias(fieldName)
if err != nil {
// Silently ignore serviceerror.InvalidArgument because it indicates unregistered field.
// INFO: Chasm search attributes must be registered with the CHASM Registry using the WithSearchAttributes() option.
var invalidArgumentErr *serviceerror.InvalidArgument
if errors.As(err, &invalidArgumentErr) {
// INFO: Chasm search attributes must be registered with the CHASM Registry
// using the WithSearchAttributes() option.
if _, ok := errors.AsType[*serviceerror.InvalidArgument](err); ok {
continue
}
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions common/searchattribute/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func AliasFields(
// Silently ignore serviceerror.InvalidArgument because it indicates unmapped field (alias was deleted, for example).
// IMPORTANT: AliasFields should never return serviceerror.InvalidArgument because it is used by Poll API and the error
// goes through up to SDK, which shutdowns worker when it receives serviceerror.InvalidArgument as poll response.
var invalidArgumentErr *serviceerror.InvalidArgument
if errors.As(err, &invalidArgumentErr) {
if _, ok := errors.AsType[*serviceerror.InvalidArgument](err); ok {
continue
}
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions config/dynamicconfig/development-cass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ history.enableChasm:
- value: true
activity.enableStandalone:
- value: true
history.enableCHASMVisibilityRatio:
- value: 0.999
system.visibilityAllowList:
- value: false
15 changes: 0 additions & 15 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,21 +716,6 @@ func (wh *WorkflowHandler) validateTimeSkippingConfig(
return nil
}

func (wh *WorkflowHandler) unaliasedSearchAttributesFrom(
attributes *commonpb.SearchAttributes,
namespaceName namespace.Name,
) (*commonpb.SearchAttributes, error) {
sa, err := searchattribute.UnaliasFields(wh.saMapperProvider, attributes, namespaceName.String())
if err != nil {
return nil, err
}

if err = wh.validator.ValidateSearchAttributes(sa, namespaceName.String()); err != nil {
return nil, err
}
return sa, nil
}

func (wh *WorkflowHandler) ExecuteMultiOperation(
ctx context.Context,
request *workflowservice.ExecuteMultiOperationRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandSignalExternalWorkflow
}

func (handler *workflowTaskCompletedHandler) handleCommandUpsertWorkflowSearchAttributes(
_ context.Context,
ctx context.Context,
attr *commandpb.UpsertWorkflowSearchAttributesCommandAttributes,
) (*historypb.HistoryEvent, error) {
// get namespace name
Expand All @@ -1304,6 +1304,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandUpsertWorkflowSearchAt
}
namespace := namespaceEntry.Name()

currentSearchAttributes, err := handler.mutableState.GetSearchAttributes(ctx)
if err != nil {
return nil, serviceerror.NewUnavailable("Unable to load current search attributes")
}

unaliasedSas, err := searchattribute.UnaliasFields(
handler.searchAttributesMapperProvider,
attr.GetSearchAttributes(),
Expand Down Expand Up @@ -1343,7 +1348,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandUpsertWorkflowSearchAt
err = handler.sizeLimitChecker.checkIfSearchAttributesSizeExceedsLimit(
&commonpb.SearchAttributes{
IndexedFields: payload.MergeMapOfPayload(
executionInfo.SearchAttributes,
currentSearchAttributes,
attr.GetSearchAttributes().GetIndexedFields(),
),
},
Expand All @@ -1360,7 +1365,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandUpsertWorkflowSearchAt
}

func (handler *workflowTaskCompletedHandler) handleCommandModifyWorkflowProperties(
_ context.Context,
ctx context.Context,
attr *commandpb.ModifyWorkflowPropertiesCommandAttributes,
) (*historypb.HistoryEvent, error) {
// get namespace name
Expand All @@ -1371,6 +1376,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandModifyWorkflowProperti
return nil, serviceerror.NewUnavailablef("Unable to get namespace for namespaceID: %v.", namespaceID)
}

currentMemo, err := handler.mutableState.GetMemo(ctx)
if err != nil {
return nil, serviceerror.NewUnavailable("Unable to load current memo")
}

// valid properties
if err := handler.validateCommandAttr(
func() (enumspb.WorkflowTaskFailedCause, error) {
Expand All @@ -1392,7 +1402,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandModifyWorkflowProperti
// new memo size limit check
err = handler.sizeLimitChecker.checkIfMemoSizeExceedsLimit(
&commonpb.Memo{
Fields: payload.MergeMapOfPayload(executionInfo.Memo, attr.GetUpsertedMemo().GetFields()),
Fields: payload.MergeMapOfPayload(currentMemo, attr.GetUpsertedMemo().GetFields()),
},
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES.String()),
"ModifyWorkflowPropertiesCommandAttributes. Memo exceeds size limit.",
Expand Down
2 changes: 2 additions & 0 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
RelocatableAttributesRemoved: p.RelocatableAttributesRemoved,
}
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
mutableState.EXPECT().GetSearchAttributes(gomock.Any()).Return(nil, nil).AnyTimes()
mutableState.EXPECT().GetMemo(gomock.Any()).Return(nil, nil).AnyTimes()
executionState := &persistencespb.WorkflowExecutionState{
State: 0,
Status: 0,
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
EnableChasm dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableCHASMCallbacks dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableCHASMSignalBacklinks dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableCHASMVisibilityRatio dynamicconfig.FloatPropertyFnWithNamespaceFilter
EnableWorkflowUpdateCallbacks dynamicconfig.BoolPropertyFnWithNamespaceFilter
ChasmMaxInMemoryPureTasks dynamicconfig.IntPropertyFn
EnableCHASMSchedulerCreation dynamicconfig.BoolPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -500,6 +501,7 @@ func NewConfig(

EnableCHASMCallbacks: dynamicconfig.EnableCHASMCallbacks.Get(dc),
EnableCHASMSignalBacklinks: dynamicconfig.EnableCHASMSignalBacklinks.Get(dc),
EnableCHASMVisibilityRatio: dynamicconfig.EnableCHASMVisibilityRatio.Get(dc),
ExternalPayloadsEnabled: dynamicconfig.ExternalPayloadsEnabled.Get(dc),
EnableWorkflowUpdateCallbacks: dynamicconfig.EnableWorkflowUpdateCallbacks.Get(dc),

Expand Down
6 changes: 4 additions & 2 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ type (
IsNonCurrentWorkflowGuaranteed() (bool, error)
IsSignalRequested(requestID string) bool
GetApproximatePersistedSize() int
GetSearchAttributes(ctx context.Context) (map[string]*commonpb.Payload, error)
GetMemo(ctx context.Context) (map[string]*commonpb.Payload, error)

CurrentTaskQueue() *taskqueuepb.TaskQueue
SetStickyTaskQueue(name string, scheduleToStartTimeout *durationpb.Duration)
Expand Down Expand Up @@ -267,8 +269,8 @@ type (
ApplyTimerFiredEvent(*historypb.HistoryEvent) error
ApplyTimerStartedEvent(*historypb.HistoryEvent) (*persistencespb.TimerInfo, error)
ApplyTransientWorkflowTaskScheduled() (*WorkflowTaskInfo, error)
ApplyWorkflowPropertiesModifiedEvent(*historypb.HistoryEvent)
ApplyUpsertWorkflowSearchAttributesEvent(*historypb.HistoryEvent)
ApplyWorkflowPropertiesModifiedEvent(*historypb.HistoryEvent) error
ApplyUpsertWorkflowSearchAttributesEvent(*historypb.HistoryEvent) error
ApplyWorkflowExecutionCancelRequestedEvent(*historypb.HistoryEvent) error
ApplyWorkflowExecutionCanceledEvent(int64, *historypb.HistoryEvent) error
ApplyWorkflowExecutionCompletedEvent(int64, *historypb.HistoryEvent) error
Expand Down
42 changes: 38 additions & 4 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 54 additions & 8 deletions service/history/visibility_queue_task_executor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package history

import (
"bytes"
"context"
"strconv"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
Expand Down Expand Up @@ -414,9 +416,14 @@ func (t *visibilityQueueTaskExecutor) processChasmTask(
for alias, value := range aliasedCustomSearchAttributes {
fieldName, err := customSaMapper.GetFieldName(alias, namespaceEntry.Name().String())
if err != nil {
// To reach here, either the search attribute has been deregistered before task execution, which is valid behavior,
// or there are delays in propagating search attribute mappings to History.
t.logger.Warn("Failed to get field name for alias, ignoring search attribute", tag.String("alias", alias), tag.Error(err))
// To reach here, either the search attribute has been deregistered before task execution,
// which is valid behavior, or there are delays in propagating search attribute mappings to
// History.
t.logger.Warn(
"Failed to get field name for alias, ignoring search attribute",
tag.String("alias", alias),
tag.Error(err),
)
continue
}
searchAttributes[fieldName] = value
Expand Down Expand Up @@ -469,11 +476,22 @@ func (t *visibilityQueueTaskExecutor) processChasmTask(
searchAttributes,
)

// We reuse the TemporalNamespaceDivision column to store the string representation of ArchetypeID.
searchattribute.AddSearchAttributes(
&requestBase.SearchAttributes,
chasm.SearchAttributeTemporalNamespaceDivision.Value(strconv.FormatUint(uint64(tree.ArchetypeID()), 10)),
)
if !mutableState.IsWorkflow() {
// We reuse the TemporalNamespaceDivision column to store the string
// representation of ArchetypeID.
searchattribute.AddSearchAttributes(
&requestBase.SearchAttributes,
chasm.SearchAttributeTemporalNamespaceDivision.Value(
strconv.FormatUint(uint64(tree.ArchetypeID()), 10),
),
)
} else if t.shardContext.GetConfig().EnableCHASMVisibilityRatio(namespaceEntry.Name().String()) < 1 {
t.validateChasmWorkflowVisibility(
mutableState.GetExecutionInfo(),
searchAttributes,
userMemoMap,
)
}

// Override TaskQueue if provided by CHASM search attributes.
if chasmTaskQueue != "" {
Expand Down Expand Up @@ -660,6 +678,21 @@ func (t *visibilityQueueTaskExecutor) cleanupExecutionInfo(
return weContext.SetWorkflowExecution(ctx, t.shardContext)
}

func (t *visibilityQueueTaskExecutor) validateChasmWorkflowVisibility(
executionInfo *persistencepb.WorkflowExecutionInfo,
chasmCustomSearchAttributes map[string]*commonpb.Payload,
chasmCustomMemo map[string]*commonpb.Payload,
) {
if !isEqualMapPayload(executionInfo.GetSearchAttributes(), chasmCustomSearchAttributes) {
// Log there is a difference, but do not fail the task.
t.logger.Warn("CHASM Visibility custom search attributes does not match values in ExecutionInfo")
}
if !isEqualMapPayload(executionInfo.GetMemo(), chasmCustomMemo) {
// Log there is a difference, but do not fail the task.
t.logger.Warn("CHASM Visibility custom memo does not match values in ExecutionInfo")
}
}

func getWorkflowMemo(
memoFields map[string]*commonpb.Payload,
) *commonpb.Memo {
Expand Down Expand Up @@ -688,3 +721,16 @@ func copyMapPayload(input map[string]*commonpb.Payload) map[string]*commonpb.Pay
}
return result
}

func isEqualMapPayload(a, b map[string]*commonpb.Payload) bool {
if len(a) != len(b) {
return false
}
for key, value := range a {
other := b[key]
if !bytes.Equal(value.GetData(), other.GetData()) {
return false
}
}
return true
}
Loading
Loading