Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion pkg/manager/backendcluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/errors"
Expand All @@ -19,6 +20,8 @@ import (
"go.uber.org/zap"
)

// topologyFetchTimeout is the per-cluster deadline that GetTiDBTopology applies
// (via context.WithTimeout) to each backend cluster's topology fetch.
const topologyFetchTimeout = 2 * time.Second

type Manager struct {
lg *zap.Logger
clusterTLS func() *tls.Config
Expand Down Expand Up @@ -209,13 +212,44 @@ func (m *Manager) PreClose() {
}
}

// GetTiDBTopology fetches and merges TiDB topology from all backend clusters.
// It returns an error only when all cluster fetches fail to provide any topology.
func (m *Manager) GetTiDBTopology(ctx context.Context) (map[string]*infosync.TiDBTopologyInfo, error) {
clusters := m.Snapshot()
merged := make(map[string]*infosync.TiDBTopologyInfo, 128)
errs := make([]error, 0, len(clusters))

type topologyResult struct {
clusterName string
infos map[string]*infosync.TiDBTopologyInfo
err error
}

resultCh := make(chan topologyResult, len(clusters))
var wg waitgroup.WaitGroup
for clusterName, cluster := range clusters {
infos, err := cluster.GetTiDBTopology(ctx)
wg.Run(func() {
clusterCtx, cancel := context.WithTimeout(ctx, topologyFetchTimeout)
defer cancel()

infos, err := cluster.GetTiDBTopology(clusterCtx)
resultCh <- topologyResult{
clusterName: clusterName,
infos: infos,
err: err,
}
}, m.lg)
}
wg.Wait()
close(resultCh)

for result := range resultCh {
clusterName := result.clusterName
infos := result.infos
err := result.err
if err != nil {
m.lg.Warn("fetch TiDB topology from backend cluster failed",
zap.String("cluster", clusterName), zap.Error(err))
errs = append(errs, err)
continue
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/manager/backendcluster/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,38 @@ func TestClusterReusableIgnoresNSServerOrder(t *testing.T) {
require.True(t, reusable)
}

// TestManagerGetTiDBTopologySkipsUnavailableCluster registers two clusters with
// the manager, shuts down the etcd server behind one of them, and checks that
// GetTiDBTopology still returns the other cluster's topology without an error
// and in less than 3 seconds.
func TestManagerGetTiDBTopologySkipsUnavailableCluster(t *testing.T) {
	// Two independent etcd-backed test clusters; only the first gets a topology entry.
	healthy := newManagerTestEtcdCluster(t)
	unreachable := newManagerTestEtcdCluster(t)
	t.Cleanup(func() { healthy.close(t) })
	t.Cleanup(func() { unreachable.close(t) })

	healthy.putTopology(t, "10.0.0.1:4000", &infosync.TiDBTopologyInfo{IP: "10.0.0.1", StatusPort: 10080})

	logger := zapLoggerForTest(t)
	manager := NewManager(logger, nilClusterTLS)

	// Install the clusters directly into the manager's cluster map.
	registered := make(map[string]*Cluster, 2)
	registered["cluster-a"] = &Cluster{
		infoSyncer: infosync.NewInfoSyncer(logger.Named("cluster-a"), healthy.client),
	}
	registered["cluster-b"] = &Cluster{
		infoSyncer: infosync.NewInfoSyncer(logger.Named("cluster-b"), unreachable.client),
	}
	manager.mu.clusters = registered

	// Stop cluster-b's etcd server only after it has been registered.
	unreachable.shutdownServer(t)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Time the call so the test can assert it returns well before the 5s context deadline.
	began := time.Now()
	got, err := manager.GetTiDBTopology(ctx)
	took := time.Since(began)

	require.NoError(t, err)
	require.Less(t, took, 3*time.Second)

	wantKey := backendID("cluster-a", "10.0.0.1:4000")
	absentKey := backendID("cluster-b", "10.0.0.2:4000")
	require.Contains(t, got, wantKey)
	require.NotContains(t, got, absentKey)
}

type managerTestConfigGetter struct {
mu sync.RWMutex
cfg *config.Config
Expand Down Expand Up @@ -376,7 +408,15 @@ func newManagerTestEtcdCluster(t *testing.T) *managerTestEtcdCluster {

// close closes the etcd client, failing the test if that returns an error, and
// then closes the etcd server if one is still attached (tec.etcd is non-nil).
func (tec *managerTestEtcdCluster) close(t *testing.T) {
	err := tec.client.Close()
	require.NoError(t, err)

	// The server handle is nil once shutdownServer has run; nothing left to stop.
	if tec.etcd == nil {
		return
	}
	tec.etcd.Close()
}

// shutdownServer closes the etcd server while leaving the client untouched, and
// clears tec.etcd so that close does not close the server a second time.
// It fails the test if the server handle is already nil.
func (tec *managerTestEtcdCluster) shutdownServer(t *testing.T) {
	server := tec.etcd
	require.NotNil(t, server)

	server.Close()
	tec.etcd = nil
}

func (tec *managerTestEtcdCluster) putTopology(t *testing.T, sqlAddr string, info *infosync.TiDBTopologyInfo) {
Expand Down