Skip to content

Commit 7bef872

Browse files
siddharth16396, arthurschreiber, and Copilot
authored
[gRPC-TMClient] Optimizes the gRPC tablet manager client by splitting a shared mutex into two independent locks (#18933)
Signed-off-by: siddharth16396 <[email protected]>
Signed-off-by: Arthur Schreiber <[email protected]>
Signed-off-by: siddharth16396 <[email protected]>
Co-authored-by: Arthur Schreiber <[email protected]>
Co-authored-by: Copilot <[email protected]>
1 parent 5dbb061 commit 7bef872

File tree

2 files changed

+130
-62
lines changed

2 files changed

+130
-62
lines changed

go/vt/vttablet/grpctmclient/client.go

Lines changed: 93 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -103,17 +103,25 @@ type tmc struct {
103103
client tabletmanagerservicepb.TabletManagerClient
104104
}
105105

106-
type addrTmcMap map[string]*tmc
106+
type tmcEntry struct {
107+
once sync.Once
108+
tmc *tmc
109+
err error
110+
}
111+
112+
type addrTmcMap map[string]*tmcEntry
107113

108114
// grpcClient implements both dialer and poolDialer.
109115
type grpcClient struct {
110116
// This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App},
111117
// CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only.
112118
// But that's OK because usually the tasks that use them are one-purpose only.
113-
// The map is protected by the mutex.
114-
mu sync.Mutex
119+
// rpcClientMapMu protects rpcClientMap.
120+
rpcClientMapMu sync.Mutex
115121
rpcClientMap map[string]chan *tmc
116-
rpcDialPoolMap map[DialPoolGroup]addrTmcMap
122+
// rpcDialPoolMapMu protects rpcDialPoolMap.
123+
rpcDialPoolMapMu sync.Mutex
124+
rpcDialPoolMap map[DialPoolGroup]addrTmcMap
117125
}
118126

119127
type dialer interface {
@@ -185,25 +193,32 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table
185193
return nil, vterrors.FromGRPC(err)
186194
}
187195

188-
client.mu.Lock()
189-
if client.rpcClientMap == nil {
190-
client.rpcClientMap = make(map[string]chan *tmc)
191-
}
192-
c, ok := client.rpcClientMap[addr]
193-
if !ok {
196+
c, isEmpty := func() (chan *tmc, bool) {
197+
client.rpcClientMapMu.Lock()
198+
defer client.rpcClientMapMu.Unlock()
199+
200+
if client.rpcClientMap == nil {
201+
client.rpcClientMap = make(map[string]chan *tmc)
202+
}
203+
c, ok := client.rpcClientMap[addr]
204+
if ok {
205+
return c, false
206+
}
207+
194208
c = make(chan *tmc, concurrency)
195209
client.rpcClientMap[addr] = c
196-
client.mu.Unlock()
210+
return c, true
211+
}()
197212

213+
// If the channel is empty, populate it with connections.
214+
if isEmpty {
198215
for i := 0; i < cap(c); i++ {
199216
tm, err := client.createTmc(ctx, addr, opt)
200217
if err != nil {
201218
return nil, vterrors.FromGRPC(err)
202219
}
203220
c <- tm
204221
}
205-
} else {
206-
client.mu.Unlock()
207222
}
208223

209224
result := <-c
@@ -218,44 +233,81 @@ func (client *grpcClient) dialDedicatedPool(ctx context.Context, dialPoolGroup D
218233
return nil, nil, err
219234
}
220235

221-
client.mu.Lock()
222-
defer client.mu.Unlock()
223-
if client.rpcDialPoolMap == nil {
224-
client.rpcDialPoolMap = make(map[DialPoolGroup]addrTmcMap)
225-
}
226-
if _, ok := client.rpcDialPoolMap[dialPoolGroup]; !ok {
227-
client.rpcDialPoolMap[dialPoolGroup] = make(addrTmcMap)
228-
}
229-
m := client.rpcDialPoolMap[dialPoolGroup]
230-
if _, ok := m[addr]; !ok {
231-
tm, err := client.createTmc(ctx, addr, opt)
232-
if err != nil {
233-
return nil, nil, err
236+
entry := func() *tmcEntry {
237+
client.rpcDialPoolMapMu.Lock()
238+
defer client.rpcDialPoolMapMu.Unlock()
239+
240+
if client.rpcDialPoolMap == nil {
241+
client.rpcDialPoolMap = make(map[DialPoolGroup]addrTmcMap)
242+
}
243+
if _, ok := client.rpcDialPoolMap[dialPoolGroup]; !ok {
244+
client.rpcDialPoolMap[dialPoolGroup] = make(addrTmcMap)
245+
}
246+
247+
poolEntries := client.rpcDialPoolMap[dialPoolGroup]
248+
entry, ok := poolEntries[addr]
249+
if ok {
250+
return entry
234251
}
235-
m[addr] = tm
252+
253+
entry = &tmcEntry{}
254+
poolEntries[addr] = entry
255+
return entry
256+
}()
257+
258+
// Initialize connection exactly once, without holding the mutex
259+
entry.once.Do(func() {
260+
entry.tmc, entry.err = client.createTmc(ctx, addr, opt)
261+
})
262+
263+
if entry.err != nil {
264+
return nil, nil, entry.err
236265
}
266+
237267
invalidator := func() {
238-
client.mu.Lock()
239-
defer client.mu.Unlock()
240-
if tm := m[addr]; tm != nil && tm.cc != nil {
241-
tm.cc.Close()
268+
client.rpcDialPoolMapMu.Lock()
269+
defer client.rpcDialPoolMapMu.Unlock()
270+
271+
if entry.tmc != nil && entry.tmc.cc != nil {
272+
entry.tmc.cc.Close()
273+
}
274+
275+
if poolEntries, ok := client.rpcDialPoolMap[dialPoolGroup]; ok {
276+
delete(poolEntries, addr)
242277
}
243-
delete(m, addr)
244278
}
245-
return m[addr].client, invalidator, nil
279+
return entry.tmc.client, invalidator, nil
246280
}
247281

248282
// Close is part of the tmclient.TabletManagerClient interface.
249283
func (client *grpcClient) Close() {
250-
client.mu.Lock()
251-
defer client.mu.Unlock()
252-
for _, c := range client.rpcClientMap {
253-
close(c)
254-
for ch := range c {
255-
ch.cc.Close()
284+
func() {
285+
client.rpcClientMapMu.Lock()
286+
defer client.rpcClientMapMu.Unlock()
287+
288+
for _, c := range client.rpcClientMap {
289+
close(c)
290+
for ch := range c {
291+
ch.cc.Close()
292+
}
256293
}
257-
}
258-
client.rpcClientMap = nil
294+
client.rpcClientMap = nil
295+
}()
296+
297+
// Close dedicated pools
298+
func() {
299+
client.rpcDialPoolMapMu.Lock()
300+
defer client.rpcDialPoolMapMu.Unlock()
301+
302+
for _, addrMap := range client.rpcDialPoolMap {
303+
for _, tm := range addrMap {
304+
if tm != nil && tm.tmc != nil && tm.tmc.cc != nil {
305+
tm.tmc.cc.Close()
306+
}
307+
}
308+
}
309+
client.rpcDialPoolMap = nil
310+
}()
259311
}
260312

261313
//

go/vt/vttablet/grpctmclient/client_test.go

Lines changed: 37 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -65,11 +65,12 @@ func TestDialDedicatedPool(t *testing.T) {
6565
assert.NotEmpty(t, rpcClient.rpcDialPoolMap[dialPoolGroupThrottler])
6666
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupVTOrc])
6767

68-
c := rpcClient.rpcDialPoolMap[dialPoolGroupThrottler][addr]
69-
assert.NotNil(t, c)
70-
assert.Contains(t, []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, c.cc.GetState())
68+
entry := rpcClient.rpcDialPoolMap[dialPoolGroupThrottler][addr]
69+
assert.NotNil(t, entry)
70+
assert.NotNil(t, entry.tmc)
71+
assert.Contains(t, []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, entry.tmc.cc.GetState())
7172

72-
cachedTmc = c
73+
cachedTmc = entry.tmc
7374
})
7475

7576
t.Run("CheckThrottler", func(t *testing.T) {
@@ -145,22 +146,31 @@ func TestDialPool(t *testing.T) {
145146
_, err := client.CheckThrottler(ctx, tablet, req)
146147
assert.Error(t, err)
147148
})
149+
148150
t.Run("post throttler maps", func(t *testing.T) {
149151
rpcClient, ok := client.dialer.(*grpcClient)
150152
require.True(t, ok)
151153

152-
rpcClient.mu.Lock()
153-
defer rpcClient.mu.Unlock()
154+
func() {
155+
rpcClient.rpcDialPoolMapMu.Lock()
156+
defer rpcClient.rpcDialPoolMapMu.Unlock()
154157

155-
assert.NotEmpty(t, rpcClient.rpcDialPoolMap)
156-
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupThrottler])
157-
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupVTOrc])
158+
assert.NotEmpty(t, rpcClient.rpcDialPoolMap)
159+
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupThrottler])
160+
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupVTOrc])
161+
}()
158162

159-
assert.NotEmpty(t, rpcClient.rpcClientMap)
160-
assert.NotEmpty(t, rpcClient.rpcClientMap[addr])
163+
func() {
164+
rpcClient.rpcClientMapMu.Lock()
165+
defer rpcClient.rpcClientMapMu.Unlock()
166+
167+
assert.NotEmpty(t, rpcClient.rpcClientMap)
168+
assert.NotEmpty(t, rpcClient.rpcClientMap[addr])
169+
}()
161170

162171
assert.Contains(t, []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, cachedTmc.cc.GetState())
163172
})
173+
164174
t.Run("ExecuteFetchAsDba", func(t *testing.T) {
165175
ctx, cancel := context.WithTimeout(ctx, time.Second)
166176
defer cancel()
@@ -174,16 +184,22 @@ func TestDialPool(t *testing.T) {
174184
rpcClient, ok := client.dialer.(*grpcClient)
175185
require.True(t, ok)
176186

177-
rpcClient.mu.Lock()
178-
defer rpcClient.mu.Unlock()
179-
180-
assert.NotEmpty(t, rpcClient.rpcDialPoolMap)
181-
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupThrottler])
182-
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupVTOrc])
183-
184-
// The default pools are unaffected. Invalidator does not run, connections are not closed.
185-
assert.NotEmpty(t, rpcClient.rpcClientMap)
186-
assert.NotEmpty(t, rpcClient.rpcClientMap[addr])
187+
func() {
188+
rpcClient.rpcDialPoolMapMu.Lock()
189+
defer rpcClient.rpcDialPoolMapMu.Unlock()
190+
191+
assert.NotEmpty(t, rpcClient.rpcDialPoolMap)
192+
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupThrottler])
193+
assert.Empty(t, rpcClient.rpcDialPoolMap[dialPoolGroupVTOrc])
194+
}()
195+
196+
func() {
197+
rpcClient.rpcClientMapMu.Lock()
198+
defer rpcClient.rpcClientMapMu.Unlock()
199+
// The default pools are unaffected. Invalidator does not run, connections are not closed.
200+
assert.NotEmpty(t, rpcClient.rpcClientMap)
201+
assert.NotEmpty(t, rpcClient.rpcClientMap[addr])
202+
}()
187203

188204
assert.NotNil(t, cachedTmc)
189205
assert.Contains(t, []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, cachedTmc.cc.GetState())

0 commit comments

Comments
 (0)