transactions.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package sqlstore
  2. import (
  3. "context"
  4. "time"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/log"
  7. sqlite3 "github.com/mattn/go-sqlite3"
  8. )
  9. func (ss *SqlStore) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  10. return ss.inTransactionWithRetry(ctx, fn, 0)
  11. }
  12. func (ss *SqlStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
  13. sess := startSession(ctx)
  14. defer sess.Close()
  15. withValue := context.WithValue(ctx, ContextSessionName, sess)
  16. err := fn(withValue)
  17. // special handling of database locked errors for sqlite, then we can retry 3 times
  18. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  19. if sqlError.Code == sqlite3.ErrLocked {
  20. sess.Rollback()
  21. time.Sleep(time.Millisecond * time.Duration(10))
  22. ss.log.Info("Database table locked, sleeping then retrying", "retry", retry)
  23. return ss.inTransactionWithRetry(ctx, fn, retry+1)
  24. }
  25. }
  26. if err != nil {
  27. sess.Rollback()
  28. return err
  29. }
  30. if err = sess.Commit(); err != nil {
  31. return err
  32. }
  33. if len(sess.events) > 0 {
  34. for _, e := range sess.events {
  35. if err = bus.Publish(e); err != nil {
  36. ss.log.Error("Failed to publish event after commit", err)
  37. }
  38. }
  39. }
  40. return nil
  41. }
  42. func inTransactionWithRetry(callback dbTransactionFunc, retry int) error {
  43. return inTransactionWithRetryCtx(context.Background(), callback, retry)
  44. }
  45. func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  46. var err error
  47. sess := startSession(ctx)
  48. defer sess.Close()
  49. if err = sess.Begin(); err != nil {
  50. return err
  51. }
  52. err = callback(sess)
  53. // special handling of database locked errors for sqlite, then we can retry 3 times
  54. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  55. if sqlError.Code == sqlite3.ErrLocked {
  56. sess.Rollback()
  57. time.Sleep(time.Millisecond * time.Duration(10))
  58. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  59. return inTransactionWithRetry(callback, retry+1)
  60. }
  61. }
  62. if err != nil {
  63. sess.Rollback()
  64. return err
  65. } else if err = sess.Commit(); err != nil {
  66. return err
  67. }
  68. if len(sess.events) > 0 {
  69. for _, e := range sess.events {
  70. if err = bus.Publish(e); err != nil {
  71. log.Error(3, "Failed to publish event after commit", err)
  72. }
  73. }
  74. }
  75. return nil
  76. }
  77. func inTransaction(callback dbTransactionFunc) error {
  78. return inTransactionWithRetry(callback, 0)
  79. }