diff --git a/service/matching/matching_engine_test.go b/service/matching/matching_engine_test.go index cab912877b1..f3c6c95c90a 100644 --- a/service/matching/matching_engine_test.go +++ b/service/matching/matching_engine_test.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/common/cluster/clustertest" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/future" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/membership" @@ -64,6 +65,7 @@ import ( "go.temporal.io/server/common/searchattribute" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/tasktoken" + "go.temporal.io/server/common/testing/await" "go.temporal.io/server/common/testing/protoassert" "go.temporal.io/server/common/testing/testlogger" "go.temporal.io/server/common/testing/testvars" @@ -2697,9 +2699,19 @@ func (s *matchingEngineSuite) TestGetTaskQueueUserData_LongPoll_WakesUp_FromNoth ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + pollFuture := future.NewFuture[*matchingservice.GetTaskQueueUserDataResponse]() go func() { - time.Sleep(200 * time.Millisecond) + res, err := s.matchingEngine.GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: namespaceID.String(), + TaskQueue: tq, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 0, // must be zero to start + WaitNewData: true, + }) + pollFuture.Set(res, err) + }() + await.Require(ctx, s.T(), func(t *await.T) { _, err := s.matchingEngine.UpdateWorkerBuildIdCompatibility(context.Background(), &matchingservice.UpdateWorkerBuildIdCompatibilityRequest{ NamespaceId: namespaceID.String(), TaskQueue: tq, @@ -2715,18 +2727,14 @@ func (s *matchingEngineSuite) TestGetTaskQueueUserData_LongPoll_WakesUp_FromNoth }, }, }) - s.NoError(err) - }() + require.NoError(t, err) + }, 5*time.Second, 50*time.Millisecond) - res, err := s.matchingEngine.GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{ - NamespaceId: namespaceID.String(), - TaskQueue: tq, - TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, - LastKnownUserDataVersion: 0, // must be zero to start - WaitNewData: true, - }) - s.NoError(err) - s.NotNil(res.UserData.Data.VersioningData) + await.Require(ctx, s.T(), func(t *await.T) { + res, err := pollFuture.Get(t.Context()) + require.NoError(t, err) + require.NotNil(t, res.UserData.Data.VersioningData) + }, 5*time.Second, 50*time.Millisecond) } func (s *matchingEngineSuite) TestGetTaskQueueUserData_LongPoll_WakesUp_From2to3() { @@ -2751,9 +2759,19 @@ func (s *matchingEngineSuite) TestGetTaskQueueUserData_LongPoll_WakesUp_From2to3 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + pollFuture := future.NewFuture[*matchingservice.GetTaskQueueUserDataResponse]() go func() { - time.Sleep(200 * time.Millisecond) + res, err := s.matchingEngine.GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: namespaceID.String(), + TaskQueue: tq, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: userData.Version, + WaitNewData: true, + }) + pollFuture.Set(res, err) + }() + await.Require(ctx, s.T(), func(t *await.T) { _, err := s.matchingEngine.UpdateWorkerBuildIdCompatibility(context.Background(), &matchingservice.UpdateWorkerBuildIdCompatibilityRequest{ NamespaceId: namespaceID.String(), TaskQueue: tq, @@ -2769,19 +2787,15 @@ func (s *matchingEngineSuite) TestGetTaskQueueUserData_LongPoll_WakesUp_From2to3 }, }, }) - s.NoError(err) - }() + require.NoError(t, err) + }, 5*time.Second, 50*time.Millisecond) - res, err := s.matchingEngine.GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{ - NamespaceId: namespaceID.String(), - TaskQueue: tq, - TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, - LastKnownUserDataVersion: userData.Version, - WaitNewData: true, - }) - s.NoError(err) - s.True(hlc.Greater(res.UserData.Data.Clock, userData.Data.Clock)) - s.NotNil(res.UserData.Data.VersioningData) + await.Require(ctx, s.T(), func(t *await.T) { + res, err := pollFuture.Get(t.Context()) + require.NoError(t, err) + require.True(t, hlc.Greater(res.UserData.Data.Clock, userData.Data.Clock)) + require.NotNil(t, res.UserData.Data.VersioningData) + }, 5*time.Second, 50*time.Millisecond) } func (s *matchingEngineSuite) TestGetTaskQueueUserData_LongPoll_Closes() {