From 9fca7c709bceb6edd7e9d2c39995d18a69a65ad2 Mon Sep 17 00:00:00 2001 From: xiaofanli Date: Mon, 6 Dec 2021 16:26:54 +0800 Subject: [PATCH 1/4] admin api create/delete/disable/enable support tablename like ns:name --- hrpc/create.go | 13 ++++++++++--- hrpc/delete.go | 12 +++++++++--- hrpc/disable.go | 10 ++++++++-- hrpc/enable.go | 12 +++++++++--- 4 files changed, 36 insertions(+), 11 deletions(-) diff --git a/hrpc/create.go b/hrpc/create.go index c69c244e..1417f51b 100644 --- a/hrpc/create.go +++ b/hrpc/create.go @@ -6,6 +6,7 @@ package hrpc import ( + "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -97,12 +98,18 @@ func (ct *CreateTable) ToProto() proto.Message { } pbFamilies = append(pbFamilies, f) } + + namespace := []byte("default") + table := ct.table + if i := bytes.Index(table, []byte(":")); i > -1 { + namespace = table[:i] + table = table[i+1:] + } return &pb.CreateTableRequest{ TableSchema: &pb.TableSchema{ TableName: &pb.TableName{ - // TODO: handle namespaces - Namespace: []byte("default"), - Qualifier: ct.table, + Namespace: namespace, + Qualifier: table, }, ColumnFamilies: pbFamilies, }, diff --git a/hrpc/delete.go b/hrpc/delete.go index 072fb80d..5b9f740f 100644 --- a/hrpc/delete.go +++ b/hrpc/delete.go @@ -6,6 +6,7 @@ package hrpc import ( + "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -41,11 +42,16 @@ func (dt *DeleteTable) Description() string { // ToProto converts the RPC into a protobuf message func (dt *DeleteTable) ToProto() proto.Message { + namespace := []byte("default") + table := dt.table + if i := bytes.Index(table, []byte(":")); i > -1 { + namespace = table[:i] + table = table[i+1:] + } return &pb.DeleteTableRequest{ TableName: &pb.TableName{ - // TODO: hadle namespaces properly - Namespace: []byte("default"), - Qualifier: dt.table, + Namespace: namespace, + Qualifier: table, }, } } diff --git a/hrpc/disable.go b/hrpc/disable.go index ae51f7aa..df6bd3c5 100644 --- a/hrpc/disable.go +++ b/hrpc/disable.go @@ -6,6 +6,7 @@ package hrpc import ( + "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -41,10 +42,15 @@ func (dt *DisableTable) Description() string { // ToProto converts the RPC into a protobuf message func (dt *DisableTable) ToProto() proto.Message { + namespace := []byte("default") + table := dt.table + if i := bytes.Index(table, []byte(":")); i > -1 { + namespace = table[:i] + table = table[i+1:] + } return &pb.DisableTableRequest{ TableName: &pb.TableName{ - // TODO: handle namespaces - Namespace: []byte("default"), + Namespace: namespace, Qualifier: dt.table, }, } diff --git a/hrpc/enable.go b/hrpc/enable.go index 1172bf17..a2904678 100644 --- a/hrpc/enable.go +++ b/hrpc/enable.go @@ -6,6 +6,7 @@ package hrpc import ( + "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -41,11 +42,16 @@ func (et *EnableTable) Description() string { // ToProto converts the RPC into a protobuf message func (et *EnableTable) ToProto() proto.Message { + namespace := []byte("default") + table := et.table + if i := bytes.Index(table, []byte(":")); i > -1 { + namespace = table[:i] + table = table[i+1:] + } return &pb.EnableTableRequest{ TableName: &pb.TableName{ - // TODO: handle namespaces - Namespace: []byte("default"), - Qualifier: et.table, + Namespace: namespace, + Qualifier: table, }, } } From 08de0230cf1fdeeb9234bd25c2cd84fcab2bec69 Mon Sep 17 00:00:00 2001 From: lixiaofan Date: Mon, 6 Dec 2021 16:36:39 +0800 Subject: [PATCH 2/4] fix disable qualifier --- hrpc/disable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hrpc/disable.go b/hrpc/disable.go index df6bd3c5..36197963 100644 --- a/hrpc/disable.go +++ b/hrpc/disable.go @@ -51,7 +51,7 @@ func (dt *DisableTable) ToProto() proto.Message { return &pb.DisableTableRequest{ TableName: &pb.TableName{ Namespace: namespace, - Qualifier: dt.table, + Qualifier: table, }, } } From 4d35c9ef343cf99f6bd1fa48e62eef3a9ff06f2f Mon Sep 17 00:00:00 2001 From: xiaofanli Date: Thu, 9 Jun 2022 13:11:31 +0800 Subject: [PATCH 3/4] update create --- hrpc/create.go | 45 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/hrpc/create.go b/hrpc/create.go index 1417f51b..d73e94c6 100644 --- a/hrpc/create.go +++ b/hrpc/create.go @@ -17,8 +17,10 @@ import ( type CreateTable struct { base - families map[string]map[string]string - splitKeys [][]byte + families map[string]map[string]string + splitKeys [][]byte + attributes map[string]string + configuration map[string]string } var defaultAttributes = map[string]string{ @@ -47,7 +49,9 @@ func NewCreateTable(ctx context.Context, table []byte, ctx: ctx, resultch: make(chan RPCResult, 1), }, - families: make(map[string]map[string]string, len(families)), + families: make(map[string]map[string]string, len(families)), + attributes: make(map[string]string), + configuration: make(map[string]string), } for _, option := range options { option(ct) @@ -72,6 +76,20 @@ func SplitKeys(sk [][]byte) func(*CreateTable) { } } +// Attributes will add create table attributes params +func Attributes(key, value string) func(table *CreateTable) { + return func(ct *CreateTable) { + ct.attributes[key] = value + } +} + +// Configuration will add create table conf +func Configuration(name, value string) func(table *CreateTable) { + return func(ct *CreateTable) { + ct.configuration[name] = value + } +} + // Name returns the name of this RPC call. func (ct *CreateTable) Name() string { return "CreateTable" @@ -99,6 +117,25 @@ func (ct *CreateTable) ToProto() proto.Message { pbFamilies = append(pbFamilies, f) } + // Attributes + pbAttributes := make([]*pb.BytesBytesPair, 0, len(ct.attributes)) + for k, v := range ct.attributes { + pbAttributes = append(pbAttributes, &pb.BytesBytesPair{ + First: []byte(k), + Second: []byte(v), + }) + } + + // Configuration + pbConfiguration := make([]*pb.NameStringPair, 0, len(ct.configuration)) + for k, v := range ct.configuration { + pbConfiguration = append(pbConfiguration, &pb.NameStringPair{ + Name: proto.String(k), + Value: proto.String(v), + }) + } + + // TableName namespace := []byte("default") table := ct.table if i := bytes.Index(table, []byte(":")); i > -1 { @@ -112,6 +149,8 @@ func (ct *CreateTable) ToProto() proto.Message { Qualifier: table, }, ColumnFamilies: pbFamilies, + Attributes: pbAttributes, + Configuration: pbConfiguration, }, SplitKeys: ct.splitKeys, } From 3e590b69a056e99a5e916419d5eb086c86d4fa71 Mon Sep 17 00:00:00 2001 From: xiaofanli Date: Mon, 5 Sep 2022 19:16:09 +0800 Subject: [PATCH 4/4] add admin api:descriptor and modify --- admin_client.go | 30 +++++++++++++++++++++++- hrpc/call.go | 15 ++++++++++++ hrpc/create.go | 8 +------ hrpc/delete.go | 8 +------ hrpc/descriptor.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ hrpc/disable.go | 8 +------ hrpc/enable.go | 8 +------ hrpc/modify.go | 56 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 161 insertions(+), 29 deletions(-) create mode 100644 hrpc/descriptor.go create mode 100644 hrpc/modify.go diff --git a/admin_client.go b/admin_client.go index 19eba18d..99b808c8 100644 --- a/admin_client.go +++ b/admin_client.go @@ -27,6 +27,8 @@ const ( // AdminClient to perform admistrative operations with HMaster type AdminClient interface { CreateTable(t *hrpc.CreateTable) error + ModifyTable(t *hrpc.ModifyTable) error + GetTableDescriptors(t *hrpc.GetTableDescriptors) ([]*pb.TableSchema, error) DeleteTable(t *hrpc.DeleteTable) error EnableTable(t *hrpc.EnableTable) error DisableTable(t *hrpc.DisableTable) error @@ -71,7 +73,7 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient { return c } -//Get the status of the cluster +// Get the status of the cluster func (c *client) ClusterStatus() (*pb.ClusterStatus, error) { pbmsg, err := c.SendRPC(hrpc.NewClusterStatus()) if err != nil { @@ -100,6 +102,32 @@ func (c *client) CreateTable(t *hrpc.CreateTable) error { return c.checkProcedureWithBackoff(t.Context(), r.GetProcId()) } +func (c *client) ModifyTable(t *hrpc.ModifyTable) error { + pbmsg, err := c.SendRPC(t) + if err != nil { + return err + } + + _, ok := pbmsg.(*pb.ModifyTableResponse) + if !ok { + return fmt.Errorf("sendRPC returned not a ModifyTableResponse") + } + return nil +} + +func (c *client) GetTableDescriptors(t *hrpc.GetTableDescriptors) ([]*pb.TableSchema, error) { + pbmsg, err := c.SendRPC(t) + if err != nil { + return nil, err + } + + r, ok := pbmsg.(*pb.GetTableDescriptorsResponse) + if !ok { + return nil, fmt.Errorf("sendRPC returned not a GetTableDescriptorsResponse") + } + return r.GetTableSchema(), nil +} + func (c *client) DeleteTable(t *hrpc.DeleteTable) error { pbmsg, err := c.SendRPC(t) if err != nil { diff --git a/hrpc/call.go b/hrpc/call.go index d90355a8..2d4a6ec0 100644 --- a/hrpc/call.go +++ b/hrpc/call.go @@ -6,6 +6,7 @@ package hrpc import ( + "bytes" "context" "encoding/binary" "errors" @@ -162,6 +163,20 @@ func (b *base) Options() []func(Call) error { return b.options } +// return +func (b *base) parseTableName() ([]byte, []byte) { + return parseTableName(b.table) +} + +func parseTableName(table []byte) ([]byte, []byte) { + namespace := []byte("default") + if i := bytes.Index(table, []byte(":")); i > -1 { + namespace = table[:i] + table = table[i+1:] + } + return namespace, table +} + func applyOptions(call Call, options ...func(Call) error) error { call.(withOptions).setOptions(options) for _, option := range options { diff --git a/hrpc/create.go b/hrpc/create.go index d73e94c6..6b05ac1f 100644 --- a/hrpc/create.go +++ b/hrpc/create.go @@ -6,7 +6,6 @@ package hrpc import ( - "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -136,12 +135,7 @@ func (ct *CreateTable) ToProto() proto.Message { } // TableName - namespace := []byte("default") - table := ct.table - if i := bytes.Index(table, []byte(":")); i > -1 { - namespace = table[:i] - table = table[i+1:] - } + namespace, table := ct.parseTableName() return &pb.CreateTableRequest{ TableSchema: &pb.TableSchema{ TableName: &pb.TableName{ diff --git a/hrpc/delete.go b/hrpc/delete.go index 5b9f740f..59ce6b71 100644 --- a/hrpc/delete.go +++ b/hrpc/delete.go @@ -6,7 +6,6 @@ package hrpc import ( - "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -42,12 +41,7 @@ func (dt *DeleteTable) Description() string { // ToProto converts the RPC into a protobuf message func (dt *DeleteTable) ToProto() proto.Message { - namespace := []byte("default") - table := dt.table - if i := bytes.Index(table, []byte(":")); i > -1 { - namespace = table[:i] - table = table[i+1:] - } + namespace, table := dt.parseTableName() return &pb.DeleteTableRequest{ TableName: &pb.TableName{ Namespace: namespace, diff --git a/hrpc/descriptor.go b/hrpc/descriptor.go new file mode 100644 index 00000000..3685c44a --- /dev/null +++ b/hrpc/descriptor.go @@ -0,0 +1,57 @@ +package hrpc + +import ( + "context" + + "github.com/tsuna/gohbase/pb" + "google.golang.org/protobuf/proto" +) + +// GetTableDescriptors ... +type GetTableDescriptors struct { + base + + tableNames [][]byte +} + +// NewGetTableDescriptors ... +func NewGetTableDescriptors(ctx context.Context, tableNames [][]byte) *GetTableDescriptors { + tn := &GetTableDescriptors{ + base: base{ + ctx: ctx, + table: []byte{}, + resultch: make(chan RPCResult, 1), + }, + tableNames: tableNames, + } + return tn +} + +// Name returns the name of this RPC call. +func (gd *GetTableDescriptors) Name() string { + return "GetTableDescriptors" +} + +// Description returns the description of this RPC call. +func (gd *GetTableDescriptors) Description() string { + return gd.Name() +} + +// ToProto converts the RPC into a protobuf message. +func (gd *GetTableDescriptors) ToProto() proto.Message { + req := &pb.GetTableDescriptorsRequest{} + for _, tableName := range gd.tableNames { + namespace, table := parseTableName(tableName) + req.TableNames = append(req.TableNames, &pb.TableName{ + Namespace: namespace, + Qualifier: table, + }) + } + return req +} + +// NewResponse creates an empty protobuf message to read the response of this +// RPC. +func (gd *GetTableDescriptors) NewResponse() proto.Message { + return &pb.GetTableDescriptorsResponse{} +} diff --git a/hrpc/disable.go b/hrpc/disable.go index 36197963..1666ebc4 100644 --- a/hrpc/disable.go +++ b/hrpc/disable.go @@ -6,7 +6,6 @@ package hrpc import ( - "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -42,12 +41,7 @@ func (dt *DisableTable) Description() string { // ToProto converts the RPC into a protobuf message func (dt *DisableTable) ToProto() proto.Message { - namespace := []byte("default") - table := dt.table - if i := bytes.Index(table, []byte(":")); i > -1 { - namespace = table[:i] - table = table[i+1:] - } + namespace, table := dt.parseTableName() return &pb.DisableTableRequest{ TableName: &pb.TableName{ Namespace: namespace, diff --git a/hrpc/enable.go b/hrpc/enable.go index a2904678..821b1647 100644 --- a/hrpc/enable.go +++ b/hrpc/enable.go @@ -6,7 +6,6 @@ package hrpc import ( - "bytes" "context" "github.com/tsuna/gohbase/pb" @@ -42,12 +41,7 @@ func (et *EnableTable) Description() string { // ToProto converts the RPC into a protobuf message func (et *EnableTable) ToProto() proto.Message { - namespace := []byte("default") - table := et.table - if i := bytes.Index(table, []byte(":")); i > -1 { - namespace = table[:i] - table = table[i+1:] - } + namespace, table := et.parseTableName() return &pb.EnableTableRequest{ TableName: &pb.TableName{ Namespace: namespace, diff --git a/hrpc/modify.go b/hrpc/modify.go new file mode 100644 index 00000000..2d79fedb --- /dev/null +++ b/hrpc/modify.go @@ -0,0 +1,56 @@ +package hrpc + +import ( + "context" + + "github.com/tsuna/gohbase/pb" + "google.golang.org/protobuf/proto" +) + +type TableSchema = pb.TableSchema + +// ModifyTable represents a ModifyTable HBase call +type ModifyTable struct { + base + + schema *TableSchema +} + +// NewModifyTable create new ModifyTable object +func NewModifyTable(ctx context.Context, table []byte, schema *TableSchema) *ModifyTable { + mt := &ModifyTable{ + base: base{ + table: table, + ctx: ctx, + resultch: make(chan RPCResult, 1), + }, + schema: schema, + } + return mt +} + +func (mt *ModifyTable) Name() string { + return "ModifyTable" +} + +func (mt *ModifyTable) Description() string { + return mt.Name() +} + +func (mt *ModifyTable) ToProto() proto.Message { + + namespace, table := mt.parseTableName() + req := &pb.ModifyTableRequest{ + TableName: &pb.TableName{ + Namespace: namespace, + Qualifier: table, + }, + } + mt.schema.TableName = req.TableName + req.TableSchema = mt.schema + return req +} + +func (mt *ModifyTable) NewResponse() proto.Message { + return &pb.ModifyTableResponse{} +}