diff --git a/caches.go b/caches.go index 280f5445..524ec410 100644 --- a/caches.go +++ b/caches.go @@ -20,37 +20,31 @@ import ( type clientRegionCache struct { m sync.RWMutex - regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{} + regionClients map[string]hrpc.RegionClient + regions map[string]map[hrpc.RegionInfo]struct{} } // put caches client and associates a region with it. Returns a client that is in cache. -// TODO: obvious place for optimization (use map with address as key to lookup exisiting clients) func (rcc *clientRegionCache) put(c hrpc.RegionClient, r hrpc.RegionInfo) hrpc.RegionClient { rcc.m.Lock() - for existingClient, regions := range rcc.regions { - // check if client already exists, checking by host and port - // because concurrent callers might try to put the same client - if c.Addr() == existingClient.Addr() { - // check client already knows about the region, checking - // by pointer is enough because we make sure that there are - // no regions with the same name around - if _, ok := regions[r]; !ok { - regions[r] = struct{}{} - } - rcc.m.Unlock() - - log.WithFields(log.Fields{ - "existingClient": existingClient, - "client": c, - }).Debug("region client is already in client's cache") - return existingClient + if regions, ok := rcc.regions[c.Addr()]; ok { + if _, ok := regions[r]; !ok { + regions[r] = struct{}{} } + existingClient := rcc.regionClients[c.Addr()] + rcc.m.Unlock() + + log.WithFields(log.Fields{ + "existingClient": existingClient, + "client": c, + }).Debug("region client is already in client's cache") + return existingClient } // no such client yet - rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}} + rcc.regionClients[c.Addr()] = c + rcc.regions[c.Addr()] = map[hrpc.RegionInfo]struct{}{r: struct{}{}} rcc.m.Unlock() - log.WithField("client", c).Info("added new region client") return c } @@ -60,7 +54,7 @@ func (rcc *clientRegionCache) del(r hrpc.RegionInfo) { c := r.Client() if c != nil { r.SetClient(nil) - regions := rcc.regions[c] + regions := rcc.regions[c.Addr()] delete(regions, r) } rcc.m.Unlock() @@ -68,11 +62,12 @@ func (rcc *clientRegionCache) del(r hrpc.RegionInfo) { func (rcc *clientRegionCache) closeAll() { rcc.m.Lock() - for client, regions := range rcc.regions { + for cAddr, regions := range rcc.regions { for region := range regions { region.MarkUnavailable() region.SetClient(nil) } + client := rcc.regionClients[cAddr] client.Close() } rcc.m.Unlock() @@ -80,8 +75,8 @@ func (rcc *clientRegionCache) closeAll() { func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInfo]struct{} { rcc.m.Lock() - downregions, ok := rcc.regions[c] - delete(rcc.regions, c) + downregions, ok := rcc.regions[c.Addr()] + delete(rcc.regions, c.Addr()) rcc.m.Unlock() if ok { @@ -90,17 +85,13 @@ func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInf return downregions } -// TODO: obvious place for optimization (use map with address as key to lookup exisiting clients) func (rcc *clientRegionCache) checkForClient(addr string) hrpc.RegionClient { rcc.m.RLock() - - for client := range rcc.regions { - if client.Addr() == addr { - rcc.m.RUnlock() - return client - } + if _, ok := rcc.regions[addr]; ok { + client := rcc.regionClients[addr] + rcc.m.RUnlock() + return client } - rcc.m.RUnlock() return nil } diff --git a/client.go b/client.go index 6f54259d..51cd97df 100644 --- a/client.go +++ b/client.go @@ -116,7 +116,8 @@ func newClient(zkquorum string, options ...Option) *client { clientType: standardClient, regions: keyRegionCache{regions: b.TreeNew(region.CompareGeneric)}, clients: clientRegionCache{ - regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}), + regionClients: make(map[string]hrpc.RegionClient), + regions: make(map[string]map[hrpc.RegionInfo]struct{}), }, rpcQueueSize: defaultRPCQueueSize, flushInterval: defaultFlushInterval, diff --git a/metacache_test.go b/metacache_test.go index b413435b..6de530f3 100644 --- a/metacache_test.go +++ b/metacache_test.go @@ -649,9 +649,9 @@ func TestClientCachePut(t *testing.T) { t.Errorf("Expected 1 client in cache, got %d", len(client.clients.regions)) } - if len(client.clients.regions[regClient]) != 1 { + if len(client.clients.regions[regClient.Addr()]) != 1 { t.Errorf("Expected 1 region for client in cache, got %d", - len(client.clients.regions[regClient])) + len(client.clients.regions[regClient.Addr()])) } // try putting client with the same host port @@ -676,8 +676,8 @@ func TestClientCachePut(t *testing.T) { t.Errorf("Expected 1 client in cache, got %d", len(client.clients.regions)) } - if len(client.clients.regions[regClient]) != 2 { + if len(client.clients.regions[regClient.Addr()]) != 2 { t.Errorf("Expected 2 regions for client in cache, got %d", - len(client.clients.regions[regClient])) + len(client.clients.regions[regClient.Addr()])) } }