database_storage.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package remotecache
  2. import (
  3. "context"
  4. "time"
  5. "github.com/grafana/grafana/pkg/infra/log"
  6. "github.com/grafana/grafana/pkg/services/sqlstore"
  7. )
  8. var getTime = time.Now
  9. const databaseCacheType = "database"
  10. type databaseCache struct {
  11. SQLStore *sqlstore.SqlStore
  12. log log.Logger
  13. }
  14. func newDatabaseCache(sqlstore *sqlstore.SqlStore) *databaseCache {
  15. dc := &databaseCache{
  16. SQLStore: sqlstore,
  17. log: log.New("remotecache.database"),
  18. }
  19. return dc
  20. }
  21. func (dc *databaseCache) Run(ctx context.Context) error {
  22. ticker := time.NewTicker(time.Minute * 10)
  23. for {
  24. select {
  25. case <-ctx.Done():
  26. return ctx.Err()
  27. case <-ticker.C:
  28. dc.internalRunGC()
  29. }
  30. }
  31. }
  32. func (dc *databaseCache) internalRunGC() {
  33. err := dc.SQLStore.WithDbSession(context.Background(), func(session *sqlstore.DBSession) error {
  34. now := getTime().Unix()
  35. sql := `DELETE FROM cache_data WHERE (? - created_at) >= expires AND expires <> 0`
  36. _, err := session.Exec(sql, now)
  37. return err
  38. })
  39. if err != nil {
  40. dc.log.Error("failed to run garbage collect", "error", err)
  41. }
  42. }
  43. func (dc *databaseCache) Get(key string) (interface{}, error) {
  44. cacheHit := CacheData{}
  45. session := dc.SQLStore.NewSession()
  46. defer session.Close()
  47. exist, err := session.Where("cache_key= ?", key).Get(&cacheHit)
  48. if err != nil {
  49. return nil, err
  50. }
  51. if !exist {
  52. return nil, ErrCacheItemNotFound
  53. }
  54. if cacheHit.Expires > 0 {
  55. existedButExpired := getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires
  56. if existedButExpired {
  57. err = dc.Delete(key) //ignore this error since we will return `ErrCacheItemNotFound` anyway
  58. if err != nil {
  59. dc.log.Debug("Deletion of expired key failed: %v", err)
  60. }
  61. return nil, ErrCacheItemNotFound
  62. }
  63. }
  64. item := &cachedItem{}
  65. if err = decodeGob(cacheHit.Data, item); err != nil {
  66. return nil, err
  67. }
  68. return item.Val, nil
  69. }
  70. func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration) error {
  71. item := &cachedItem{Val: value}
  72. data, err := encodeGob(item)
  73. if err != nil {
  74. return err
  75. }
  76. session := dc.SQLStore.NewSession()
  77. defer session.Close()
  78. var expiresInSeconds int64
  79. if expire != 0 {
  80. expiresInSeconds = int64(expire) / int64(time.Second)
  81. }
  82. // attempt to insert the key
  83. sql := `INSERT INTO cache_data (cache_key,data,created_at,expires) VALUES(?,?,?,?)`
  84. _, err = session.Exec(sql, key, data, getTime().Unix(), expiresInSeconds)
  85. if err != nil {
  86. // attempt to update if a unique constrain violation or a deadlock (for MySQL) occurs
  87. // if the update fails propagate the error
  88. // which eventually will result in a key that is not finally set
  89. // but since it's a cache does not harm a lot
  90. if dc.SQLStore.Dialect.IsUniqueConstraintViolation(err) || dc.SQLStore.Dialect.IsDeadlock(err) {
  91. sql := `UPDATE cache_data SET data=?, created_at=?, expires=? WHERE cache_key=?`
  92. _, err = session.Exec(sql, data, getTime().Unix(), expiresInSeconds, key)
  93. if err != nil && dc.SQLStore.Dialect.IsDeadlock(err) {
  94. // most probably somebody else is upserting the key
  95. // so it is safe enough not to propagate this error
  96. return nil
  97. }
  98. }
  99. }
  100. return err
  101. }
  102. func (dc *databaseCache) Delete(key string) error {
  103. return dc.SQLStore.WithDbSession(context.Background(), func(session *sqlstore.DBSession) error {
  104. sql := "DELETE FROM cache_data WHERE cache_key=?"
  105. _, err := session.Exec(sql, key)
  106. return err
  107. })
  108. }
  109. // CacheData is the struct representing the table in the database
  110. type CacheData struct {
  111. CacheKey string
  112. Data []byte
  113. Expires int64
  114. CreatedAt int64
  115. }