Просмотр исходного кода

AuthProxy: Optimistic lock pattern for remote cache Set (#17485)

* Implementation of optimistic lock pattern

Try to insert the remote cache key and handle integrity error

* Remove transaction

Integrity error inside a transaction results in deadlock

* Remove check for existing remote cache key

Is no longer needed since integrity constrain violations are handled

* Add check for integrity constrain violation

Do not update the row if the insert statement fails
for other than an integrity constrain violation

* Handle failing inserts because of deadlocks

If the insert statement fails because of a deadlock
try to update the row

* Add utility function for returning SQL error code

Useful for debugging

* Add logging for failing expired cache key deletion

Do not shallow it completely

* Revert "Add utility function for returning SQL error code"

This reverts commit 8e0b82c79633e7d8bc350823cbbab2ac7a58c0a5.

* Better log for failing deletion of expired cache key

* Add some comments

* Remove check for existing cache key

Attempt to insert the key without checking if it's already there
and handle the error situations

* Do not propagate deadlocks created during update

Most probably somebody else is trying to insert/update
the key at the same time so it is safe enough to ignore it
Sofia Papagiannaki 6 лет назад
Родитель
Сommit
7b70e7db2d

+ 31 - 23
pkg/infra/remotecache/database_storage.go

@@ -70,7 +70,10 @@ func (dc *databaseCache) Get(key string) (interface{}, error) {
 	if cacheHit.Expires > 0 {
 		existedButExpired := getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires
 		if existedButExpired {
-			_ = dc.Delete(key) //ignore this error since we will return `ErrCacheItemNotFound` anyway
+			err = dc.Delete(key) //ignore this error since we will return `ErrCacheItemNotFound` anyway
+			if err != nil {
+				dc.log.Debug("Deletion of expired key failed: %v", err)
+			}
 			return nil, ErrCacheItemNotFound
 		}
 	}
@@ -84,35 +87,40 @@ func (dc *databaseCache) Get(key string) (interface{}, error) {
 }
 
 func (dc *databaseCache) Set(key string, value interface{}, expire time.Duration) error {
-	return dc.SQLStore.WithTransactionalDbSession(context.Background(), func(session *sqlstore.DBSession) error {
-		item := &cachedItem{Val: value}
-		data, err := encodeGob(item)
-		if err != nil {
-			return err
-		}
+	item := &cachedItem{Val: value}
+	data, err := encodeGob(item)
+	if err != nil {
+		return err
+	}
 
-		var cacheHit CacheData
-		has, err := session.Where("cache_key = ?", key).Get(&cacheHit)
-		if err != nil {
-			return err
-		}
+	session := dc.SQLStore.NewSession()
+	defer session.Close()
 
-		var expiresInSeconds int64
-		if expire != 0 {
-			expiresInSeconds = int64(expire) / int64(time.Second)
-		}
+	var expiresInSeconds int64
+	if expire != 0 {
+		expiresInSeconds = int64(expire) / int64(time.Second)
+	}
 
-		// insert or update depending on if item already exist
-		if has {
+	// attempt to insert the key
+	sql := `INSERT INTO cache_data (cache_key,data,created_at,expires) VALUES(?,?,?,?)`
+	_, err = session.Exec(sql, key, data, getTime().Unix(), expiresInSeconds)
+	if err != nil {
+		// attempt to update if a unique constrain violation or a deadlock (for MySQL) occurs
+		// if the update fails propagate the error
+		// which eventually will result in a key that is not finally set
+		// but since it's a cache does not harm a lot
+		if dc.SQLStore.Dialect.IsUniqueConstraintViolation(err) || dc.SQLStore.Dialect.IsDeadlock(err) {
 			sql := `UPDATE cache_data SET data=?, created_at=?, expires=? WHERE cache_key=?`
 			_, err = session.Exec(sql, data, getTime().Unix(), expiresInSeconds, key)
-		} else {
-			sql := `INSERT INTO cache_data (cache_key,data,created_at,expires) VALUES(?,?,?,?)`
-			_, err = session.Exec(sql, key, data, getTime().Unix(), expiresInSeconds)
+			if err != nil && dc.SQLStore.Dialect.IsDeadlock(err) {
+				// most probably somebody else is upserting the key
+				// so it is safe enough not to propagate this error
+				return nil
+			}
 		}
+	}
 
-		return err
-	})
+	return err
 }
 
 func (dc *databaseCache) Delete(key string) error {

+ 1 - 0
pkg/services/sqlstore/migrator/dialect.go

@@ -48,6 +48,7 @@ type Dialect interface {
 	NoOpSql() string
 
 	IsUniqueConstraintViolation(err error) bool
+	IsDeadlock(err error) bool
 }
 
 func NewDialect(engine *xorm.Engine) Dialect {

+ 10 - 2
pkg/services/sqlstore/migrator/mysql_dialect.go

@@ -134,12 +134,20 @@ func (db *Mysql) CleanDB() error {
 	return nil
 }
 
-func (db *Mysql) IsUniqueConstraintViolation(err error) bool {
+func (db *Mysql) isThisError(err error, errcode uint16) bool {
 	if driverErr, ok := err.(*mysql.MySQLError); ok {
-		if driverErr.Number == mysqlerr.ER_DUP_ENTRY {
+		if driverErr.Number == errcode {
 			return true
 		}
 	}
 
 	return false
 }
+
+func (db *Mysql) IsUniqueConstraintViolation(err error) bool {
+	return db.isThisError(err, mysqlerr.ER_DUP_ENTRY)
+}
+
+func (db *Mysql) IsDeadlock(err error) bool {
+	return db.isThisError(err, mysqlerr.ER_LOCK_DEADLOCK)
+}

+ 10 - 2
pkg/services/sqlstore/migrator/postgres_dialect.go

@@ -138,12 +138,20 @@ func (db *Postgres) CleanDB() error {
 	return nil
 }
 
-func (db *Postgres) IsUniqueConstraintViolation(err error) bool {
+func (db *Postgres) isThisError(err error, errcode string) bool {
 	if driverErr, ok := err.(*pq.Error); ok {
-		if driverErr.Code == "23505" {
+		if string(driverErr.Code) == errcode {
 			return true
 		}
 	}
 
 	return false
 }
+
+func (db *Postgres) IsUniqueConstraintViolation(err error) bool {
+	return db.isThisError(err, "23505")
+}
+
+func (db *Postgres) IsDeadlock(err error) bool {
+	return db.isThisError(err, "40P01")
+}

+ 10 - 2
pkg/services/sqlstore/migrator/sqlite_dialect.go

@@ -85,12 +85,20 @@ func (db *Sqlite3) CleanDB() error {
 	return nil
 }
 
-func (db *Sqlite3) IsUniqueConstraintViolation(err error) bool {
+func (db *Sqlite3) isThisError(err error, errcode int) bool {
 	if driverErr, ok := err.(sqlite3.Error); ok {
-		if driverErr.ExtendedCode == sqlite3.ErrConstraintUnique {
+		if int(driverErr.ExtendedCode) == errcode {
 			return true
 		}
 	}
 
 	return false
 }
+
+func (db *Sqlite3) IsUniqueConstraintViolation(err error) bool {
+	return db.isThisError(err, int(sqlite3.ErrConstraintUnique))
+}
+
+func (db *Sqlite3) IsDeadlock(err error) bool {
+	return false // No deadlock
+}