diff --git a/pkg/manager/backendcluster/manager.go b/pkg/manager/backendcluster/manager.go
index f47a45c3..87defe31 100644
--- a/pkg/manager/backendcluster/manager.go
+++ b/pkg/manager/backendcluster/manager.go
@@ -11,6 +11,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/pingcap/tiproxy/lib/config"
 	"github.com/pingcap/tiproxy/lib/util/errors"
@@ -19,6 +20,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const topologyFetchTimeout = 2 * time.Second // deadline for one cluster's topology fetch
+
 type Manager struct {
 	lg         *zap.Logger
 	clusterTLS func() *tls.Config
@@ -209,13 +212,46 @@ func (m *Manager) PreClose() {
 	}
 }
 
+// GetTiDBTopology fetches and merges TiDB topology from all backend clusters.
+// It returns an error only when all cluster fetches fail to provide any topology.
 func (m *Manager) GetTiDBTopology(ctx context.Context) (map[string]*infosync.TiDBTopologyInfo, error) {
 	clusters := m.Snapshot()
 	merged := make(map[string]*infosync.TiDBTopologyInfo, 128)
 	errs := make([]error, 0, len(clusters))
+
+	type topologyResult struct {
+		clusterName string
+		infos       map[string]*infosync.TiDBTopologyInfo
+		err         error
+	}
+
+	// Buffered to len(clusters) so every worker can send without blocking.
+	resultCh := make(chan topologyResult, len(clusters))
+	var wg waitgroup.WaitGroup
 	for clusterName, cluster := range clusters {
-		infos, err := cluster.GetTiDBTopology(ctx)
+		wg.Run(func() {
+			// Each cluster gets its own deadline derived from the caller's context.
+			clusterCtx, cancel := context.WithTimeout(ctx, topologyFetchTimeout)
+			defer cancel()
+
+			infos, err := cluster.GetTiDBTopology(clusterCtx)
+			resultCh <- topologyResult{
+				clusterName: clusterName,
+				infos:       infos,
+				err:         err,
+			}
+		}, m.lg)
+	}
+	wg.Wait()
+	close(resultCh)
+
+	for result := range resultCh {
+		clusterName := result.clusterName
+		infos := result.infos
+		err := result.err
 		if err != nil {
+			m.lg.Warn("fetch TiDB topology from backend cluster failed",
+				zap.String("cluster", clusterName), zap.Error(err))
 			errs = append(errs, err)
 			continue
 		}
diff --git a/pkg/manager/backendcluster/manager_test.go b/pkg/manager/backendcluster/manager_test.go
index 1799bf13..21284367 100644
--- a/pkg/manager/backendcluster/manager_test.go
+++ b/pkg/manager/backendcluster/manager_test.go
@@ -331,6 +331,43 @@ func TestClusterReusableIgnoresNSServerOrder(t *testing.T) {
 	require.True(t, reusable)
 }
 
+// TestManagerGetTiDBTopologySkipsUnavailableCluster checks that an unreachable
+// cluster is skipped without failing or stalling the merged topology.
+func TestManagerGetTiDBTopologySkipsUnavailableCluster(t *testing.T) {
+	clusterA := newManagerTestEtcdCluster(t)
+	clusterB := newManagerTestEtcdCluster(t)
+	t.Cleanup(func() { clusterA.close(t) })
+	t.Cleanup(func() { clusterB.close(t) })
+
+	clusterA.putTopology(t, "10.0.0.1:4000", &infosync.TiDBTopologyInfo{IP: "10.0.0.1", StatusPort: 10080})
+	// Seed cluster-b as well; otherwise the NotContains check below passes vacuously.
+	clusterB.putTopology(t, "10.0.0.2:4000", &infosync.TiDBTopologyInfo{IP: "10.0.0.2", StatusPort: 10080})
+
+	lg := zapLoggerForTest(t)
+	mgr := NewManager(lg, nilClusterTLS)
+	mgr.mu.clusters = map[string]*Cluster{
+		"cluster-a": {
+			infoSyncer: infosync.NewInfoSyncer(lg.Named("cluster-a"), clusterA.client),
+		},
+		"cluster-b": {
+			infoSyncer: infosync.NewInfoSyncer(lg.Named("cluster-b"), clusterB.client),
+		},
+	}
+	clusterB.shutdownServer(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	start := time.Now()
+	topology, err := mgr.GetTiDBTopology(ctx)
+	elapsed := time.Since(start)
+
+	require.NoError(t, err)
+	// A down cluster must cost no more than the per-cluster fetch timeout, not the caller's 5s.
+	require.Less(t, elapsed, 3*time.Second)
+	require.Contains(t, topology, backendID("cluster-a", "10.0.0.1:4000"))
+	require.NotContains(t, topology, backendID("cluster-b", "10.0.0.2:4000"))
+}
+
 type managerTestConfigGetter struct {
 	mu  sync.RWMutex
 	cfg *config.Config
@@ -376,7 +413,19 @@ func newManagerTestEtcdCluster(t *testing.T) *managerTestEtcdCluster {
 
 func (tec *managerTestEtcdCluster) close(t *testing.T) {
+	// Stop the embedded server even when the assertion below aborts the cleanup.
+	defer func() {
+		if tec.etcd != nil {
+			tec.etcd.Close()
+		}
+	}()
 	require.NoError(t, tec.client.Close())
+}
+
+// shutdownServer stops the embedded etcd server and clears the handle so close skips it.
+func (tec *managerTestEtcdCluster) shutdownServer(t *testing.T) {
+	require.NotNil(t, tec.etcd)
 	tec.etcd.Close()
+	tec.etcd = nil
 }
 
 func (tec *managerTestEtcdCluster) putTopology(t *testing.T, sqlAddr string, info *infosync.TiDBTopologyInfo) {