serverlock.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package serverlock
  2. import (
  3. "context"
  4. "time"
  5. "github.com/grafana/grafana/pkg/log"
  6. "github.com/grafana/grafana/pkg/registry"
  7. "github.com/grafana/grafana/pkg/services/sqlstore"
  8. )
  9. func init() {
  10. registry.RegisterService(&ServerLockService{})
  11. }
  12. // ServerLockService allows servers in HA mode to execute function once over in the group
  13. type ServerLockService struct {
  14. SQLStore *sqlstore.SqlStore `inject:""`
  15. log log.Logger
  16. }
  17. // Init this service
  18. func (sl *ServerLockService) Init() error {
  19. sl.log = log.New("infra.lockservice")
  20. return nil
  21. }
  22. // OncePerServerGroup try to create a lock for this server and only executes the
  23. // `fn` function when successful. This should not be used at low internal. But services
  24. // that needs to be run once every ex 10m.
  25. func (sl *ServerLockService) OncePerServerGroup(ctx context.Context, actionName string, maxInterval time.Duration, fn func()) error {
  26. // gets or creates a lockable row
  27. rowLock, err := sl.getOrCreate(ctx, actionName)
  28. if err != nil {
  29. return err
  30. }
  31. // avoid execution if last lock happened less than `matInterval` ago
  32. if rowLock.LastExecution != 0 {
  33. lastExeuctionTime := time.Unix(rowLock.LastExecution, 0)
  34. if lastExeuctionTime.Unix() > time.Now().Add(-maxInterval).Unix() {
  35. return nil
  36. }
  37. }
  38. // try to get lock based on rowLow version
  39. acquiredLock, err := sl.acquireLock(ctx, rowLock)
  40. if err != nil {
  41. return err
  42. }
  43. if acquiredLock {
  44. fn()
  45. }
  46. return nil
  47. }
  48. func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
  49. var result bool
  50. err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
  51. newVersion := serverLock.Version + 1
  52. sql := `UPDATE server_lock SET
  53. version = ?,
  54. last_execution = ?
  55. WHERE
  56. id = ? AND version = ?`
  57. res, err := dbSession.Exec(sql, newVersion, time.Now().Unix(), serverLock.Id, serverLock.Version)
  58. if err != nil {
  59. return err
  60. }
  61. affected, err := res.RowsAffected()
  62. result = affected == 1
  63. return err
  64. })
  65. return result, err
  66. }
  67. func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
  68. var result *serverLock
  69. err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
  70. lockRows := []*serverLock{}
  71. err := dbSession.Where("operation_uid = ?", actionName).Find(&lockRows)
  72. if err != nil {
  73. return err
  74. }
  75. if len(lockRows) > 0 {
  76. result = lockRows[0]
  77. return nil
  78. }
  79. lockRow := &serverLock{
  80. OperationUid: actionName,
  81. LastExecution: 0,
  82. }
  83. _, err = dbSession.Insert(lockRow)
  84. if err != nil {
  85. return err
  86. }
  87. result = lockRow
  88. return nil
  89. })
  90. return result, err
  91. }