Skip to content

Commit 870cbf9

Browse files
Fix: make pool ref counter atomic, thread-safe using atomic by uber
Signed-off-by: smartaquarius10 <[email protected]>
1 parent 7d7f36f commit 870cbf9

File tree

3 files changed

+23
-23
lines changed

3 files changed

+23
-23
lines changed

pkg/scalers/connectionpool/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func loadConfig() {
5151
logger.Info("Loaded global pool configuration", "entries", len(parsed))
5252
}
5353
func clearGlobalPoolOverride() {
54-
globalOverrides.Range(func(key, value any) bool {
54+
globalOverrides.Range(func(key, _ any) bool {
5555
globalOverrides.Delete(key)
5656
return true
5757
})

pkg/scalers/connectionpool/connectionpool_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,23 @@ package connectionpool
33
import (
44
"context"
55
"errors"
6-
"sync/atomic"
76
"testing"
7+
8+
"go.uber.org/atomic"
89
)
910

1011
// mockPool simulates a ResourcePool for testing purposes.
1112
type mockPool struct {
12-
closed int32
13+
closed atomic.Int32
1314
}
1415

1516
func (m *mockPool) close() {
16-
atomic.StoreInt32(&m.closed, 1)
17+
m.closed.Store(1)
1718
}
1819

1920
func isClosed(p ResourcePool) bool {
2021
if m, ok := p.(*mockPool); ok {
21-
return atomic.LoadInt32(&m.closed) == 1
22+
return m.closed.Load() == 1
2223
}
2324
return false
2425
}

pkg/scalers/connectionpool/manager.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,69 @@ package connectionpool
22

33
import (
44
"sync"
5+
6+
"go.uber.org/atomic"
57
)
68

7-
// ResourcePool : generic interface for any DB pool
89
type ResourcePool interface {
910
close()
1011
}
1112

12-
// poolEntry : tracks a ResourcePool with reference count
1313
type poolEntry struct {
1414
pool ResourcePool
15-
ref int32
15+
ref atomic.Int32
1616
}
1717

18-
var (
19-
poolMap sync.Map
20-
)
18+
var poolMap sync.Map
2119

22-
// GetOrCreate : reuse or create new pool
2320
func GetOrCreate(poolKey string, createFn func() (ResourcePool, error)) (ResourcePool, error) {
2421
if val, ok := poolMap.Load(poolKey); ok {
2522
entry := val.(*poolEntry)
26-
entry.ref++
27-
logger.V(1).Info("Reusing existing pool", "poolKey", poolKey, "refCount", entry.ref)
23+
entry.ref.Inc()
24+
logger.V(1).Info("Reusing existing pool", "poolKey", poolKey, "refCount", entry.ref.Load())
2825
return entry.pool, nil
2926
}
27+
3028
logger.Info("Creating new pool", "poolKey", poolKey)
3129
newPool, err := createFn()
3230
if err != nil {
3331
logger.Error(err, "Failed to create new pool", "poolKey", poolKey)
3432
return nil, err
3533
}
3634

37-
entry := &poolEntry{pool: newPool, ref: 1}
38-
actual, loaded := poolMap.LoadOrStore(poolKey, entry)
35+
e := &poolEntry{pool: newPool}
36+
e.ref.Store(1)
37+
38+
actual, loaded := poolMap.LoadOrStore(poolKey, e)
3939
if loaded {
4040
logger.Info("Duplicate creation detected, closing redundant pool", "poolKey", poolKey)
4141
newPool.close()
4242
old := actual.(*poolEntry)
43-
old.ref++
44-
logger.V(1).Info("Reusing existing pool after race", "poolKey", poolKey, "refCount", old.ref)
43+
old.ref.Inc()
44+
logger.V(1).Info("Reusing existing pool after race", "poolKey", poolKey, "refCount", old.ref.Load())
4545
return old.pool, nil
4646
}
47+
4748
logger.Info("Pool created successfully", "poolKey", poolKey)
4849
return newPool, nil
4950
}
5051

51-
// Release : decrease ref count, close when last user gone
5252
func Release(poolKey string) {
5353
val, ok := poolMap.Load(poolKey)
5454
if !ok {
5555
logger.V(1).Info("Attempted to release non-existent pool", "poolKey", poolKey)
5656
return
5757
}
5858
entry := val.(*poolEntry)
59-
entry.ref--
60-
logger.V(1).Info("Released pool reference", "poolKey", poolKey, "refCount", entry.ref)
61-
if entry.ref <= 0 {
59+
if entry.ref.Dec() <= 0 {
6260
logger.Info("Closing pool, no active references", "poolKey", poolKey)
6361
entry.pool.close()
6462
poolMap.Delete(poolKey)
63+
} else {
64+
logger.V(1).Info("Released pool reference", "poolKey", poolKey, "refCount", entry.ref.Load())
6565
}
6666
}
6767

68-
// CloseAll : close all pools (on operator shutdown)
6968
func CloseAll() {
7069
poolMap.Range(func(key, val any) bool {
7170
entry := val.(*poolEntry)

0 commit comments

Comments
 (0)