Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (c *Controller) collectMetrics(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case <-ticker.C:
metrics.ChangefeedStateGauge.WithLabelValues("Total").Set(float64(c.changefeedDB.GetSize()))
metrics.ChangefeedStateGauge.WithLabelValues("Working").Set(float64(c.changefeedDB.GetReplicatingSize()))
Expand Down Expand Up @@ -219,7 +219,7 @@ func (c *Controller) collectMetrics(ctx context.Context) error {
}

// HandleEvent implements the event-driven process mode
func (c *Controller) HandleEvent(event *Event) {
func (c *Controller) HandleEvent(ctx context.Context, event *Event) {
if event == nil {
return
}
Expand All @@ -236,22 +236,22 @@ func (c *Controller) HandleEvent(event *Event) {

// Before processing the event, we need to check the online/offline nodes,
// the following logic is based on whether the node changed.
c.checkOnNodeChanged()
c.checkOnNodeChanged(ctx)

switch event.eventType {
case EventMessage:
c.onMessage(event.message)
c.onMessage(ctx, event.message)
case EventPeriod:
c.onPeriodTask()
}
}

func (c *Controller) checkOnNodeChanged() {
func (c *Controller) checkOnNodeChanged(ctx context.Context) {
c.nodeChanged.Lock()
defer c.nodeChanged.Unlock()

if c.nodeChanged.changed {
c.onNodeChanged()
c.onNodeChanged(ctx)
c.nodeChanged.changed = false
}
}
Expand All @@ -264,10 +264,10 @@ func (c *Controller) onPeriodTask() {
}
}

func (c *Controller) onMessage(msg *messaging.TargetMessage) {
func (c *Controller) onMessage(ctx context.Context, msg *messaging.TargetMessage) {
switch msg.Type {
case messaging.TypeCoordinatorBootstrapResponse:
c.onMaintainerBootstrapResponse(msg)
c.onMaintainerBootstrapResponse(ctx, msg)
case messaging.TypeMaintainerHeartbeatRequest:
if c.bootstrapper.AllNodesReady() {
req := msg.Message[0].(*heartbeatpb.MaintainerHeartbeat)
Expand Down Expand Up @@ -327,7 +327,7 @@ func (c *Controller) RequestResolvedTsFromLogCoordinator(ctx context.Context, ch
}
}

func (c *Controller) onNodeChanged() {
func (c *Controller) onNodeChanged(ctx context.Context) {
addedNodes, removedNodes, requests, responses := c.bootstrapper.HandleNodesChange(c.nodeManager.GetAliveNodes())
log.Info("controller detects node changed",
zap.Int("addedCount", len(addedNodes)),
Expand All @@ -345,24 +345,24 @@ func (c *Controller) onNodeChanged() {
zap.Any("targetNode", req.To), zap.Error(err))
}
}
c.handleBootstrapResponses(responses)
c.handleBootstrapResponses(ctx, responses)
}

func (c *Controller) onMaintainerBootstrapResponse(req *messaging.TargetMessage) {
func (c *Controller) onMaintainerBootstrapResponse(ctx context.Context, req *messaging.TargetMessage) {
response := req.Message[0].(*heartbeatpb.CoordinatorBootstrapResponse)
log.Info("controller received maintainer bootstrap response",
zap.Stringer("node", req.From),
zap.Int("maintainerCount", len(response.Statuses)))
responses := c.bootstrapper.HandleBootstrapResponse(req.From, response)
c.handleBootstrapResponses(responses)
c.handleBootstrapResponses(ctx, responses)
}

type remoteMaintainer struct {
nodeID node.ID
status *heartbeatpb.MaintainerStatus
}

func (c *Controller) handleBootstrapResponses(responses map[node.ID]*heartbeatpb.CoordinatorBootstrapResponse) {
func (c *Controller) handleBootstrapResponses(ctx context.Context, responses map[node.ID]*heartbeatpb.CoordinatorBootstrapResponse) {
if c.initialized.Load() || responses == nil {
return
}
Expand All @@ -385,7 +385,7 @@ func (c *Controller) handleBootstrapResponses(responses map[node.ID]*heartbeatpb
}
}
}
c.finishBootstrap(runningCfs)
c.finishBootstrap(ctx, runningCfs)
}

// handleMaintainerStatus handle the status report from the maintainers
Expand Down Expand Up @@ -504,17 +504,16 @@ func (c *Controller) updateChangefeedStatus(
// It will load all changefeeds from metastore, and compare with running changefeeds
// Then initialize the changefeeds that are not running on other nodes
// And construct all changefeeds state in memory.
func (c *Controller) finishBootstrap(runningChangefeeds map[common.ChangeFeedID]remoteMaintainer) {
func (c *Controller) finishBootstrap(ctx context.Context, runningChangefeeds map[common.ChangeFeedID]remoteMaintainer) {
// load all changefeeds from metastore, and check if the changefeed is already in workingMap
allChangefeeds, err := c.backend.GetAllChangefeeds(context.Background())
allChangefeeds, err := c.backend.GetAllChangefeeds(ctx)
if err != nil {
log.Panic("load all changefeeds failed", zap.Error(err))
}

// Register keyspace
schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore)
registeredKeyspace := make(map[string]struct{})
ctx := context.Background()
for id := range allChangefeeds {
if _, ok := registeredKeyspace[id.Keyspace()]; ok {
continue
Expand Down Expand Up @@ -565,7 +564,7 @@ func (c *Controller) finishBootstrap(runningChangefeeds map[common.ChangeFeedID]
switch cfMeta.Status.Progress {
case config.ProgressStopping, config.ProgressRemoving:
remove := cfMeta.Status.Progress == config.ProgressRemoving
c.operatorController.StopChangefeed(context.Background(), cfID, remove)
c.operatorController.StopChangefeed(ctx, cfID, remove)
log.Info("stop changefeed when bootstrapping", zap.String("changefeed", cfID.String()), zap.Any("meta", cfMeta))
}
}
Expand Down
13 changes: 6 additions & 7 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMes

select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
default:
c.eventCh.In() <- &Event{message: msg}
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func (c *coordinator) run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case <-gcTicker.C:
if err := c.updateGCSafepoint(ctx); err != nil {
log.Warn("update gc safepoint failed",
Expand Down Expand Up @@ -231,9 +231,9 @@ func (c *coordinator) runHandleEvent(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
case event := <-c.eventCh.Out():
c.controller.HandleEvent(event)
c.controller.HandleEvent(ctx, event)
}
}
}
Expand All @@ -257,7 +257,7 @@ func (c *coordinator) handleStateChange(
if event.state == config.StateFailed || event.state == config.StateFinished {
progress = config.ProgressStopping
}
if err = c.backend.UpdateChangefeed(context.Background(), cfInfo, cf.GetStatus().CheckpointTs, progress); err != nil {
if err = c.backend.UpdateChangefeed(ctx, cfInfo, cf.GetStatus().CheckpointTs, progress); err != nil {
log.Error("failed to update changefeed state",
zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -319,8 +319,7 @@ func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, changefeed *ch
log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+
"there may be a lot of state need to be handled. Try next time",
zap.String("changefeed", id.String()),
zap.Error(ctx.Err()))
return
zap.Error(context.Cause(ctx)))
case c.changefeedChangeCh <- []*changefeedChange{change}:
}
}
Expand Down
Loading