Explorar o código

adds server lock package

bergquist %!s(int64=7) %!d(string=hai) anos
pai
achega
dc49bebb00

+ 23 - 0
pkg/infra/serverlock/migrations.go

@@ -0,0 +1,23 @@
+package serverlock
+
+import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
+
+// AddMigration create database migrations for server lock
+func (sl *ServerLockService) AddMigration(mg *migrator.Migrator) {
+	serverLock := migrator.Table{
+		Name: "server_lock",
+		Columns: []*migrator.Column{
+			{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
+			{Name: "operation_uid", Type: migrator.DB_Text},
+			{Name: "version", Type: migrator.DB_BigInt},
+			{Name: "last_execution", Type: migrator.DB_BigInt, Nullable: false},
+		},
+		Indices: []*migrator.Index{
+			{Cols: []string{"operation_uid"}, Type: migrator.UniqueIndex},
+		},
+	}
+
+	mg.AddMigration("create server_lock table", migrator.NewAddTableMigration(serverLock))
+
+	mg.AddMigration("add index server_lock.operation_uid", migrator.NewAddIndexMigration(serverLock, serverLock.Indices[0]))
+}

+ 8 - 0
pkg/infra/serverlock/model.go

@@ -0,0 +1,8 @@
+package serverlock
+
+type serverLock struct {
+	Id            int64
+	OperationUid  string
+	LastExecution int64
+	Version       int64
+}

+ 111 - 0
pkg/infra/serverlock/serverlock.go

@@ -0,0 +1,111 @@
+package serverlock
+
+import (
+	"context"
+	"time"
+
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/registry"
+	"github.com/grafana/grafana/pkg/services/sqlstore"
+)
+
+func init() {
+	registry.RegisterService(&ServerLockService{})
+}
+
+// ServerLockService allows servers in HA mode to execute function once over in the group
+type ServerLockService struct {
+	SQLStore *sqlstore.SqlStore `inject:""`
+	log      log.Logger
+}
+
+// Init this service
+func (sl *ServerLockService) Init() error {
+	return nil
+}
+
+// OncePerServerGroup try to create a lock for this server and only executes the
+// `fn` function when successful. This should not be used at low internal. But services
+// that needs to be run once every ex 10m.
+func (sl *ServerLockService) OncePerServerGroup(ctx context.Context, actionName string, maxEvery time.Duration, fn func()) error {
+	rowLock, err := sl.getOrCreate(ctx, actionName)
+	if err != nil {
+		return err
+	}
+
+	if rowLock.LastExecution != 0 {
+		lastExeuctionTime := time.Unix(rowLock.LastExecution, 0)
+		if lastExeuctionTime.Unix() > time.Now().Add(-maxEvery).Unix() {
+			return nil
+		}
+	}
+
+	acquiredLock, err := sl.acquireLock(ctx, rowLock, maxEvery)
+	if err != nil {
+		return err
+	}
+
+	if acquiredLock {
+		fn()
+	}
+
+	return nil
+}
+
+func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock, maxEvery time.Duration) (bool, error) {
+	var result bool
+
+	err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
+		newVersion := serverLock.Version + 1
+		sql := `UPDATE server_lock SET
+			version = ?,
+			last_execution = ?
+		WHERE
+			id = ? AND version = ?`
+
+		res, err := dbSession.Exec(sql, newVersion, time.Now().Unix(), serverLock.Id, serverLock.Version)
+		if err != nil {
+			return err
+		}
+
+		affected, err := res.RowsAffected()
+		result = affected == 1
+
+		return err
+	})
+
+	return result, err
+}
+
+func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
+	var result *serverLock
+
+	err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
+		lockRows := []*serverLock{}
+		err := dbSession.Where("operation_uid = ?", actionName).Find(&lockRows)
+		if err != nil {
+			return err
+		}
+
+		if len(lockRows) > 0 {
+			result = lockRows[0]
+			return nil
+		}
+
+		lockRow := &serverLock{
+			OperationUid:  actionName,
+			LastExecution: 0,
+		}
+
+		_, err = dbSession.Insert(lockRow)
+		if err != nil {
+			return err
+		}
+
+		result = lockRow
+
+		return nil
+	})
+
+	return result, err
+}

+ 75 - 0
pkg/infra/serverlock/serverlock_integration_test.go

@@ -0,0 +1,75 @@
+// +build integration
+
+package serverlock
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestServerLok(t *testing.T) {
+	sl := createTestableServerLock(t)
+
+	Convey("Server lock integration test", t, func() {
+
+		Convey("Check that we can call OncePerServerGroup multiple times without executing callback", func() {
+			counter := 0
+			var err error
+
+			//this time `fn` should be executed
+			err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ })
+			So(err, ShouldBeNil)
+
+			//this should not execute `fn`
+			err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ })
+			So(err, ShouldBeNil)
+
+			//this should not execute `fn`
+			err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ })
+			So(err, ShouldBeNil)
+
+			// wg := sync.WaitGroup{}
+			// for i := 0; i < 3; i++ {
+			// 	wg.Add(1)
+			// 	go func(index int) {
+			// 		defer wg.Done()
+			// 		//sl := createTestableServerLock(t)
+			// 		//<-time.After(time.Second)
+
+			// 		j := 0
+			// 		for {
+			// 			select {
+			// 			case <-time.Tick(time.Second):
+			// 				fmt.Printf("running worker %d loop %d\n", index, j)
+			// 				err := sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*2, func() {
+			// 					counter++
+			// 				})
+
+			// 				if err != nil {
+			// 					t.Errorf("expected. err: %v", err)
+			// 				}
+
+			// 				j++
+			// 				if j > 3 {
+			// 					return
+			// 				}
+			// 			}
+			// 		}
+			// 	}(i)
+			// }
+
+			// wg.Wait()
+
+			// wait 5 second.
+			<-time.After(time.Second * 10)
+
+			// now `fn` should be executed again
+			err = sl.OncePerServerGroup(context.Background(), "test-operation", time.Second*5, func() { counter++ })
+			So(err, ShouldBeNil)
+			So(counter, ShouldEqual, 2)
+		})
+	})
+}

+ 57 - 0
pkg/infra/serverlock/serverlock_test.go

@@ -0,0 +1,57 @@
+package serverlock
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/grafana/grafana/pkg/log"
+
+	"github.com/grafana/grafana/pkg/services/sqlstore"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func createTestableServerLock(t *testing.T) *ServerLockService {
+	t.Helper()
+
+	sqlstore := sqlstore.InitTestDB(t)
+
+	return &ServerLockService{
+		SQLStore: sqlstore,
+		log:      log.New("test-logger"),
+	}
+}
+
+func TestServerLock(t *testing.T) {
+	Convey("Server lock", t, func() {
+		sl := createTestableServerLock(t)
+		operationUID := "test-operation"
+
+		first, err := sl.getOrCreate(context.Background(), operationUID)
+		So(err, ShouldBeNil)
+
+		lastExecution := first.LastExecution
+		Convey("trying to create three new row locks", func() {
+			for i := 0; i < 3; i++ {
+				first, err = sl.getOrCreate(context.Background(), operationUID)
+				So(err, ShouldBeNil)
+				So(first.OperationUid, ShouldEqual, operationUID)
+				So(first.Id, ShouldEqual, 1)
+			}
+
+			Convey("Should not create new since lock already exist", func() {
+				So(lastExecution, ShouldEqual, first.LastExecution)
+			})
+		})
+
+		Convey("Should be able to create lock on first row", func() {
+			gotLock, err := sl.acquireLock(context.Background(), first, time.Second*1)
+			So(err, ShouldBeNil)
+			So(gotLock, ShouldBeTrue)
+
+			gotLock, err = sl.acquireLock(context.Background(), first, time.Second*1)
+			So(err, ShouldBeNil)
+			So(gotLock, ShouldBeFalse)
+		})
+	})
+}