diff --git a/go/base/context.go b/go/base/context.go index 41baae696..5ffd94fa8 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -102,6 +102,7 @@ type MigrationContext struct { GoogleCloudPlatform bool AzureMySQL bool AttemptInstantDDL bool + OceanBaseBinlogService bool config ContextConfig configMutex *sync.Mutex diff --git a/go/base/utils.go b/go/base/utils.go index 725bb2279..3527602a0 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -6,14 +6,13 @@ package base import ( + gosql "database/sql" "fmt" "os" "regexp" "strings" "time" - gosql "database/sql" - "github.com/github/gh-ost/go/mysql" ) @@ -75,7 +74,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, // AliyunRDS set users port to "NULL", replace it by gh-ost param // GCP set users port to "NULL", replace it by gh-ost param // Azure MySQL set users port to a different value by design, replace it by gh-ost para - if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL { + if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBaseBinlogService { port = connectionConfig.Key.Port } else { portQuery := `select @@global.port` diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 39c815bc8..e447efaed 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -87,6 +87,7 @@ func main() { flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.") flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).") flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.") + flag.BoolVar(&migrationContext.OceanBaseBinlogService, "oceanbase", false, "set to 'true' when you execute on OceanBase Binlog Service") executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust") diff --git a/go/logic/applier.go b/go/logic/applier.go index 9b190919f..19765abfa 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -92,7 +92,7 @@ func (this *Applier) InitDBConnections() (err error) { if err := this.validateAndReadTimeZone(); err != nil { return err } - if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL { + if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService { if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil { return err } else { @@ -670,24 +670,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } -// LockOriginalTable places a write lock on the original table -func (this *Applier) LockOriginalTable() error { - query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, - sql.EscapeName(this.migrationContext.DatabaseName), - sql.EscapeName(this.migrationContext.OriginalTableName), - ) - this.migrationContext.Log.Infof("Locking %s.%s", - sql.EscapeName(this.migrationContext.DatabaseName), - sql.EscapeName(this.migrationContext.OriginalTableName), - ) +// lockTable places a write lock on the specific table +func (this *Applier) lockTable(databaseName, tableName string) error { + query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName) + this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName) this.migrationContext.LockTablesStartTime = time.Now() if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { return err } - this.migrationContext.Log.Infof("Table locked") + this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName) return nil } +// LockOriginalTable places a write lock on the original table +func (this *Applier) LockOriginalTable() error { + return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName) +} + +// LockGhostTable places a write lock on the ghost table +func (this *Applier) LockGhostTable() error { + return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName()) +} + // UnlockTables makes tea. No wait, it unlocks tables. func (this *Applier) UnlockTables() error { query := `unlock /* gh-ost */ tables` @@ -968,7 +972,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2 this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) - query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) + query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { tableLocked <- err return err @@ -1037,25 +1041,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke return nil } -// AtomicCutoverRename -func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error { - tx, err := this.db.Begin() +func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error { + tx, err := db.Begin() if err != nil { return err } defer func() { tx.Rollback() - sessionIdChan <- -1 - tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads") + if sessionIdChan != nil { + sessionIdChan <- -1 + } + if tablesRenamed != nil { + tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads") + } }() - var sessionId int64 - if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil { - return err + + if sessionIdChan != nil { + var sessionId int64 + if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId } - sessionIdChan <- sessionId this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds) - query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds) + query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { return err } @@ -1072,14 +1082,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed ) this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query) if _, err := tx.Exec(query); err != nil { - tablesRenamed <- err + if tablesRenamed != nil { + tablesRenamed <- err + } return this.migrationContext.Log.Errore(err) } - tablesRenamed <- nil + if tablesRenamed != nil { + tablesRenamed <- nil + } this.migrationContext.Log.Infof("Tables renamed") return nil } +// AtomicCutoverRename renames tables for atomic cut over in non lock session +func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error { + return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed) +} + +// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session +func (this *Applier) AtomicCutoverRenameWithLock() error { + return this.atomicCutoverRename(this.singletonDB, nil, nil) +} + func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) { query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName) if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil { diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 9d414a43e..e811db53f 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) { if err := this.validateConnection(); err != nil { return err } - if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL { + if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService { if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil { return err } else { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index fed7c944b..faefec592 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool { // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted. func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { + if dmlEvent.NewColumnValues == nil { + // in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is + // converted to a DELETE event and an INSERT event, we need to skip the DELETE event. + return nil + } // Hey, I created the changelog table, I know the type of columns it has! switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { case "state": @@ -551,9 +556,15 @@ func (this *Migrator) cutOver() (err error) { switch this.migrationContext.CutOverType { case base.CutOverAtomic: - // Atomic solution: we use low timeout and multiple attempts. But for - // each failed attempt, we throttle until replication lag is back to normal - err = this.atomicCutOver() + if this.migrationContext.OceanBaseBinlogService || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") { + // Atomic solution for latest MySQL: cut over the tables in the same session where the origin + // table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions + err = this.atomicCutOverMySQL8() + } else { + // Atomic solution: we use low timeout and multiple attempts. But for + // each failed attempt, we throttle until replication lag is back to normal + err = this.atomicCutOver() + } case base.CutOverTwoStep: err = this.cutOverTwoStep() default: @@ -632,6 +643,39 @@ func (this *Migrator) cutOverTwoStep() (err error) { return nil } +// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute +// what's left of last DML entries, and atomically swap original->old, then new->original. +// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is +// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html. +func (this *Migrator) atomicCutOverMySQL8() (err error) { + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + + if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { + return err + } + + if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { + return err + } + if err := this.retryOperation(this.applier.LockGhostTable); err != nil { + return err + } + + if err := this.applier.AtomicCutoverRenameWithLock(); err != nil { + return err + } + if err := this.retryOperation(this.applier.UnlockTables); err != nil { + return err + } + + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) diff --git a/go/mysql/utils.go b/go/mysql/utils.go index c69a3f255..8315eb0f5 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -8,6 +8,7 @@ package mysql import ( gosql "database/sql" "fmt" + "strconv" "strings" "sync" "time" @@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error { _, err := db.Exec(`KILL QUERY %s`, connectionID) return err } + +func versionTokens(version string, digits int) []int { + v := strings.Split(version, "-")[0] + tokens := strings.Split(v, ".") + intTokens := make([]int, digits) + for i := range tokens { + if i >= digits { + break + } + intTokens[i], _ = strconv.Atoi(tokens[i]) + } + return intTokens +} + +func isSmallerVersion(version string, otherVersion string, digits int) bool { + v := versionTokens(version, digits) + o := versionTokens(otherVersion, digits) + for i := 0; i < len(v); i++ { + if v[i] < o[i] { + return true + } + if v[i] > o[i] { + return false + } + if i == digits { + break + } + } + return false +} + +// IsSmallerMajorVersion tests two versions against another and returns true if +// the former is a smaller "major" version than the latter. +// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9 +func IsSmallerMajorVersion(version string, otherVersion string) bool { + return isSmallerVersion(version, otherVersion, 2) +} + +// IsSmallerMinorVersion tests two versions against another and returns true if +// the former is a smaller "minor" version than the latter. +// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7 +func IsSmallerMinorVersion(version string, otherVersion string) bool { + return isSmallerVersion(version, otherVersion, 3) +}