Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
defaultZkRoot = "/hbase"
defaultZkTimeout = 30 * time.Second
defaultEffectiveUser = "root"
defaultAuth = "SIMPLE"
)

// Client a regular HBase client
Expand Down Expand Up @@ -94,9 +95,11 @@ type client struct {
closeOnce sync.Once

newRegionClientFn func(string, region.ClientType, int, time.Duration,
string, time.Duration, compression.Codec) hrpc.RegionClient
string, time.Duration, compression.Codec, string) hrpc.RegionClient

compressionCodec compression.Codec

auth string
}

// NewClient creates a new HBase client.
Expand Down Expand Up @@ -130,6 +133,7 @@ func newClient(zkquorum string, options ...Option) *client {
regionReadTimeout: region.DefaultReadTimeout,
done: make(chan struct{}),
newRegionClientFn: region.NewClient,
auth: defaultAuth,
}
for _, option := range options {
option(c)
Expand Down Expand Up @@ -185,6 +189,13 @@ func EffectiveUser(user string) Option {
}
}

// Auth will return an option that will set the authorization method when accessing regions
func Auth(auth string) Option {
return func(c *client) {
c.auth = auth
}
}

// FlushInterval will return an option that will set the timeout for flushing
// the RPC queues used in a given client
func FlushInterval(interval time.Duration) Option {
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ module github.com/tsuna/gohbase
go 1.13

require (
github.com/beltran/gosasl v0.0.0-20200816203322-2f20f217aef6
github.com/golang/mock v1.4.3
github.com/golang/protobuf v1.4.0
github.com/golang/snappy v0.0.1
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/sirupsen/logrus v1.5.0
github.com/stretchr/testify v1.4.0 // indirect
google.golang.org/protobuf v1.21.0
modernc.org/b v1.0.0
modernc.org/mathutil v1.1.1 // indirect
modernc.org/strutil v1.1.0 // indirect
)
23 changes: 23 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
github.com/beltran/gosasl v0.0.0-20200816203322-2f20f217aef6 h1:ZqV4zNU/8Vqt7+En3hG066C9fk6ou9sTVCQOek/180o=
github.com/beltran/gosasl v0.0.0-20200816203322-2f20f217aef6/go.mod h1:Qx8cW6jkI8riyzmklj80kAIkv+iezFUTBiGU0qHhHes=
github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab h1:ayfcn60tXOSYy5zUN1AMSTQo4nJCf7hrdzAVchpPst4=
github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab/go.mod h1:GLe4UoSyvJ3cVG+DVtKen5eAiaD8mAJFuV5PT3Eeg9Q=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
Expand All @@ -11,14 +17,22 @@ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q=
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -28,14 +42,23 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o=
modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg=
modernc.org/mathutil v1.1.1 h1:FeylZSVX8S+58VsyJlkEj2bcpdytmp9MmDKZkKx8OIE=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
98 changes: 89 additions & 9 deletions region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"github.com/tsuna/gohbase/pb"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/runtime/protoimpl"

gosasl "github.com/beltran/gosasl"
)

// ClientType is a type alias to represent the type of this region client
Expand Down Expand Up @@ -193,6 +196,8 @@ type client struct {

// compressor for cellblocks. if nil, then no compression
compressor *compressor

auth string
}

// QueueRPC will add an rpc call to the queue for processing by the writer goroutine
Expand Down Expand Up @@ -485,13 +490,15 @@ func (c *client) receive() (err error) {
}

if header.CallId == nil {
log.Println(protoimpl.X.MessageStringOf(header.Exception))
return ErrMissingCallID
}

callID := *header.CallId
rpc := c.unregisterRPC(callID)
if rpc == nil {
return ServerError{fmt.Errorf("got a response with an unexpected call ID: %d", callID)}
//return ServerError{fmt.Errorf("got a response with an unexpected call ID: %d", callID)}
return ServerError{fmt.Errorf("got a response with an unexpected call ID: %d, message: %v", callID, protoimpl.X.MessageStringOf(header.Exception))}
}
if err := c.inFlightDown(); err != nil {
return ServerError{err}
Expand Down Expand Up @@ -581,7 +588,13 @@ func (c *client) readFully(buf []byte) error {
return err
}

// sendHello sends the "hello" message needed when opening a new connection.
/* sendHello sends the "hello" message needed when opening a new connection.
1. Setup connection
2. Define input and output streams
3. Write the preamble (const header below)
4. if(useSasl) Create sasl connection
5. Send connection header
*/
func (c *client) sendHello() error {
connHeader := &pb.ConnectionHeader{
UserInfo: &pb.UserInformation{
Expand All @@ -599,13 +612,80 @@ func (c *client) sendHello() error {
return fmt.Errorf("failed to marshal connection header: %s", err)
}

const header = "HBas\x00\x50" // \x50 = Simple Auth.
buf := make([]byte, 0, len(header)+4+len(data))
buf = append(buf, header...)
buf = buf[:len(header)+4]
binary.BigEndian.PutUint32(buf[6:], uint32(len(data)))
buf = append(buf, data...)
return c.write(buf)
if c.auth == "KERBEROS" {
// Write the preamble
const header = "HBas\x00\x51" // \x51 = Kerberos Auth
buf := make([]byte, 0, len(header)+4+len(data))
buf = append(buf, header...)
c.write(buf)

mechanism, err := gosasl.NewGSSAPIMechanism("hbase")
if err != nil {
log.Println(err)
}
saslClient := gosasl.NewSaslClient(strings.Split(c.addr, ":")[0], mechanism)

// Get initial response
saslToken, err := saslClient.Start()
if err != nil {
log.Println(err)
}

sbuf := make([]byte, 4)
binary.BigEndian.PutUint32(sbuf[0:], uint32(len(saslToken)))
sbuf = append(sbuf, saslToken...)
c.write(sbuf)

if !saslClient.Complete() {
status := make([]byte, 4)
c.conn.Read(status)

tokenLen := make([]byte, 4)
c.conn.Read(tokenLen)

resLength := binary.BigEndian.Uint32(tokenLen)
saslToken = make([]byte, resLength)
c.conn.Read(saslToken)

for !saslClient.Complete() {
saslToken, err = saslClient.Step(saslToken)
if err != nil {
log.Println(err)
}
if saslToken != nil {
sbuf := make([]byte, 4)
binary.BigEndian.PutUint32(sbuf[0:], uint32(len(saslToken)))
sbuf = append(sbuf, saslToken...)
c.write(sbuf)
}
if !saslClient.Complete() {
status := make([]byte, 4)
c.conn.Read(status)

tokenLen := make([]byte, 4)
c.conn.Read(tokenLen)

resLength := binary.BigEndian.Uint32(tokenLen)
saslToken = make([]byte, resLength)
c.conn.Read(saslToken)
}
}
}

buf = make([]byte, 4)
binary.BigEndian.PutUint32(buf[:], uint32(len(data)))
buf = append(buf, data...)
resp, err := saslClient.Encode(buf)
return c.write(resp)
} else {
const header = "HBas\x00\x50" // \x50 = Simple Auth.
buf := make([]byte, 0, len(header)+4+len(data))
buf = append(buf, header...)
buf = buf[:len(header)+4]
binary.BigEndian.PutUint32(buf[6:], uint32(len(data)))
buf = append(buf, data...)
return c.write(buf)
}
}

// send sends an RPC out to the wire.
Expand Down
3 changes: 2 additions & 1 deletion region/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

// NewClient creates a new RegionClient.
func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.Duration,
effectiveUser string, readTimeout time.Duration, codec compression.Codec) hrpc.RegionClient {
effectiveUser string, readTimeout time.Duration, codec compression.Codec, auth string) hrpc.RegionClient {
c := &client{
addr: addr,
ctype: ctype,
Expand All @@ -30,6 +30,7 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.
rpcs: make(chan hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
auth: auth,
}

if codec != nil {
Expand Down
4 changes: 2 additions & 2 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,11 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
// master that we don't add to the cache
// TODO: consider combining this case with the regular regionserver path
client = c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval,
c.effectiveUser, c.regionReadTimeout, nil)
c.effectiveUser, c.regionReadTimeout, nil, c.auth)
} else {
client = c.clients.put(addr, reg, func() hrpc.RegionClient {
return c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval,
c.effectiveUser, c.regionReadTimeout, c.compressionCodec)
c.effectiveUser, c.regionReadTimeout, c.compressionCodec, c.auth)
})
}

Expand Down