diff --git a/admin_client.go b/admin_client.go index 7a83e66e..8d8dbc82 100644 --- a/admin_client.go +++ b/admin_client.go @@ -63,7 +63,6 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient { zkRoot: defaultZkRoot, effectiveUser: defaultEffectiveUser, regionLookupTimeout: region.DefaultLookupTimeout, - regionReadTimeout: region.DefaultReadTimeout, newRegionClientFn: region.NewClient, logger: slog.Default(), } diff --git a/client.go b/client.go index 110de13d..97e5db80 100644 --- a/client.go +++ b/client.go @@ -95,9 +95,6 @@ type client struct { // session timeout. regionLookupTimeout time.Duration - // regionReadTimeout is the maximum amount of time to wait for regionserver reply - regionReadTimeout time.Duration - done chan struct{} closeOnce sync.Once @@ -139,7 +136,6 @@ func newClient(zkquorum string, options ...Option) *client { zkTimeout: defaultZkTimeout, effectiveUser: defaultEffectiveUser, regionLookupTimeout: region.DefaultLookupTimeout, - regionReadTimeout: region.DefaultReadTimeout, done: make(chan struct{}), newRegionClientFn: func(addr string, ctype region.ClientType, options *region.RegionClientOptions) hrpc.RegionClient { @@ -216,7 +212,6 @@ func (c *client) MarshalJSON() ([]byte, error) { AdminRegionInfo hrpc.RegionInfo Done_Status string RegionLookupTimeout time.Duration - RegionReadTimeout time.Duration }{ ClientType: c.clientType, ClientRegionMap: clientRegionsMap, @@ -227,7 +222,6 @@ func (c *client) MarshalJSON() ([]byte, error) { AdminRegionInfo: c.adminRegionInfo, Done_Status: done, RegionLookupTimeout: c.regionLookupTimeout, - RegionReadTimeout: c.regionReadTimeout, } jsonVal, err := json.Marshal(state) @@ -263,11 +257,14 @@ func RegionLookupTimeout(to time.Duration) Option { } } -// RegionReadTimeout will return an option that sets the region read timeout +// RegionReadTimeout will return an option that sets the region read +// timeout +// +// Deprecated: RegionReadTimeouts have been replaced with TCP +// keepalives, which are configured by default. To use alternative +// keepalive configuration use [RegionDialer]. func RegionReadTimeout(to time.Duration) Option { - return func(c *client) { - c.regionReadTimeout = to - } + return func(c *client) {} } // EffectiveUser will return an option that will set the user used when accessing regions. diff --git a/debug_state_test.go b/debug_state_test.go index 251c669c..dd09af7e 100644 --- a/debug_state_test.go +++ b/debug_state_test.go @@ -23,7 +23,6 @@ func TestDebugStateSanity(t *testing.T) { QueueSize: defaultRPCQueueSize, FlushInterval: defaultFlushInterval, EffectiveUser: defaultEffectiveUser, - ReadTimeout: region.DefaultReadTimeout, Codec: client.compressionCodec, Logger: slog.Default(), } diff --git a/region/client.go b/region/client.go index 6f448c58..1dcf3cc4 100644 --- a/region/client.go +++ b/region/client.go @@ -90,8 +90,6 @@ var ( const ( //DefaultLookupTimeout is the default region lookup timeout DefaultLookupTimeout = 30 * time.Second - //DefaultReadTimeout is the default region read timeout - DefaultReadTimeout = 30 * time.Second // DefaultRPCQueueSize is the default size of the RPC queue DefaultRPCQueueSize = 100 // DefaultFlushInterval is the default interval for flushing RPCs @@ -107,6 +105,20 @@ const ( MasterClient = ClientType("MasterService") ) +var ( + defaultDialer = net.Dialer{ + KeepAliveConfig: net.KeepAliveConfig{ + Enable: true, + Idle: 15 * time.Second, + Interval: 10 * time.Second, + Count: 3, + }, + // tcpUserTimeout value should equal Idle + Interval*Count config. + // See https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ + ControlContext: tcpUserTimeoutControl(15*time.Second + 10*time.Second*3), + } +) + var bufferPool sync.Pool func newBuffer(size int) []byte { @@ -182,19 +194,12 @@ type client struct { sentM sync.Mutex // protects sent sent map[uint32]hrpc.Call - // inFlight is number of rpcs sent to regionserver awaiting response - inFlightM sync.Mutex // protects inFlight and SetReadDeadline - inFlight uint32 - id uint32 rpcQueueSize int flushInterval time.Duration effectiveUser string - // readTimeout is the maximum amount of time to wait for regionserver reply - readTimeout time.Duration - // compressor for cellblocks. if nil, then no compression compressor *compressor @@ -267,33 +272,6 @@ func (c *client) String() string { return fmt.Sprintf("RegionClient{Addr: %s}", c.addr) } -func (c *client) inFlightUp() error { - c.inFlightM.Lock() - c.inFlight++ - // we expect that at least the last request can be completed within readTimeout - if err := c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil { - c.inFlightM.Unlock() - return err - } - c.inFlightM.Unlock() - return nil -} - -func (c *client) inFlightDown() error { - c.inFlightM.Lock() - c.inFlight-- - // reset read timeout if we are not waiting for any responses - // in order to prevent from closing this client if there are no request - if c.inFlight == 0 { - if err := c.conn.SetReadDeadline(time.Time{}); err != nil { - c.inFlightM.Unlock() - return err - } - } - c.inFlightM.Unlock() - return nil -} - func (c *client) fail(err error) { c.failOnce.Do(func() { if err != ErrClientClosed { @@ -580,10 +558,6 @@ func (c *client) receive(r io.Reader) (err error) { return ServerError{fmt.Errorf("got a response with an unexpected call ID: %d", callID)} } - if err := c.inFlightDown(); err != nil { - return ServerError{err} - } - select { case <-rpc.Context().Done(): // context has expired, don't bother deserializing @@ -744,9 +718,6 @@ func (c *client) send(rpc hrpc.Call) (uint32, error) { return id, ServerError{err} } - if err := c.inFlightUp(); err != nil { - return id, ServerError{err} - } return id, nil } @@ -858,9 +829,9 @@ func (c *client) MarshalJSON() ([]byte, error) { } } - c.inFlightM.Lock() - inFlight := c.inFlight - c.inFlightM.Unlock() + c.sentM.Lock() + inFlight := len(c.sent) + c.sentM.Unlock() // if conn is nil then we don't want to panic. So just get the addresses if conn is not nil var localAddr, remoteAddr Address @@ -879,7 +850,7 @@ func (c *client) MarshalJSON() ([]byte, error) { ConnectionRemoteAddress Address RegionServerAddress string ClientType ClientType - InFlight uint32 + InFlight int Id uint32 Done_status string }{ diff --git a/region/client_test.go b/region/client_test.go index a49a2e27..86e1d469 100644 --- a/region/client_test.go +++ b/region/client_test.go @@ -193,7 +193,6 @@ func TestQueueRPCMultiWithClose(t *testing.T) { defer ctrl.Finish() mockConn := mock.NewMockConn(ctrl) - mockConn.EXPECT().SetReadDeadline(gomock.Any()).AnyTimes() ncalls := 1000 @@ -345,7 +344,6 @@ func TestQueueRPC(t *testing.T) { queueSize := 30 flushInterval := 20 * time.Millisecond mockConn := mock.NewMockConn(ctrl) - mockConn.EXPECT().SetReadDeadline(gomock.Any()).AnyTimes() c := &client{ conn: mockConn, rpcs: make(chan []hrpc.Call), @@ -514,7 +512,6 @@ func TestServerErrorExceptionResponse(t *testing.T) { ctrl := test.NewController(t) defer ctrl.Finish() mockConn := mock.NewMockConn(ctrl) - mockConn.EXPECT().SetReadDeadline(gomock.Any()).Times(2) c := &client{ conn: mockConn, rpcs: make(chan []hrpc.Call), @@ -530,9 +527,6 @@ func TestServerErrorExceptionResponse(t *testing.T) { } c.registerRPC(rpc) - if err := c.inFlightUp(); err != nil { - t.Fatal(err) - } var response []byte header := &pb.ResponseHeader{ @@ -626,7 +620,6 @@ func TestReceiveDecodeProtobufError(t *testing.T) { mockCall.EXPECT().Name().Return("Whatever").Times(1) c.sent[1] = mockCall - c.inFlight = 1 // Append mutate response with a chunk in the middle missing response := []byte{6, 8, 1, 26, 2, 8, 38, 34, 0, 0, 0, 22, @@ -636,7 +629,6 @@ func TestReceiveDecodeProtobufError(t *testing.T) { Do(func(buf []byte) { binary.BigEndian.PutUint32(buf, uint32(len(response))) }) mockConn.EXPECT().Read(readBufSizeMatcher{l: len(response)}).Times(1). Return(len(response), nil).Do(func(buf []byte) { copy(buf, response) }) - mockConn.EXPECT().SetReadDeadline(time.Time{}).Times(1) expErrorPefix := "region.RetryableError: failed to decode the response: proto:" err := c.receive(mockConn) @@ -675,7 +667,6 @@ func TestReceiveDeserializeCellblocksError(t *testing.T) { mockCall.EXPECT().Name().Return("Get").Times(1) c.sent[1] = callWithCellBlocksError{mockCall} - c.inFlight = 1 // Append mutate response response := []byte{6, 8, 1, 26, 2, 8, 38, 6, 10, 4, 16, 1, 32, 0, 0, 0, 0, 34, 0, 0, 0, 22, @@ -685,7 +676,6 @@ func TestReceiveDeserializeCellblocksError(t *testing.T) { Do(func(buf []byte) { binary.BigEndian.PutUint32(buf, uint32(len(response))) }) mockConn.EXPECT().Read(readBufSizeMatcher{l: len(response)}).Times(1). Return(len(response), nil).Do(func(buf []byte) { copy(buf, response) }) - mockConn.EXPECT().SetReadDeadline(time.Time{}).Times(1) expError := errors.New("region.RetryableError: failed to decode the response: OOPS") err := c.receive(mockConn) @@ -789,8 +779,6 @@ func TestProcessRPCs(t *testing.T) { wgWrite.Done() // test will timeout if didn't get at least minsent } }) - mockConn.EXPECT().SetReadDeadline(gomock.Any()). - MinTimes(tcase.minsent).MaxTimes(tcase.maxsent) calls := make([]hrpc.Call, tcase.ncalls) for i := range calls { @@ -824,10 +812,6 @@ func TestProcessRPCs(t *testing.T) { wgProcessRPCs.Wait() t.Log("num batches sent", sent) - if f := int(c.inFlight); f < tcase.minsent || f > tcase.maxsent { - t.Errorf("expected [%d:%d] in-flight rpcs, got %d", - tcase.minsent, tcase.maxsent, c.inFlight) - } }) } } @@ -849,8 +833,6 @@ func TestRPCContext(t *testing.T) { logger: slog.Default(), } - mockConn.EXPECT().SetReadDeadline(gomock.Any()).Times(1) - // queue rpc with background context mockCall := mock.NewMockCall(ctrl) mockCall.EXPECT().Name().Return("Get").Times(1) @@ -870,9 +852,6 @@ func TestRPCContext(t *testing.T) { // this shouldn't block c.QueueRPC(callWithCancel) - if int(c.inFlight) != 1 { - t.Errorf("expected %d in-flight rpcs, got %d", 1, c.inFlight) - } // clean up c.Close() } @@ -925,7 +904,6 @@ func TestSanity(t *testing.T) { NewInfo(0, nil, []byte("test1"), []byte("test1,,lololololololololololo"), nil, nil)) mockConn.EXPECT().Write(gomock.Any()).Times(2).Return(0, nil) - mockConn.EXPECT().SetReadDeadline(gomock.Any()).Times(1) c.QueueRPC(app) @@ -944,7 +922,6 @@ func TestSanity(t *testing.T) { mockConn.EXPECT().Read(gomock.Any()).MaxTimes(1). Return(0, errors.New("closed")).Do(func(buf []byte) { <-c.done }) }) - mockConn.EXPECT().SetReadDeadline(time.Time{}).Times(1) wg.Add(1) go func() { c.receiveRPCs() @@ -976,9 +953,6 @@ func TestSanity(t *testing.T) { if !proto.Equal(expResult, r.Result) { t.Errorf("expected %v, got %v", expResult, r.Result) } - if int(c.inFlight) != 0 { - t.Errorf("expected %d in-flight rpcs, got %d", 0, c.inFlight) - } mockConn.EXPECT().Close().Times(1) c.Close() @@ -1027,7 +1001,6 @@ func TestSanityCompressor(t *testing.T) { "\b\x01\x12\x1dtest1,,lololololololololololo\x12\f\n\x04yolo\x10\x000\x00@\x01")).Return( 71, nil) mockConn.EXPECT().Write([]byte(compressedCellblocks)).Return(58, nil) - mockConn.EXPECT().SetReadDeadline(gomock.Any()) c.QueueRPC(app) @@ -1070,7 +1043,6 @@ func TestSanityCompressor(t *testing.T) { mockConn.EXPECT().Read(gomock.Any()).MaxTimes(1). Return(0, errors.New("closed")).Do(func(buf []byte) { <-c.done }) }) - mockConn.EXPECT().SetReadDeadline(time.Time{}).Times(1) wg.Add(1) go func() { c.receiveRPCs() @@ -1102,9 +1074,6 @@ func TestSanityCompressor(t *testing.T) { if !proto.Equal(expResult, r.Result) { t.Errorf("expected %v, got %v", expResult, r.Result) } - if int(c.inFlight) != 0 { - t.Errorf("expected %d in-flight rpcs, got %d", 0, c.inFlight) - } mockConn.EXPECT().Close().Times(1) c.Close() @@ -1137,7 +1106,6 @@ func BenchmarkSendBatchMemory(b *testing.B) { mockConn.EXPECT().Write(gomock.Any()).AnyTimes().Return(0, nil).Do(func(buf []byte) { wgWrites.Done() }) - mockConn.EXPECT().SetReadDeadline(gomock.Any()).AnyTimes() go c.processRPCs() b.ResetTimer() @@ -1151,40 +1119,6 @@ func BenchmarkSendBatchMemory(b *testing.B) { // we don't care about cleaning up } -func BenchmarkSetReadDeadline(b *testing.B) { - l, err := net.Listen("tcp", "localhost:0") - if err != nil { - b.Fatal(err) - } - var wg sync.WaitGroup - wg.Add(1) - go func() { - conn, err := l.Accept() - if err != nil { - b.Error(err) - } - wg.Done() - conn.Close() - }() - - conn, err := net.Dial("tcp", l.Addr().String()) - if err != nil { - b.Fatal(err) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := conn.SetReadDeadline(time.Now().Add(DefaultReadTimeout)); err != nil { - b.Fatal(err) - } - } - b.StopTimer() - - conn.Close() - l.Close() - wg.Wait() -} - func TestBuffer(t *testing.T) { size := 42 b := newBuffer(size) diff --git a/region/info_test.go b/region/info_test.go index a7bc2552..50cacde9 100644 --- a/region/info_test.go +++ b/region/info_test.go @@ -175,11 +175,9 @@ func TestRegionInfoMarshalJson(t *testing.T) { rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), - inFlight: 20, effectiveUser: "effectiveUser", rpcQueueSize: queueSize, flushInterval: flushInterval, - readTimeout: DefaultReadTimeout, } ctx, cancel := context.WithCancel(context.Background()) diff --git a/region/net_linux.go b/region/net_linux.go new file mode 100644 index 00000000..a6306551 --- /dev/null +++ b/region/net_linux.go @@ -0,0 +1,26 @@ +// Copyright (C) 2025 The GoHBase Authors. All rights reserved. +// This file is part of GoHBase. +// Use of this source code is governed by the Apache License 2.0 +// that can be found in the COPYING file. + +package region + +import ( + "context" + "syscall" + "time" + + "golang.org/x/sys/unix" +) + +func tcpUserTimeoutControl(timeout time.Duration) func( + context.Context, string, string, syscall.RawConn) error { + return func(_ context.Context, network, address string, c syscall.RawConn) error { + var err error + c.Control(func(fd uintptr) { + err = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, + int(timeout.Milliseconds())) + }) + return err + } +} diff --git a/region/net_other.go b/region/net_other.go new file mode 100644 index 00000000..c2f05617 --- /dev/null +++ b/region/net_other.go @@ -0,0 +1,20 @@ +// Copyright (C) 2025 The GoHBase Authors. All rights reserved. +// This file is part of GoHBase. +// Use of this source code is governed by the Apache License 2.0 +// that can be found in the COPYING file. + +//go:build !linux + +package region + +import ( + "context" + "syscall" + "time" +) + +// tcpUserTimeoutControl is not supported on OS's other than linux +func tcpUserTimeoutControl(timeout time.Duration) func( + context.Context, string, string, syscall.RawConn) error { + return nil +} diff --git a/region/new.go b/region/new.go index a8af02bb..b8f6a4fe 100644 --- a/region/new.go +++ b/region/new.go @@ -26,8 +26,6 @@ type RegionClientOptions struct { FlushInterval time.Duration // EffectiveUser sets the effective user for the connection EffectiveUser string - // ReadTimeout sets the read timeout for RPCs - ReadTimeout time.Duration // Codec sets the compression codec for cellblocks Codec compression.Codec // Dialer sets a custom dialer for connecting to region servers @@ -66,19 +64,13 @@ func NewClient(addr string, ctype ClientType, opts *RegionClientOptions) hrpc.Re rpcQueueSize: DefaultRPCQueueSize, flushInterval: DefaultFlushInterval, effectiveUser: DefaultEffectiveUser, - readTimeout: DefaultReadTimeout, + dialer: defaultDialer.DialContext, + logger: slog.Default(), rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), } - // Set default dialer - var d net.Dialer - c.dialer = d.DialContext - - // Set default logger - c.logger = slog.Default() - // Apply options if provided if opts != nil { if opts.QueueSize > 0 { @@ -90,9 +82,6 @@ func NewClient(addr string, ctype ClientType, opts *RegionClientOptions) hrpc.Re if opts.EffectiveUser != "" { c.effectiveUser = opts.EffectiveUser } - if opts.ReadTimeout > 0 { - c.readTimeout = opts.ReadTimeout - } if opts.Codec != nil { c.compressor = &compressor{Codec: opts.Codec} } diff --git a/rpc.go b/rpc.go index 2c56fdad..e727a55e 100644 --- a/rpc.go +++ b/rpc.go @@ -1041,7 +1041,6 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { QueueSize: c.rpcQueueSize, FlushInterval: c.flushInterval, EffectiveUser: c.effectiveUser, - ReadTimeout: c.regionReadTimeout, Dialer: c.regionDialer, Logger: c.logger, } @@ -1052,7 +1051,6 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { QueueSize: c.rpcQueueSize, FlushInterval: c.flushInterval, EffectiveUser: c.effectiveUser, - ReadTimeout: c.regionReadTimeout, Codec: c.compressionCodec, Dialer: c.regionDialer, Logger: c.logger, diff --git a/rpc_test.go b/rpc_test.go index 09b2bf1f..8f9a52c0 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -64,7 +64,6 @@ func newMockClient(zkClient zk.Client) *client { zkTimeout: defaultZkTimeout, zkClient: zkClient, regionLookupTimeout: region.DefaultLookupTimeout, - regionReadTimeout: region.DefaultReadTimeout, newRegionClientFn: newMockRegionClient, logger: slog.Default(), }