// shared.go (2.4 KB)

  1. package sqlstore
  2. import (
  3. "context"
  4. "reflect"
  5. "time"
  6. "github.com/go-xorm/xorm"
  7. "github.com/grafana/grafana/pkg/bus"
  8. "github.com/grafana/grafana/pkg/log"
  9. sqlite3 "github.com/mattn/go-sqlite3"
  10. )
  11. type DBSession struct {
  12. *xorm.Session
  13. events []interface{}
  14. }
  15. type dbTransactionFunc func(sess *DBSession) error
  16. func (sess *DBSession) publishAfterCommit(msg interface{}) {
  17. sess.events = append(sess.events, msg)
  18. }
  19. func newSession() *DBSession {
  20. return &DBSession{Session: x.NewSession()}
  21. }
  22. func inTransaction(callback dbTransactionFunc) error {
  23. return inTransactionWithRetry(callback, 0)
  24. }
  25. func startSession(ctx context.Context) *DBSession {
  26. value := ctx.Value(ContextSessionName)
  27. var sess *xorm.Session
  28. sess, ok := value.(*xorm.Session)
  29. if !ok {
  30. return newSession()
  31. }
  32. old := newSession()
  33. old.Session = sess
  34. return old
  35. }
  36. func withDbSession(ctx context.Context, callback dbTransactionFunc) error {
  37. sess := startSession(ctx)
  38. return callback(sess)
  39. }
  40. func inTransactionWithRetry(callback dbTransactionFunc, retry int) error {
  41. return inTransactionWithRetryCtx(context.Background(), callback, retry)
  42. }
  43. func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  44. var err error
  45. sess := startSession(ctx)
  46. defer sess.Close()
  47. if err = sess.Begin(); err != nil {
  48. return err
  49. }
  50. err = callback(sess)
  51. // special handling of database locked errors for sqlite, then we can retry 3 times
  52. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  53. if sqlError.Code == sqlite3.ErrLocked {
  54. sess.Rollback()
  55. time.Sleep(time.Millisecond * time.Duration(10))
  56. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  57. return inTransactionWithRetry(callback, retry+1)
  58. }
  59. }
  60. if err != nil {
  61. sess.Rollback()
  62. return err
  63. } else if err = sess.Commit(); err != nil {
  64. return err
  65. }
  66. if len(sess.events) > 0 {
  67. for _, e := range sess.events {
  68. if err = bus.Publish(e); err != nil {
  69. log.Error(3, "Failed to publish event after commit", err)
  70. }
  71. }
  72. }
  73. return nil
  74. }
  75. func (sess *DBSession) InsertId(bean interface{}) (int64, error) {
  76. table := sess.DB().Mapper.Obj2Table(getTypeName(bean))
  77. dialect.PreInsertId(table, sess.Session)
  78. id, err := sess.Session.InsertOne(bean)
  79. dialect.PostInsertId(table, sess.Session)
  80. return id, err
  81. }
  82. func getTypeName(bean interface{}) (res string) {
  83. t := reflect.TypeOf(bean)
  84. for t.Kind() == reflect.Ptr {
  85. t = t.Elem()
  86. }
  87. return t.Name()
  88. }