Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions chasm/lib/workflow/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ func workflowContextFromChasm(ctx chasm.Context) *workflowContext {

func (l *library) Components() []*chasm.RegistrableComponent {
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Workflow](chasm.WorkflowComponentName, chasm.WithContextValues(map[any]any{
ctxKeyWorkflowContext: &workflowContext{registry: l.registry},
})),
chasm.NewRegistrableComponent[*Workflow](
chasm.WorkflowComponentName,
chasm.WithBusinessIDAlias("WorkflowId"),
chasm.WithContextValues(map[any]any{
ctxKeyWorkflowContext: &workflowContext{registry: l.registry},
}),
),
chasm.NewRegistrableComponent[*WorkflowUpdate]("update"),
}
}
Expand Down
72 changes: 72 additions & 0 deletions chasm/lib/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
"go.temporal.io/server/chasm/lib/nexusoperation"
chasmworkflowpb "go.temporal.io/server/chasm/lib/workflow/gen/workflowpb/v1"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/service/history/historybuilder"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -27,6 +29,9 @@ type Workflow struct {
// MSPointer is a special in-memory field for accessing the underlying mutable state.
chasm.MSPointer

// Visibility to store custom search attributes and memo.
Visibility chasm.Field[*chasm.Visibility]

// Callbacks map is used to store the callbacks for the workflow.
Callbacks chasm.Map[string, *callback.Callback]

Expand Down Expand Up @@ -72,6 +77,73 @@ func (w *Workflow) Terminate(
return chasm.TerminateComponentResponse{}, serviceerror.NewInternal("workflow root Terminate should not be called")
}

// SearchAttributes returns the predefined search attributes set in the underlying mutable state.
func (w *Workflow) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue {
searchAttributes, err := w.GetPredefinedSearchAttributes()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have to deep copy/clone the SAs here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need for backwards compatibility if we need to read search attributes from the mutable state that doesn't contain the metadata type. It's only making a deep copy when necessary, ie., when it doesn't contain the metadata type. Otherwise, it makes a shallow copy.

softassert.That(
ctx.Logger(),
err == nil,
"failed to retrieve search attributes from mutable state execution info",
tag.Error(err),
)

var res []chasm.SearchAttributeKeyValue
for saName, value := range searchAttributes {
res = append(res, chasm.SearchAttributeKeyValue{
Alias: saName,
Field: saName,
Value: value,
})
}
return res
}

// CustomSearchAttributes returns the custom search attributes.
func (w *Workflow) CustomSearchAttributes(ctx chasm.Context) map[string]*commonpb.Payload {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where/how will the following four methods be used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be used in the mutable state. I'll have a PR using those soon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR using those functions: #10498

if vis, ok := w.Visibility.TryGet(ctx); ok {
return vis.CustomSearchAttributes(ctx)
}
return nil
}

// CustomMemo returns the custom memo.
func (w *Workflow) CustomMemo(ctx chasm.Context) map[string]*commonpb.Payload {
if vis, ok := w.Visibility.TryGet(ctx); ok {
return vis.CustomMemo(ctx)
}
return nil
}

// UpsertCustomSearchAttributes merges the provided custom search attributes into the existing one.
// For details of the merge, see [chasm.Visibility.MergeCustomSearchAttributes].
func (w *Workflow) UpsertCustomSearchAttributes(
ctx chasm.MutableContext,
customSearchAttributes map[string]*commonpb.Payload,
) error {
if vis, ok := w.Visibility.TryGet(ctx); ok {
vis.MergeCustomSearchAttributes(ctx, customSearchAttributes)
} else {
vis := chasm.NewVisibilityWithData(ctx, customSearchAttributes, nil)
w.Visibility = chasm.NewComponentField(ctx, vis)
}
return nil
}

// UpsertCustomMemo merges the provided custom memo into the existing one.
// For details of the merge, see [chasm.Visibility.MergeCustomMemo].
func (w *Workflow) UpsertCustomMemo(
ctx chasm.MutableContext,
customMemo map[string]*commonpb.Payload,
) error {
if vis, ok := w.Visibility.TryGet(ctx); ok {
vis.MergeCustomMemo(ctx, customMemo)
} else {
vis := chasm.NewVisibilityWithData(ctx, nil, customMemo)
w.Visibility = chasm.NewComponentField(ctx, vis)
}
return nil
}

// ProcessCloseCallbacks triggers "WorkflowClosed" callbacks using the CHASM implementation.
// It schedules all workflow-level and update-level callbacks that are in STANDBY state.
func (w *Workflow) ProcessCloseCallbacks(ctx chasm.MutableContext) error {
Expand Down
24 changes: 24 additions & 0 deletions chasm/ms_pointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package chasm
import (
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/server/common/nexus/nexusrpc"
"go.temporal.io/server/common/searchattribute/sadefs"
"google.golang.org/protobuf/proto"
)

// MSPointer is a special CHASM type which components can use to access their Node's underlying backend (i.e. mutable
Expand Down Expand Up @@ -61,3 +64,24 @@ func (m MSPointer) GetWorkflowTypeName() string {
func (m MSPointer) GetNexusUpdateCompletion(ctx Context, updateID string, requestID string) (nexusrpc.CompleteOperationOptions, error) {
return m.backend.GetNexusUpdateCompletion(ctx.goContext(), updateID, requestID)
}

// GetPredefinedSearchAttributes retrieves the predefined search attributes from the underlying mutable state.
func (m MSPointer) GetPredefinedSearchAttributes() (map[string]VisibilityValue, error) {
Copy link
Copy Markdown
Contributor

@awln-temporal awln-temporal Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe I need more context for this, will predefined search attributes end up getting moved to the Workflow Component's state? I guess I am in general having the same question as Yichao; do visibility tasks end up getting duplicated (MS backed and CHASM backed) in this current implementation?

I guess when I say duplicate here, I mean CHASM visibility tasks end up writing both custom and predefined search attributes, whereas the legacy visibility task only reads mutable state, and writes only predefined search attributes?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also for naming purposes, these predefined search attributes include Predefined and System search attributes right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Predefined here means really only the predefined search attributes that are stored together with custom search attributes.

At this moment, the idea is that all predefined search attributes are stored in the mutable state, and CHASM Workflow extracts the values from there. Not stored twice.

For custom search attributes, it would be duplicated. More details in a follow up PR. This PR is not really adding any tasks since it doesn't introduce any changes to the CHASM Visibility component.

msSearchAttributes := m.backend.GetExecutionInfo().GetSearchAttributes()
predefinedWithType := make(map[string]*commonpb.Payload)
for saName, saPayload := range msSearchAttributes {
if saType, ok := predefinedSearchAttributes[saName]; ok {
if sadefs.GetMetadataType(saPayload) == enumspb.INDEXED_VALUE_TYPE_UNSPECIFIED {
saPayload = proto.CloneOf(saPayload)
sadefs.SetMetadataType(saPayload, saType)
}
predefinedWithType[saName] = saPayload
}
}
saMap, err := newSearchAttributesMapFromProto(
&commonpb.SearchAttributes{
IndexedFields: predefinedWithType,
},
)
return saMap.values, err
}
8 changes: 8 additions & 0 deletions chasm/node_backend_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ NodeBackend = (*MockNodeBackend)(nil)
// fields (thread-safe).
type MockNodeBackend struct {
// Optional function overrides. If nil, methods return zero-values.
HandleGetExecutionStateUpdated func() bool
HandleGetExecutionState func() *persistencespb.WorkflowExecutionState
HandleGetExecutionInfo func() *persistencespb.WorkflowExecutionInfo
HandleGetCurrentVersion func() int64
Expand Down Expand Up @@ -50,6 +51,13 @@ type MockNodeBackend struct {
}
}

func (m *MockNodeBackend) ExecutionStateUpdated() bool {
if m.HandleGetExecutionStateUpdated != nil {
return m.HandleGetExecutionStateUpdated()
}
return false
}

func (m *MockNodeBackend) GetExecutionState() *persistencespb.WorkflowExecutionState {
if m.HandleGetExecutionState != nil {
return m.HandleGetExecutionState()
Expand Down
3 changes: 3 additions & 0 deletions chasm/search_attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"go.temporal.io/server/common/searchattribute/sadefs"
)

// Make a copy of predefined search attributes for CHASM internal usage.
var predefinedSearchAttributes = sadefs.Predefined()

// CHASM Search Attribute User Guide:
//
// This contains CHASM search attribute field constants. These predefined fields correspond to the exact column name in Visibility storage.
Expand Down
11 changes: 8 additions & 3 deletions chasm/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"iter"
"maps"
"reflect"
"slices"
"strconv"
Expand All @@ -31,7 +32,6 @@ import (
"go.temporal.io/server/common/persistence/transitionhistory"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/service/history/tasks"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -205,6 +205,7 @@ type (
// where MutableState is defined.
NodeBackend interface {
// TODO: Add methods needed from MutateState here.
ExecutionStateUpdated() bool
GetExecutionState() *persistencespb.WorkflowExecutionState
GetExecutionInfo() *persistencespb.WorkflowExecutionInfo
GetApproximatePersistedSize() int
Expand Down Expand Up @@ -1688,7 +1689,11 @@ func (n *Node) CloseTransaction() (NodesMutation, error) {
return NodesMutation{}, err
}

if n.isActiveStateDirty {
// Normally, it's unnecessary to check rootLifecycleChanged as it implies
// isActiveStateDirty. However, workflow execution state is managed in the
// mutable state, and closeTransactionHandleRootLifecycleChange returns can
// return true without changing isActiveStateDirty.
if n.isActiveStateDirty || rootLifecycleChanged {
Comment thread
rodrigozhou marked this conversation as resolved.
if err := n.closeTransactionForceUpdateVisibility(immutableContext, rootLifecycleChanged); err != nil {
return NodesMutation{}, err
}
Expand Down Expand Up @@ -1762,7 +1767,7 @@ func (n *Node) closeTransactionHandleRootLifecycleChange(
) (bool, error) {
if n.backend.IsWorkflow() {
// Workflow manages its lifecycle directly in mutable state.
return false, nil
return n.backend.ExecutionStateUpdated(), nil
}

if n.valueState != valueStateNeedSerialize {
Expand Down
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type (
GetLastEventVersion() (int64, error)
GetExecutionInfo() *persistencespb.WorkflowExecutionInfo
GetExecutionState() *persistencespb.WorkflowExecutionState
ExecutionStateUpdated() bool
GetStartedWorkflowTask() *WorkflowTaskInfo
GetPendingWorkflowTask() *WorkflowTaskInfo
GetLastFirstEventIDTxnID() (int64, int64)
Expand Down
14 changes: 14 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,10 @@ func (ms *MutableStateImpl) GetExecutionState() *persistencespb.WorkflowExecutio
return ms.executionState
}

func (ms *MutableStateImpl) ExecutionStateUpdated() bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the purpose of this method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execution state is managed in the mutable state currently, and the CHASM Workflow needs to know if the execution state changed to generate CHASM visibility task.

return ms.stateInDB != ms.executionState.GetState()
}

func (ms *MutableStateImpl) FlushBufferedEvents() {
if ms.HasStartedWorkflowTask() {
return
Expand Down
Loading