Skip to content

Commit 02e9287

Browse files
author
Shlomi Noach
authored
Merge branch 'master' into concurrent-rowcount-defaults-true
2 parents 10850e4 + 3435539 commit 02e9287

File tree

13 files changed

+125
-33
lines changed

13 files changed

+125
-33
lines changed

RELEASE_VERSION

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
1.0.28

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#
33
#
44

5-
RELEASE_VERSION="1.0.26"
5+
RELEASE_VERSION=$(cat RELEASE_VERSION)
66

77
function build {
88
osname=$1

go/base/context.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,28 @@ const (
3737
CutOverTwoStep = iota
3838
)
3939

40+
type ThrottleReasonHint string
41+
42+
const (
43+
NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint"
44+
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
45+
)
46+
4047
var (
4148
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
4249
)
4350

4451
type ThrottleCheckResult struct {
4552
ShouldThrottle bool
4653
Reason string
54+
ReasonHint ThrottleReasonHint
4755
}
4856

49-
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
57+
func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleReasonHint) *ThrottleCheckResult {
5058
return &ThrottleCheckResult{
5159
ShouldThrottle: throttle,
5260
Reason: reason,
61+
ReasonHint: reasonHint,
5362
}
5463
}
5564

@@ -138,13 +147,15 @@ type MigrationContext struct {
138147
TotalDMLEventsApplied int64
139148
isThrottled bool
140149
throttleReason string
150+
throttleReasonHint ThrottleReasonHint
141151
throttleGeneralCheckResult ThrottleCheckResult
142152
throttleMutex *sync.Mutex
143153
IsPostponingCutOver int64
144154
CountingRowsFlag int64
145155
AllEventsUpToLockProcessedInjectedFlag int64
146156
CleanupImminentFlag int64
147157
UserCommandedUnpostponeFlag int64
158+
CutOverCompleteFlag int64
148159
PanicAbort chan error
149160

150161
OriginalTableColumnsOnApplier *sql.ColumnList
@@ -416,17 +427,18 @@ func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResu
416427
return &result
417428
}
418429

419-
func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
430+
func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonHint ThrottleReasonHint) {
420431
this.throttleMutex.Lock()
421432
defer this.throttleMutex.Unlock()
422433
this.isThrottled = throttle
423434
this.throttleReason = reason
435+
this.throttleReasonHint = reasonHint
424436
}
425437

426-
func (this *MigrationContext) IsThrottled() (bool, string) {
438+
func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
427439
this.throttleMutex.Lock()
428440
defer this.throttleMutex.Unlock()
429-
return this.isThrottled, this.throttleReason
441+
return this.isThrottled, this.throttleReason, this.throttleReasonHint
430442
}
431443

432444
func (this *MigrationContext) GetReplicationLagQuery() string {

go/binlog/gomysql_reader.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
118118

119119
// StreamEvents
120120
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
121+
if canStopStreaming() {
122+
return nil
123+
}
121124
for {
122125
if canStopStreaming() {
123126
break
@@ -148,3 +151,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
148151

149152
return nil
150153
}
154+
155+
func (this *GoMySQLReader) Close() error {
156+
this.binlogSyncer.Close()
157+
return nil
158+
}

go/logic/applier.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ func (this *Applier) InitiateHeartbeat() {
305305
// Generally speaking, we would issue a goroutine, but I'd actually rather
306306
// have this block the loop rather than spam the master in the event something
307307
// goes wrong
308+
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
309+
continue
310+
}
308311
if err := injectHeartbeat(); err != nil {
309312
return
310313
}

go/logic/inspect.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -335,12 +335,27 @@ func (this *Inspector) validateLogSlaveUpdates() error {
335335
if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil {
336336
return err
337337
}
338-
if !logSlaveUpdates && !this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.IsTungsten {
339-
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
338+
339+
if logSlaveUpdates {
340+
log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
341+
return nil
340342
}
341343

342-
log.Infof("binary logs updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
343-
return nil
344+
if this.migrationContext.IsTungsten {
345+
log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
346+
return nil
347+
}
348+
349+
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
350+
return fmt.Errorf("%s:%d must have log_slave_updates enabled for testing/migrating on replica", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
351+
}
352+
353+
if this.migrationContext.InspectorIsAlsoApplier() {
354+
log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
355+
return nil
356+
}
357+
358+
return fmt.Errorf("%s:%d must have log_slave_updates enabled for executing migration", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
344359
}
345360

346361
// validateTable makes sure the table we need to operate on actually exists

go/logic/migrator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() {
171171
}
172172

173173
func (this *Migrator) canStopStreaming() bool {
174-
return false
174+
return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
175175
}
176176

177177
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
@@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) {
345345
if err := this.cutOver(); err != nil {
346346
return err
347347
}
348+
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
348349

349350
if err := this.finalCleanup(); err != nil {
350351
return nil
@@ -803,7 +804,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
803804
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
804805
eta = "due"
805806
state = "postponing cut-over"
806-
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
807+
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
807808
state = fmt.Sprintf("throttled, %s", throttleReason)
808809
}
809810

@@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error {
10581059
log.Errore(err)
10591060
}
10601061
}
1062+
if err := this.eventsStreamer.Close(); err != nil {
1063+
log.Errore(err)
1064+
}
10611065

10621066
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
10631067
return err

go/logic/streamer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
217217
}
218218
}
219219
}
220+
221+
func (this *EventsStreamer) Close() (err error) {
222+
err = this.binlogReader.Close()
223+
log.Infof("Closed streamer connection. err=%+v", err)
224+
return err
225+
}

go/logic/throttler.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
3434
// shouldThrottle performs checks to see whether we should currently be throttling.
3535
// It merely observes the metrics collected by other components, it does not issue
3636
// its own metric collection.
37-
func (this *Throttler) shouldThrottle() (result bool, reason string) {
37+
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
3838
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
3939
if generalCheckResult.ShouldThrottle {
40-
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
40+
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
4141
}
4242
// Replication lag throttle
4343
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
4444
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
4545
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
46-
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
46+
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
4747
}
4848
checkThrottleControlReplicas := true
4949
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
@@ -52,14 +52,14 @@ func (this *Throttler) shouldThrottle() (result bool, reason string) {
5252
if checkThrottleControlReplicas {
5353
lagResult := this.migrationContext.GetControlReplicasLagResult()
5454
if lagResult.Err != nil {
55-
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
55+
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
5656
}
5757
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
58-
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
58+
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
5959
}
6060
}
6161
// Got here? No metrics indicates we need throttling.
62-
return false, ""
62+
return false, "", base.NoThrottleReasonHint
6363
}
6464

6565
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
@@ -147,8 +147,8 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
147147
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
148148
func (this *Throttler) collectGeneralThrottleMetrics() error {
149149

150-
setThrottle := func(throttle bool, reason string) error {
151-
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
150+
setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
151+
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
152152
return nil
153153
}
154154

@@ -161,7 +161,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
161161

162162
criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
163163
if err != nil {
164-
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
164+
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
165165
}
166166
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
167167
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
@@ -181,38 +181,38 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
181181

182182
// User-based throttle
183183
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
184-
return setThrottle(true, "commanded by user")
184+
return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
185185
}
186186
if this.migrationContext.ThrottleFlagFile != "" {
187187
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
188188
// Throttle file defined and exists!
189-
return setThrottle(true, "flag-file")
189+
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
190190
}
191191
}
192192
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
193193
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
194194
// 2nd Throttle file defined and exists!
195-
return setThrottle(true, "flag-file")
195+
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
196196
}
197197
}
198198

199199
maxLoad := this.migrationContext.GetMaxLoad()
200200
for variableName, threshold := range maxLoad {
201201
value, err := this.applier.ShowStatusVariable(variableName)
202202
if err != nil {
203-
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
203+
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
204204
}
205205
if value >= threshold {
206-
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
206+
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
207207
}
208208
}
209209
if this.migrationContext.GetThrottleQuery() != "" {
210210
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
211-
return setThrottle(true, "throttle-query")
211+
return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
212212
}
213213
}
214214

215-
return setThrottle(false, "")
215+
return setThrottle(false, "", base.NoThrottleReasonHint)
216216
}
217217

218218
// initiateThrottlerMetrics initiates the various processes that collect measurements
@@ -237,8 +237,8 @@ func (this *Throttler) initiateThrottlerChecks() error {
237237
throttlerTick := time.Tick(100 * time.Millisecond)
238238

239239
throttlerFunction := func() {
240-
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
241-
shouldThrottle, throttleReason := this.shouldThrottle()
240+
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
241+
shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
242242
if shouldThrottle && !alreadyThrottling {
243243
// New throttling
244244
this.applier.WriteAndLogChangelog("throttle", throttleReason)
@@ -249,7 +249,7 @@ func (this *Throttler) initiateThrottlerChecks() error {
249249
// End of throttling
250250
this.applier.WriteAndLogChangelog("throttle", "done throttling")
251251
}
252-
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
252+
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
253253
}
254254
throttlerFunction()
255255
for range throttlerTick {
@@ -265,7 +265,7 @@ func (this *Throttler) throttle(onThrottled func()) {
265265
for {
266266
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
267267
// Therefore calling IsThrottled() is cheap
268-
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
268+
if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
269269
return
270270
}
271271
if onThrottled != nil {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
drop table if exists gh_ost_test;
2+
create table gh_ost_test (
3+
id int auto_increment,
4+
i int not null,
5+
dt0 datetime(6),
6+
dt1 datetime(6),
7+
ts2 timestamp(6),
8+
updated tinyint unsigned default 0,
9+
primary key(id),
10+
key i_idx(i)
11+
) auto_increment=1;
12+
13+
drop event if exists gh_ost_test;
14+
delimiter ;;
15+
create event gh_ost_test
16+
on schedule every 1 second
17+
starts current_timestamp
18+
ends current_timestamp + interval 60 second
19+
on completion not preserve
20+
enable
21+
do
22+
begin
23+
insert into gh_ost_test values (null, 11, now(), now(), now(), 0);
24+
update gh_ost_test set dt1='2016-10-31 11:22:33.444', updated = 1 where i = 11 order by id desc limit 1;
25+
26+
insert into gh_ost_test values (null, 13, now(), now(), now(), 0);
27+
update gh_ost_test set ts1='2016-11-01 11:22:33.444', updated = 1 where i = 13 order by id desc limit 1;
28+
end ;;

0 commit comments

Comments
 (0)