diff --git a/common/testing/testhooks/hooks.go b/common/testing/testhooks/hooks.go index 5c10f700036..349dbc1e1ed 100644 --- a/common/testing/testhooks/hooks.go +++ b/common/testing/testhooks/hooks.go @@ -4,6 +4,7 @@ import ( "context" "time" + "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/common/namespace" @@ -25,6 +26,7 @@ var ( HistoryReplicationTaskInterceptor = newKey[func(*replicationspb.ReplicationTask, func() error) error, global]() HistoryReplicationDLQWriteInterceptor = newKey[func(*persistencespb.ReplicationTaskInfo, func() error) error, global]() HistoryTransferTaskInterceptor = newKey[func(historytasks.Task, func()), namespace.ID]() + HistoryDLQTaskDeleteInterceptor = newKey[func(context.Context, *historyservice.DeleteDLQTasksRequest, func(context.Context, *historyservice.DeleteDLQTasksRequest) (*historyservice.DeleteDLQTasksResponse, error)) (*historyservice.DeleteDLQTasksResponse, error), global]() NamespaceReplicationTaskInterceptor = newKey[func(context.Context, *replicationspb.NamespaceTaskAttributes, func() error) error, namespace.Name]() ) diff --git a/service/history/fx.go b/service/history/fx.go index 1ac4afcfc38..024f9826aee 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -166,6 +166,7 @@ func HandlerProvider(args NewHandlerArgs) (*Handler, error) { dlqMetricsEmitter: args.DLQMetricsEmitter, chasmEngine: args.ChasmEngine, chasmRegistry: args.ChasmRegistry, + testHooks: args.TestHooks, replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory, replicationTaskConverterProvider: args.ReplicationTaskConverterFactory, diff --git a/service/history/handler.go b/service/history/handler.go index 4bd2c154d55..7f29a083581 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -52,6 +52,7 @@ import ( "go.temporal.io/server/common/searchattribute" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/tasktoken" + "go.temporal.io/server/common/testing/testhooks" "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/api/deletedlqtasks" @@ -104,6 +105,7 @@ type ( chasmEngine chasm.Engine chasmRegistry *chasm.Registry nexusHandler nexus.Handler + testHooks testhooks.TestHooks replicationTaskFetcherFactory replication.TaskFetcherFactory replicationTaskConverterProvider replication.SourceTaskConverterProvider @@ -140,6 +142,7 @@ type ( DLQMetricsEmitter *persistence.DLQMetricsEmitter ChasmEngine chasm.Engine ChasmRegistry *chasm.Registry + TestHooks testhooks.TestHooks ReplicationTaskFetcherFactory replication.TaskFetcherFactory ReplicationTaskConverterFactory replication.SourceTaskConverterProvider @@ -2046,6 +2049,16 @@ func (h *Handler) GetDLQTasks( func (h *Handler) DeleteDLQTasks( ctx context.Context, request *historyservice.DeleteDLQTasksRequest, +) (*historyservice.DeleteDLQTasksResponse, error) { + if hook, ok := testhooks.Get(h.testHooks, testhooks.HistoryDLQTaskDeleteInterceptor, testhooks.GlobalScope); ok { + return hook(ctx, request, h.deleteDLQTasks) + } + return h.deleteDLQTasks(ctx, request) +} + +func (h *Handler) deleteDLQTasks( + ctx context.Context, + request *historyservice.DeleteDLQTasksRequest, ) (*historyservice.DeleteDLQTasksResponse, error) { return deletedlqtasks.Invoke(ctx, h.taskQueueManager, request, h.taskCategoryRegistry) } diff --git a/tests/dlq_test.go b/tests/dlq_test.go index c296e3965c7..ebebbeba802 100644 --- a/tests/dlq_test.go +++ b/tests/dlq_test.go @@ -23,6 +23,7 @@ import ( "go.temporal.io/sdk/workflow" "go.temporal.io/server/api/adminservice/v1" enumsspb "go.temporal.io/server/api/enums/v1" + "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/codec" "go.temporal.io/server/common/config" @@ -31,14 +32,13 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives" - "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/testing/await" + "go.temporal.io/server/common/testing/testhooks" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/tests/testcore" "go.temporal.io/server/tests/testutils" "go.temporal.io/server/tools/tdbg" "go.temporal.io/server/tools/tdbg/tdbgtest" - "go.uber.org/fx" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) @@ -47,11 +47,11 @@ type ( DLQSuite struct { testcore.FunctionalTestBase - dlq persistence.HistoryTaskQueueManager - writer bytes.Buffer - sdkClientFactory sdk.ClientFactory - tdbgApp *cli.App - deleteBlockCh chan any + dlq persistence.HistoryTaskQueueManager + writer bytes.Buffer + systemSDKClient sdkclient.Client + tdbgApp *cli.App + deleteBlockCh chan any failingWorkflowIDPrefix atomic.Pointer[string] } @@ -66,10 +66,6 @@ type ( targetCluster string expectedNumMessages int } - testTaskQueueManager struct { - suite *DLQSuite - persistence.HistoryTaskQueueManager - } ) const ( @@ -98,21 +94,33 @@ func (s *DLQSuite) SetupSuite() { return serialization.NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, errors.New("test error")) }, }), - testcore.WithFxOptionsForService(primitives.HistoryService, - fx.Populate(&s.dlq), - fx.Decorate( - func(m persistence.HistoryTaskQueueManager) persistence.HistoryTaskQueueManager { - return &testTaskQueueManager{ - suite: s, - HistoryTaskQueueManager: m, - } - }, - ), - ), - testcore.WithFxOptionsForService(primitives.FrontendService, - fx.Populate(&s.sdkClientFactory), - ), ) + + var err error + s.dlq, err = s.GetTestCluster().TestBase().Factory.NewHistoryTaskQueueManager() + s.Require().NoError(err) + + s.systemSDKClient, err = sdkclient.Dial(sdkclient.Options{ + HostPort: s.FrontendGRPCAddress(), + Namespace: primitives.SystemLocalNamespace, + }) + s.Require().NoError(err) + s.T().Cleanup(func() { + s.systemSDKClient.Close() + }) + + s.InjectHook(testhooks.NewHook( + testhooks.HistoryDLQTaskDeleteInterceptor, + func( + ctx context.Context, + request *historyservice.DeleteDLQTasksRequest, + deleteTasks func(context.Context, *historyservice.DeleteDLQTasksRequest) (*historyservice.DeleteDLQTasksResponse, error), + ) (*historyservice.DeleteDLQTasksResponse, error) { + <-s.deleteBlockCh + return deleteTasks(ctx, request) + }, + )) + s.tdbgApp = tdbgtest.NewCliApp( func(params *tdbg.Params) { params.ClientFactory = tdbg.NewClientFactory(tdbg.WithFrontendAddress(s.FrontendGRPCAddress())) @@ -471,8 +479,7 @@ func (s *DLQSuite) purgeMessages(ctx context.Context, maxMessageIDToDelete int64 var token adminservice.DLQJobToken s.NoError(proto.Unmarshal(response.GetJobToken(), &token)) - systemSDKClient := s.sdkClientFactory.GetSystemClient() - run := systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId) + run := s.systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId) s.NoError(run.Get(ctx, nil)) return tokenString } @@ -484,8 +491,7 @@ func (s *DLQSuite) mergeMessages(ctx context.Context, maxMessageID int64) string s.NoError(err) var token adminservice.DLQJobToken s.NoError(token.Unmarshal(tokenBytes)) - systemSDKClient := s.sdkClientFactory.GetSystemClient() - run := systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId) + run := s.systemSDKClient.GetWorkflow(ctx, token.WorkflowId, token.RunId) s.NoError(run.Get(ctx, nil)) return tokenString } @@ -617,12 +623,3 @@ func (s *DLQSuite) readTransferTasks(file *os.File) []tdbgtest.DLQMessage[*persi s.NoError(err) return dlqTasks } - -// ReadTasks is used to block the dlq job workflow until one of them is cancelled in TestCancelRunningMerge. -func (m *testTaskQueueManager) DeleteTasks( - ctx context.Context, - request *persistence.DeleteTasksRequest, -) (*persistence.DeleteTasksResponse, error) { - <-m.suite.deleteBlockCh - return m.HistoryTaskQueueManager.DeleteTasks(ctx, request) -}