migrator.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package migrator
  2. import (
  3. "time"
  4. _ "github.com/go-sql-driver/mysql"
  5. "github.com/go-xorm/xorm"
  6. "github.com/grafana/grafana/pkg/log"
  7. _ "github.com/lib/pq"
  8. _ "github.com/mattn/go-sqlite3"
  9. )
  10. type Migrator struct {
  11. x *xorm.Engine
  12. Dialect Dialect
  13. migrations []Migration
  14. Logger log.Logger
  15. }
  16. type MigrationLog struct {
  17. Id int64
  18. MigrationId string
  19. Sql string
  20. Success bool
  21. Error string
  22. Timestamp time.Time
  23. }
  24. func NewMigrator(engine *xorm.Engine) *Migrator {
  25. mg := &Migrator{}
  26. mg.x = engine
  27. mg.Logger = log.New("migrator")
  28. mg.migrations = make([]Migration, 0)
  29. mg.Dialect = NewDialect(mg.x)
  30. return mg
  31. }
  32. func (mg *Migrator) MigrationsCount() int {
  33. return len(mg.migrations)
  34. }
  35. func (mg *Migrator) AddMigration(id string, m Migration) {
  36. m.SetId(id)
  37. mg.migrations = append(mg.migrations, m)
  38. }
  39. func (mg *Migrator) GetMigrationLog() (map[string]MigrationLog, error) {
  40. logMap := make(map[string]MigrationLog)
  41. logItems := make([]MigrationLog, 0)
  42. exists, err := mg.x.IsTableExist(new(MigrationLog))
  43. if err != nil {
  44. return nil, err
  45. }
  46. if !exists {
  47. return logMap, nil
  48. }
  49. if err = mg.x.Find(&logItems); err != nil {
  50. return nil, err
  51. }
  52. for _, logItem := range logItems {
  53. if !logItem.Success {
  54. continue
  55. }
  56. logMap[logItem.MigrationId] = logItem
  57. }
  58. return logMap, nil
  59. }
  60. func (mg *Migrator) Start() error {
  61. mg.Logger.Info("Starting DB migration")
  62. logMap, err := mg.GetMigrationLog()
  63. if err != nil {
  64. return err
  65. }
  66. for _, m := range mg.migrations {
  67. _, exists := logMap[m.Id()]
  68. if exists {
  69. mg.Logger.Debug("Skipping migration: Already executed", "id", m.Id())
  70. continue
  71. }
  72. sql := m.Sql(mg.Dialect)
  73. record := MigrationLog{
  74. MigrationId: m.Id(),
  75. Sql: sql,
  76. Timestamp: time.Now(),
  77. }
  78. err := mg.inTransaction(func(sess *xorm.Session) error {
  79. err := mg.exec(m, sess)
  80. if err != nil {
  81. mg.Logger.Error("Exec failed", "error", err, "sql", sql)
  82. record.Error = err.Error()
  83. sess.Insert(&record)
  84. return err
  85. }
  86. record.Success = true
  87. sess.Insert(&record)
  88. return nil
  89. })
  90. if err != nil {
  91. return err
  92. }
  93. }
  94. return nil
  95. }
  96. func (mg *Migrator) exec(m Migration, sess *xorm.Session) error {
  97. mg.Logger.Info("Executing migration", "id", m.Id())
  98. condition := m.GetCondition()
  99. if condition != nil {
  100. sql, args := condition.Sql(mg.Dialect)
  101. if sql != "" {
  102. mg.Logger.Debug("Executing migration condition sql", "id", m.Id(), "sql", sql, "args", args)
  103. results, err := sess.SQL(sql, args...).Query()
  104. if err != nil {
  105. mg.Logger.Error("Executing migration condition failed", "id", m.Id(), "error", err)
  106. return err
  107. }
  108. if !condition.IsFulfilled(results) {
  109. mg.Logger.Warn("Skipping migration: Already executed, but not recorded in migration log", "id", m.Id())
  110. return nil
  111. }
  112. }
  113. }
  114. var err error
  115. if codeMigration, ok := m.(CodeMigration); ok {
  116. mg.Logger.Debug("Executing code migration", "id", m.Id())
  117. err = codeMigration.Exec(sess, mg)
  118. } else {
  119. sql := m.Sql(mg.Dialect)
  120. mg.Logger.Debug("Executing sql migration", "id", m.Id(), "sql", sql)
  121. _, err = sess.Exec(sql)
  122. }
  123. if err != nil {
  124. mg.Logger.Error("Executing migration failed", "id", m.Id(), "error", err)
  125. return err
  126. }
  127. return nil
  128. }
  129. type dbTransactionFunc func(sess *xorm.Session) error
  130. func (mg *Migrator) inTransaction(callback dbTransactionFunc) error {
  131. var err error
  132. sess := mg.x.NewSession()
  133. defer sess.Close()
  134. if err = sess.Begin(); err != nil {
  135. return err
  136. }
  137. err = callback(sess)
  138. if err != nil {
  139. sess.Rollback()
  140. return err
  141. } else if err = sess.Commit(); err != nil {
  142. return err
  143. }
  144. return nil
  145. }