From 032b058aad10899d37f217d8a55c23f43ce45ab8 Mon Sep 17 00:00:00 2001 From: Thibault Deutsch Date: Fri, 5 Jul 2024 19:06:29 +0100 Subject: [PATCH] hrpc: add an option to skip retry This let the caller tell GoHBase to no retry calls that failed. Otherwise, GoHBase will retry forever until cancellation of the context. But sometime, it is useful to to be able to say "try only once", without just relying on having a small context timeout. One of the place where this is useful is when we try to close a scanner that we prematurely stopped, as this is just the client trying to be a good citizen. But most likely, if the request failed with a retryable error (region closed or moved, call queue too big, etc), by the time we manage to send the request the scanner lease would have been already expired on HBase side. --- hrpc/call.go | 25 +++++++++++++++++++++++++ region/multi.go | 5 +++++ rpc.go | 20 +++++++++++--------- rpc_test.go | 1 + scanner.go | 16 +++++++--------- test/mock/call.go | 14 ++++++++++++++ 6 files changed, 63 insertions(+), 18 deletions(-) diff --git a/hrpc/call.go b/hrpc/call.go index a15e6e4c..04c618e5 100644 --- a/hrpc/call.go +++ b/hrpc/call.go @@ -61,6 +61,7 @@ type Call interface { ResultChan() chan RPCResult Description() string // Used for tracing and metrics Context() context.Context + SkipRetry() bool } type withOptions interface { @@ -90,6 +91,20 @@ func SkipBatch() func(Call) error { } } +func SkipRetry() func(Call) error { + return func(c Call) error { + if b, ok := c.(canSetSkipRetry); ok { + b.setSkipRetry(true) + return nil + } + return errors.New("'SkipRetry' is not implemented for this call") + } +} + +type canSetSkipRetry interface { + setSkipRetry(v bool) +} + // hasQueryOptions is interface that needs to be implemented by calls // that allow to provide Families and Filters options. type hasQueryOptions interface { @@ -119,12 +134,22 @@ type base struct { region RegionInfo resultch chan RPCResult + + skipRetry bool } func (b *base) Context() context.Context { return b.ctx } +func (b *base) SkipRetry() bool { + return b.skipRetry +} + +func (b *base) setSkipRetry() { + b.skipRetry = true +} + func (b *base) Region() RegionInfo { return b.region } diff --git a/region/multi.go b/region/multi.go index 29784928..219ce61e 100644 --- a/region/multi.go +++ b/region/multi.go @@ -323,6 +323,11 @@ func (m *multi) Context() context.Context { return context.Background() } +// SkipRetry always returns false for Multi. +func (m *multi) SkipRetry() bool { + return false +} + // String returns a description of this call func (m *multi) String() string { return "MULTI" diff --git a/rpc.go b/rpc.go index b13e5788..f3264873 100644 --- a/rpc.go +++ b/rpc.go @@ -97,16 +97,18 @@ func (c *client) SendRPC(rpc hrpc.Call) (msg proto.Message, err error) { return nil, err } msg, err = c.sendRPCToRegionClient(ctx, rpc, rc) - switch err.(type) { - case region.RetryableError: - sp.AddEvent("retrySleep") - backoff, err = sleepAndIncreaseBackoff(ctx, backoff) - if err != nil { - return msg, err + if !rpc.SkipRetry() { + switch err.(type) { + case region.RetryableError: + sp.AddEvent("retrySleep") + backoff, err = sleepAndIncreaseBackoff(ctx, backoff) + if err != nil { + return msg, err + } + continue // retry + case region.ServerError, region.NotServingRegionError: + continue // retry } - continue // retry - case region.ServerError, region.NotServingRegionError: - continue // retry } return msg, err } diff --git a/rpc_test.go b/rpc_test.go index be8a925f..786e2d41 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -88,6 +88,7 @@ func TestSendRPCSanity(t *testing.T) { expMsg := &pb.ScanResponse{} result <- hrpc.RPCResult{Msg: expMsg} mockCall.EXPECT().ResultChan().Return(result).Times(1) + mockCall.EXPECT().SkipRetry().Return(false) msg, err := c.SendRPC(mockCall) if err != nil { t.Fatal(err) diff --git a/scanner.go b/scanner.go index a5aaa698..2a92008d 100644 --- a/scanner.go +++ b/scanner.go @@ -337,22 +337,20 @@ func (s *scanner) closeRegionScanner() { return } if !s.rpc.IsClosing() { - // Not closed at server side - // if we are closing in the middle of scanning a region, - // send a close scanner request - // TODO: add a deadline + // Not closed at server side if we are closing in the middle of scanning + // a region, so send a close scanner request. This is a fire-and-forget + // call, as if we fail the scanner lease will expire and be closed + // automatically by HBase. rpc, err := hrpc.NewScanRange(context.Background(), s.rpc.Table(), s.startRow, nil, hrpc.ScannerID(s.curRegionScannerID), hrpc.CloseScanner(), - hrpc.NumberOfRows(0)) + hrpc.NumberOfRows(0), + hrpc.SkipRetry(), + ) if err != nil { panic(fmt.Sprintf("should not happen: %s", err)) } - - // If the request fails, the scanner lease will be expired - // and it will be closed automatically by hbase. - // No need to bother clients about that. go s.SendRPC(rpc) } s.curRegionScannerID = noScannerID diff --git a/test/mock/call.go b/test/mock/call.go index faf1eeb2..f0de7df8 100644 --- a/test/mock/call.go +++ b/test/mock/call.go @@ -151,6 +151,20 @@ func (mr *MockCallMockRecorder) SetRegion(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRegion", reflect.TypeOf((*MockCall)(nil).SetRegion), arg0) } +// SkipRetry mocks base method. +func (m *MockCall) SkipRetry() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SkipRetry") + ret0, _ := ret[0].(bool) + return ret0 +} + +// SkipRetry indicates an expected call of SkipRetry. +func (mr *MockCallMockRecorder) SkipRetry() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SkipRetry", reflect.TypeOf((*MockCall)(nil).SkipRetry)) +} + // Table mocks base method. func (m *MockCall) Table() []byte { m.ctrl.T.Helper()