Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient {
zkRoot: defaultZkRoot,
effectiveUser: defaultEffectiveUser,
regionLookupTimeout: region.DefaultLookupTimeout,
regionReadTimeout: region.DefaultReadTimeout,
newRegionClientFn: region.NewClient,
logger: slog.Default(),
}
Expand Down
17 changes: 7 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ type client struct {
// session timeout.
regionLookupTimeout time.Duration

// regionReadTimeout is the maximum amount of time to wait for regionserver reply
regionReadTimeout time.Duration

done chan struct{}
closeOnce sync.Once

Expand Down Expand Up @@ -139,7 +136,6 @@ func newClient(zkquorum string, options ...Option) *client {
zkTimeout: defaultZkTimeout,
effectiveUser: defaultEffectiveUser,
regionLookupTimeout: region.DefaultLookupTimeout,
regionReadTimeout: region.DefaultReadTimeout,
done: make(chan struct{}),
newRegionClientFn: func(addr string, ctype region.ClientType,
options *region.RegionClientOptions) hrpc.RegionClient {
Expand Down Expand Up @@ -216,7 +212,6 @@ func (c *client) MarshalJSON() ([]byte, error) {
AdminRegionInfo hrpc.RegionInfo
Done_Status string
RegionLookupTimeout time.Duration
RegionReadTimeout time.Duration
}{
ClientType: c.clientType,
ClientRegionMap: clientRegionsMap,
Expand All @@ -227,7 +222,6 @@ func (c *client) MarshalJSON() ([]byte, error) {
AdminRegionInfo: c.adminRegionInfo,
Done_Status: done,
RegionLookupTimeout: c.regionLookupTimeout,
RegionReadTimeout: c.regionReadTimeout,
}

jsonVal, err := json.Marshal(state)
Expand Down Expand Up @@ -263,11 +257,14 @@ func RegionLookupTimeout(to time.Duration) Option {
}
}

// RegionReadTimeout will return an option that sets the region read timeout
// RegionReadTimeout will return an option that sets the region read
// timeout
//
// Deprecated: RegionReadTimeouts have been replaced with TCP
// keepalives, which are configured by default. To use alternative
// keepalive configuration use [RegionDialer].
func RegionReadTimeout(to time.Duration) Option {
return func(c *client) {
c.regionReadTimeout = to
}
return func(c *client) {}
}

// EffectiveUser will return an option that will set the user used when accessing regions.
Expand Down
1 change: 0 additions & 1 deletion debug_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestDebugStateSanity(t *testing.T) {
QueueSize: defaultRPCQueueSize,
FlushInterval: defaultFlushInterval,
EffectiveUser: defaultEffectiveUser,
ReadTimeout: region.DefaultReadTimeout,
Codec: client.compressionCodec,
Logger: slog.Default(),
}
Expand Down
65 changes: 18 additions & 47 deletions region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ var (
const (
//DefaultLookupTimeout is the default region lookup timeout
DefaultLookupTimeout = 30 * time.Second
//DefaultReadTimeout is the default region read timeout
DefaultReadTimeout = 30 * time.Second
// DefaultRPCQueueSize is the default size of the RPC queue
DefaultRPCQueueSize = 100
// DefaultFlushInterval is the default interval for flushing RPCs
Expand All @@ -107,6 +105,20 @@ const (
MasterClient = ClientType("MasterService")
)

var (
defaultDialer = net.Dialer{
KeepAliveConfig: net.KeepAliveConfig{
Enable: true,
Idle: 15 * time.Second,
Interval: 10 * time.Second,
Count: 3,
},
// tcpUserTimeout value should equal Idle + Interval*Count config.
// See https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/
ControlContext: tcpUserTimeoutControl(15*time.Second + 10*time.Second*3),
}
)

var bufferPool sync.Pool

func newBuffer(size int) []byte {
Expand Down Expand Up @@ -182,19 +194,12 @@ type client struct {
sentM sync.Mutex // protects sent
sent map[uint32]hrpc.Call

// inFlight is number of rpcs sent to regionserver awaiting response
inFlightM sync.Mutex // protects inFlight and SetReadDeadline
inFlight uint32

id uint32

rpcQueueSize int
flushInterval time.Duration
effectiveUser string

// readTimeout is the maximum amount of time to wait for regionserver reply
readTimeout time.Duration

// compressor for cellblocks. if nil, then no compression
compressor *compressor

Expand Down Expand Up @@ -267,33 +272,6 @@ func (c *client) String() string {
return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
}

func (c *client) inFlightUp() error {
c.inFlightM.Lock()
c.inFlight++
// we expect that at least the last request can be completed within readTimeout
if err := c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
c.inFlightM.Unlock()
return err
}
c.inFlightM.Unlock()
return nil
}

func (c *client) inFlightDown() error {
c.inFlightM.Lock()
c.inFlight--
// reset read timeout if we are not waiting for any responses
// in order to prevent from closing this client if there are no request
if c.inFlight == 0 {
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
c.inFlightM.Unlock()
return err
}
}
c.inFlightM.Unlock()
return nil
}

func (c *client) fail(err error) {
c.failOnce.Do(func() {
if err != ErrClientClosed {
Expand Down Expand Up @@ -580,10 +558,6 @@ func (c *client) receive(r io.Reader) (err error) {
return ServerError{fmt.Errorf("got a response with an unexpected call ID: %d", callID)}
}

if err := c.inFlightDown(); err != nil {
return ServerError{err}
}

select {
case <-rpc.Context().Done():
// context has expired, don't bother deserializing
Expand Down Expand Up @@ -744,9 +718,6 @@ func (c *client) send(rpc hrpc.Call) (uint32, error) {
return id, ServerError{err}
}

if err := c.inFlightUp(); err != nil {
return id, ServerError{err}
}
return id, nil
}

Expand Down Expand Up @@ -858,9 +829,9 @@ func (c *client) MarshalJSON() ([]byte, error) {
}
}

c.inFlightM.Lock()
inFlight := c.inFlight
c.inFlightM.Unlock()
c.sentM.Lock()
inFlight := len(c.sent)
c.sentM.Unlock()

// if conn is nil then we don't want to panic. So just get the addresses if conn is not nil
var localAddr, remoteAddr Address
Expand All @@ -879,7 +850,7 @@ func (c *client) MarshalJSON() ([]byte, error) {
ConnectionRemoteAddress Address
RegionServerAddress string
ClientType ClientType
InFlight uint32
InFlight int
Id uint32
Done_status string
}{
Expand Down
Loading