From 84690c59dffb0cf2c130a9d150c7188157907c43 Mon Sep 17 00:00:00 2001 From: Aaron Beitch Date: Tue, 31 Dec 2024 14:30:04 -0800 Subject: [PATCH] region: Add BenchmarkReceive Example output: BenchmarkReceive/scanSmall-14 2600080 443.0 ns/op 1040 B/op 17 allocs/op BenchmarkReceive/scanResult200KBCellBlocks-14 18249 65157 ns/op 354860 B/op 2096 allocs/op BenchmarkReceive/scanResult2MBCellBlocks-14 2203 532341 ns/op 3626167 B/op 21338 allocs/op BenchmarkReceive/mutate-14 7684842 154.1 ns/op 140 B/op 5 allocs/op BenchmarkReceive/multiMutate100-14 97549 12104 ns/op 27490 B/op 415 allocs/op --- region/client_test.go | 245 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) diff --git a/region/client_test.go b/region/client_test.go index a66ef773..bcfbe436 100644 --- a/region/client_test.go +++ b/region/client_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/tsuna/gohbase/compression/snappy" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" "github.com/tsuna/gohbase/test" @@ -1318,3 +1319,247 @@ func BenchmarkSendScanRequest(b *testing.B) { } }) } + +func makeResponse(callID uint32, response proto.Message, cellblocks []byte, + compressor *compressor) []byte { + b := make([]byte, 4) // Reserve 4 bytes for size + header := &pb.ResponseHeader{ + CallId: proto.Uint32(callID), + } + if len(cellblocks) > 0 { + if compressor != nil { + cellblocks = compressor.compressCellblocks( + net.Buffers{cellblocks}, uint32(len(cellblocks))) + } + + header.CellBlockMeta = &pb.CellBlockMeta{Length: proto.Uint32(uint32(len(cellblocks)))} + } + + b = protowire.AppendVarint(b, uint64(proto.Size(header))) + var err error + b, err = proto.MarshalOptions{}.MarshalAppend(b, header) + if err != nil { + panic(err) + } + + b = protowire.AppendVarint(b, uint64(proto.Size(response))) + b, err = proto.MarshalOptions{}.MarshalAppend(b, response) + if err != nil { + panic(err) + } + + b = append(b, cellblocks...) + + // Put the final size in the first 4 bytes + binary.BigEndian.PutUint32(b[:4], uint32(len(b)-4)) + return b +} + +func cellblockLen(rowLen, familyLen, qualifierLen, valueLen int) int { + keyLength := 2 + rowLen + 1 + familyLen + qualifierLen + 8 + 1 + keyValueLength := 4 + 4 + keyLength + valueLen + return 4 + keyValueLength +} + +// Copied from hrpc/mutate.go +func appendCellblock(row []byte, family, qualifier string, value []byte, ts uint64, typ byte, + cbs []byte) []byte { + // cellblock layout: + // + // Header: + // 4 byte length of key + value + // 4 byte length of key + // 4 byte length of value + // + // Key: + // 2 byte length of row + // + // 1 byte length of row family + // + // + // 8 byte timestamp + // 1 byte type + // + // Value: + // + keylength := 2 + len(row) + 1 + len(family) + len(qualifier) + 8 + 1 + valuelength := len(value) + + keyvaluelength := 4 + 4 + keylength + valuelength + i := len(cbs) + cbs = append(cbs, make([]byte, + cellblockLen(len(row), len(family), len(qualifier), len(value)))...) + + // Header: + binary.BigEndian.PutUint32(cbs[i:], uint32(keyvaluelength)) + i += 4 + binary.BigEndian.PutUint32(cbs[i:], uint32(keylength)) + i += 4 + binary.BigEndian.PutUint32(cbs[i:], uint32(valuelength)) + i += 4 + + // Key: + binary.BigEndian.PutUint16(cbs[i:], uint16(len(row))) + i += 2 + i += copy(cbs[i:], row) + cbs[i] = byte(len(family)) + i++ + i += copy(cbs[i:], family) + i += copy(cbs[i:], qualifier) + binary.BigEndian.PutUint64(cbs[i:], ts) + i += 8 + cbs[i] = typ + i++ + + // Value: + copy(cbs[i:], value) + + return cbs +} + +type fakeConn struct { + net.Conn +} + +func (fakeConn) SetReadDeadline(t time.Time) error { return nil } + +func BenchmarkReceive(b *testing.B) { + benchmark := func(c *client, reader *bytes.Reader, resp []byte, + call hrpc.Call) func(*testing.B) { + return func(b *testing.B) { + b.ReportAllocs() + for range b.N { + // Set read buffer to the encoded response + reader.Reset(resp) + // Put the RPC in the sent map so that it can be found by + // receive + c.sent[1] = call + if err := c.receive(reader); err != nil { + b.Fatal(err) + } + // Consume the result so that this same request can be + // reused on the next iteration + <-call.ResultChan() + } + } + } + + c := &client{ + conn: fakeConn{}, + sent: make(map[uint32]hrpc.Call), + compressor: &compressor{snappy.New()}, + } + reader := bytes.NewReader(nil) + + scan, err := hrpc.NewScanStr(context.Background(), "table") + if err != nil { + b.Fatal(err) + } + + cell := appendCellblock( + bytes.Repeat([]byte("0123456789"), 5), // 50-byte key + "f", + string(bytes.Repeat([]byte("9876543210"), 2)), // 20-byte qualifier + bytes.Repeat([]byte("abcdefghij"), 20), // 200-byte value + 17356887651735688765, + byte(pb.CellType_PUT), + nil) + + scanResponse := &pb.ScanResponse{ + // TODO: Should this be 1 result with cellCount cells, or + // cellCount results each with 1 cell? + CellsPerResult: []uint32{uint32(1)}, + PartialFlagPerResult: []bool{false}, + } + resp := makeResponse(1, scanResponse, cell, c.compressor) + b.Run("scanSmall", benchmark(c, reader, resp, scan)) + + const twoHunderedKiB = 200 * 1024 + cellCount200KB := twoHunderedKiB / len(cell) + cells200KB := bytes.Repeat(cell, cellCount200KB) + + b.Logf("200KB Cell Count: %d", cellCount200KB) + + scanResponse200KB := &pb.ScanResponse{ + // TODO: Should this be 1 result with cellCount cells, or + // cellCount results each with 1 cell? + CellsPerResult: []uint32{uint32(cellCount200KB)}, + PartialFlagPerResult: []bool{false}, + } + resp = makeResponse(1, scanResponse200KB, cells200KB, c.compressor) + + b.Run("scanResult200KBCellBlocks", benchmark(c, reader, resp, scan)) + + const twoMiB = 2 * 1024 * 1024 + cellCount2MB := twoMiB / len(cell) + cells2MB := bytes.Repeat(cell, cellCount2MB) + + b.Logf("2MB Cell Count: %d", cellCount2MB) + + scanResponse2MB := &pb.ScanResponse{ + // TODO: Should this be 1 result with cellCount cells, or + // cellCount results each with 1 cell? + CellsPerResult: []uint32{uint32(cellCount2MB)}, + PartialFlagPerResult: []bool{false}, + } + resp = makeResponse(1, scanResponse2MB, cells2MB, c.compressor) + + b.Run("scanResult2MBCellBlocks", benchmark(c, reader, resp, scan)) + + put, err := hrpc.NewPutStr(context.Background(), "table", "key", + map[string]map[string][]byte{"cf": {"a": []byte("1")}}) + if err != nil { + b.Fatal(err) + } + // Simple puts have an empty response + resp = makeResponse(1, &pb.MutateResponse{}, nil, nil) + + b.Run("mutate", benchmark(c, reader, resp, put)) + + multi := newMulti(100) + multiResp := &pb.MultiResponse{ + RegionActionResult: []*pb.RegionActionResult{{}}, + } + calls := make([]hrpc.Call, 0, 100) + for i := range 100 { + put, err := hrpc.NewPutStr(context.Background(), "table", "key", + map[string]map[string][]byte{"cf": {"a": []byte("1")}}) + if err != nil { + b.Fatal(err) + } + calls = append(calls, put) + + multiResp.RegionActionResult[0].ResultOrException = append( + multiResp.RegionActionResult[0].ResultOrException, + &pb.ResultOrException{ + Index: proto.Uint32(uint32(i + 1)), + Result: &pb.Result{}, + }, + ) + } + + multi.add(calls) + resp = makeResponse(1, multiResp, nil, nil) + + b.Run("multiMutate100", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + reader.Reset(resp) + if multi.len() != 100 { + b.Fatalf("unexpected len: %d", multi.len()) + } + c.sent[1] = multi + if err := c.receive(reader); err != nil { + b.Fatal(err) + } + // Consume the results so that this same request can + // be reused on the next iteration + for _, c := range calls { + <-c.ResultChan() + } + // Need to do this on every iteration because + // returnResults on a multi resets the fields of multi + multi.add(calls) + } + }) +}