From 1e6b97c983e4a34992f4a91303205dd20a3ac83f Mon Sep 17 00:00:00 2001 From: GreatRiver <14086886+LeftHandCold@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:05:51 +0800 Subject: [PATCH 1/2] compile: allow local notify streams for shuffle dispatch (#24159) This fixes the self-connection bug in the compile/remoterun notification path. `Scope.RemoteRun` already handles the local-scope case with `ipAddrMatch`, but `Scope.sendNotifyMessage` still creates a pipeline RPC stream for every `RemoteReceivRegInfo.FromAddr`. When one dispatch operator is scheduled on the same CN as the coordinator, `fromAddr` equals the local service address. The old `pipelineClient.NewStream` rejected that loopback connection with `remote run pipeline in local`, so the prepare-done notification path could not be established for the local dispatch operator. That can break remote dispatch/shuffle coordination for queries such as secondary-index UPDATE plans using shuffle join with `serial_full(...)` and surface as failure or hang. This PR keeps the existing `RemoteRun` local-execution guard intact, but allows the notification stream itself to connect to the local CN address. That is the smallest fix that restores the existing dispatch notification protocol without refactoring local/remote receiver handling. Also included: - a unit test that pins the real regression point in `pipelineClient.NewStream` Approved by: @XuPeng-SH --- pkg/cnservice/cnclient/client.go | 6 --- pkg/cnservice/cnclient/client_test.go | 55 +++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 pkg/cnservice/cnclient/client_test.go diff --git a/pkg/cnservice/cnclient/client.go b/pkg/cnservice/cnclient/client.go index 294fd8835a2fe..df4a1abe6d084 100644 --- a/pkg/cnservice/cnclient/client.go +++ b/pkg/cnservice/cnclient/client.go @@ -15,10 +15,7 @@ package cnclient import ( - "fmt" - "github.com/fagongzi/goetty/v2" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/logutil" @@ -111,9 +108,6 @@ func NewPipelineClient( } func (c *pipelineClient) NewStream(backend string) (morpc.Stream, error) { - if backend == c.localServiceAddress { - return nil, moerr.NewInternalErrorNoCtx(fmt.Sprintf("remote run pipeline in local: %s", backend)) - } return c.client.NewStream(backend, true) } diff --git a/pkg/cnservice/cnclient/client_test.go b/pkg/cnservice/cnclient/client_test.go new file mode 100644 index 0000000000000..a91e4f9e4d18c --- /dev/null +++ b/pkg/cnservice/cnclient/client_test.go @@ -0,0 +1,55 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cnclient + +import ( + "context" + "testing" + + "github.com/matrixorigin/matrixone/pkg/common/morpc" + "github.com/stretchr/testify/require" +) + +type testRPCClient struct { + backend string + lock bool +} + +func (c *testRPCClient) Send(ctx context.Context, backend string, request morpc.Message) (*morpc.Future, error) { + return nil, nil +} + +func (c *testRPCClient) NewStream(ctx context.Context, backend string, lock bool) (morpc.Stream, error) { + c.backend = backend + c.lock = lock + return nil, nil +} + +func (c *testRPCClient) Ping(ctx context.Context, backend string) error { return nil } +func (c *testRPCClient) Close() error { return nil } +func (c *testRPCClient) CloseBackend() error { return nil } + +func TestPipelineClient_NewStreamAllowsLocalBackend(t *testing.T) { + rpcClient := &testRPCClient{} + client := &pipelineClient{ + localServiceAddress: "127.0.0.1:1234", + client: rpcClient, + } + + _, err := client.NewStream(context.Background(), "127.0.0.1:1234") + require.NoError(t, err) + require.Equal(t, "127.0.0.1:1234", rpcClient.backend) + require.True(t, rpcClient.lock) +} From 983b4ef82fcbe100b0b86f70d6e02a217752657a Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Sun, 7 Jun 2026 10:03:37 +0800 Subject: [PATCH 2/2] test: fix pipeline client stream mock signature --- pkg/cnservice/cnclient/client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cnservice/cnclient/client_test.go b/pkg/cnservice/cnclient/client_test.go index a91e4f9e4d18c..51d18040903f0 100644 --- a/pkg/cnservice/cnclient/client_test.go +++ b/pkg/cnservice/cnclient/client_test.go @@ -31,7 +31,7 @@ func (c *testRPCClient) Send(ctx context.Context, backend string, request morpc. return nil, nil } -func (c *testRPCClient) NewStream(ctx context.Context, backend string, lock bool) (morpc.Stream, error) { +func (c *testRPCClient) NewStream(backend string, lock bool) (morpc.Stream, error) { c.backend = backend c.lock = lock return nil, nil @@ -48,7 +48,7 @@ func TestPipelineClient_NewStreamAllowsLocalBackend(t *testing.T) { client: rpcClient, } - _, err := client.NewStream(context.Background(), "127.0.0.1:1234") + _, err := client.NewStream("127.0.0.1:1234") require.NoError(t, err) require.Equal(t, "127.0.0.1:1234", rpcClient.backend) require.True(t, rpcClient.lock)