serverlock.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package serverlock
  2. import (
  3. "context"
  4. "time"
  5. "github.com/grafana/grafana/pkg/infra/log"
  6. "github.com/grafana/grafana/pkg/registry"
  7. "github.com/grafana/grafana/pkg/services/sqlstore"
  8. )
  9. func init() {
  10. registry.RegisterService(&ServerLockService{})
  11. }
  12. // ServerLockService allows servers in HA mode to claim a lock
  13. // and execute an function if the server was granted the lock
  14. type ServerLockService struct {
  15. SQLStore *sqlstore.SqlStore `inject:""`
  16. log log.Logger
  17. }
  18. // Init this service
  19. func (sl *ServerLockService) Init() error {
  20. sl.log = log.New("infra.lockservice")
  21. return nil
  22. }
  23. // LockAndExecute try to create a lock for this server and only executes the
  24. // `fn` function when successful. This should not be used at low internal. But services
  25. // that needs to be run once every ex 10m.
  26. func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName string, maxInterval time.Duration, fn func()) error {
  27. // gets or creates a lockable row
  28. rowLock, err := sl.getOrCreate(ctx, actionName)
  29. if err != nil {
  30. return err
  31. }
  32. // avoid execution if last lock happened less than `maxInterval` ago
  33. if rowLock.LastExecution != 0 {
  34. lastExeuctionTime := time.Unix(rowLock.LastExecution, 0)
  35. if lastExeuctionTime.Unix() > time.Now().Add(-maxInterval).Unix() {
  36. return nil
  37. }
  38. }
  39. // try to get lock based on rowLow version
  40. acquiredLock, err := sl.acquireLock(ctx, rowLock)
  41. if err != nil {
  42. return err
  43. }
  44. if acquiredLock {
  45. fn()
  46. }
  47. return nil
  48. }
  49. func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
  50. var result bool
  51. err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
  52. newVersion := serverLock.Version + 1
  53. sql := `UPDATE server_lock SET
  54. version = ?,
  55. last_execution = ?
  56. WHERE
  57. id = ? AND version = ?`
  58. res, err := dbSession.Exec(sql, newVersion, time.Now().Unix(), serverLock.Id, serverLock.Version)
  59. if err != nil {
  60. return err
  61. }
  62. affected, err := res.RowsAffected()
  63. result = affected == 1
  64. return err
  65. })
  66. return result, err
  67. }
  68. func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
  69. var result *serverLock
  70. err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
  71. lockRows := []*serverLock{}
  72. err := dbSession.Where("operation_uid = ?", actionName).Find(&lockRows)
  73. if err != nil {
  74. return err
  75. }
  76. if len(lockRows) > 0 {
  77. result = lockRows[0]
  78. return nil
  79. }
  80. lockRow := &serverLock{
  81. OperationUid: actionName,
  82. LastExecution: 0,
  83. }
  84. _, err = dbSession.Insert(lockRow)
  85. if err != nil {
  86. return err
  87. }
  88. result = lockRow
  89. return nil
  90. })
  91. return result, err
  92. }