@@ -75,8 +75,9 @@ const stopOnReshardDelay = 500 * time.Millisecond
7575// no events, including heartbeats, from any of the shards.
7676var livenessTimeout = 10 * time .Minute
7777
78- // defaultTransactionChunkSizeBytes is the default threshold for accumulated transaction size in bytes.
79- const defaultTransactionChunkSizeBytes = 128 * 1024 * 1024 // 128MB
78+ // defaultTransactionChunkSizeBytes is the default threshold for chunking transactions.
79+ // 0 (the default value for protobuf int64) means disabled, clients must explicitly set a value to opt in for chunking.
80+ const defaultTransactionChunkSizeBytes = 0
8081
8182// vstream contains the metadata for one VStream request.
8283type vstream struct {
@@ -154,6 +155,10 @@ type vstream struct {
154155 flags * vtgatepb.VStreamFlags
155156}
156157
158+ func (vs * vstream ) isChunkingEnabled () bool {
159+ return vs .transactionChunkSizeBytes > 0
160+ }
161+
157162type journalEvent struct {
158163 journal * binlogdatapb.Journal
159164 participants map [* binlogdatapb.ShardGtid ]bool
@@ -723,6 +728,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
723728 }()
724729
725730 err = tabletConn .VStream (ctx , req , func (events []* binlogdatapb.VEvent ) error {
731+ // We received a valid event. Reset error count.
726732 errCount = 0
727733
728734 select {
@@ -748,9 +754,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
748754
749755 sendevents := make ([]* binlogdatapb.VEvent , 0 , len (events ))
750756 for i , event := range events {
751- vs .streamLivenessTimer .Reset (livenessTimeout )
757+ vs .streamLivenessTimer .Reset (livenessTimeout ) // Any event in the stream demonstrates liveness
752758 accumulatedSize += event .SizeVT ()
753-
754759 switch event .Type {
755760 case binlogdatapb .VEventType_BEGIN :
756761 // Mark the start of a transaction.
@@ -769,18 +774,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
769774 sendevents = append (sendevents , event )
770775 eventss = append (eventss , sendevents )
771776
772- // If MinimizeSkew is enabled, then perform stream alignment, before acquiring the transaction lock.
773- // This ensures that we will not hold the lock while we are waiting for streams to align.
774777 if err := vs .alignStreams (ctx , event , sgtid .Keyspace , sgtid .Shard ); err != nil {
775778 return vterrors .Wrap (err , aligningStreamsErr )
776779 }
777780
778781 var sendErr error
779- if txLockHeld {
782+ if vs .isChunkingEnabled () && txLockHeld {
783+ // If chunking is enabled and we are holding the lock (only possible to acquire lock when chunking is enabled), then send the events.
780784 sendErr = vs .sendEventsLocked (ctx , sgtid , eventss )
781785 vs .mu .Unlock ()
782786 txLockHeld = false
783787 } else {
788+ // If chunking is not enabled or this transaction was small enough to not need chunking, fall back to default behavior of sending entire transaction atomically.
784789 sendErr = vs .sendAll (ctx , sgtid , eventss )
785790 }
786791 if sendErr != nil {
@@ -881,20 +886,26 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
881886 eventss = append (eventss , sendevents )
882887 }
883888
884- if ! inTransaction && txLockHeld {
889+ // If chunking is enabled, and we are holding the lock (only possible when enabled), and we are not in a transaction
890+ // release the lock (this should not ever execute, acts as a safety check).
891+ if vs .isChunkingEnabled () && txLockHeld && ! inTransaction {
892+ log .Warning ("Detected held lock but not in a transaction, releasing the lock" )
885893 vs .mu .Unlock ()
886894 txLockHeld = false
887895 }
888896
889- if len (eventss ) > 0 && txLockHeld {
897+ // If chunking is enabled, and we are holding the lock (only possible when chunking is enabled), send the events.
898+ if vs .isChunkingEnabled () && txLockHeld && len (eventss ) > 0 {
890899 if err := vs .sendEventsLocked (ctx , sgtid , eventss ); err != nil {
891900 log .Infof ("vstream for %s/%s, error in sendAll at end of callback: %v" , sgtid .Keyspace , sgtid .Shard , err )
892901 return vterrors .Wrap (err , sendingEventsErr )
893902 }
894903 eventss = nil
895904 }
896905
897- if inTransaction && ! txLockHeld && accumulatedSize > vs .transactionChunkSizeBytes && ! vs .minimizeSkew {
906+ // If chunking is enabled and minimize skew is disabled, and we are in a transaction, and we do not yet hold the lock, and the accumulated size is greater than our chunk size
907+ // then acquire the lock, so that we can send the events, and begin chunking the transaction.
908+ if vs .isChunkingEnabled () && ! vs .minimizeSkew && inTransaction && ! txLockHeld && accumulatedSize > vs .transactionChunkSizeBytes {
898909 log .Infof ("vstream for %s/%s: transaction size %d bytes exceeds chunk size %d bytes, acquiring lock for contiguous, chunked delivery" ,
899910 sgtid .Keyspace , sgtid .Shard , accumulatedSize , vs .transactionChunkSizeBytes )
900911 vs .vsm .vstreamsTransactionsChunked .Add (labelValues , 1 )
0 commit comments