Explorar o código

Merge branch 'go_routines'

Torkel Ödegaard %!s(int64=9) %!d(string=hai) anos
pai
achega
c28d004731

+ 37 - 9
pkg/cmd/grafana-server/main.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"io/ioutil"
@@ -12,18 +13,26 @@ import (
 	"syscall"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/login"
 	"github.com/grafana/grafana/pkg/metrics"
 	"github.com/grafana/grafana/pkg/plugins"
-	alertingInit "github.com/grafana/grafana/pkg/services/alerting/init"
-	"github.com/grafana/grafana/pkg/services/backgroundtasks"
+	"github.com/grafana/grafana/pkg/services/cleanup"
 	"github.com/grafana/grafana/pkg/services/eventpublisher"
 	"github.com/grafana/grafana/pkg/services/notifications"
 	"github.com/grafana/grafana/pkg/services/search"
 	"github.com/grafana/grafana/pkg/services/sqlstore"
 	"github.com/grafana/grafana/pkg/setting"
 	"github.com/grafana/grafana/pkg/social"
+
+	"github.com/grafana/grafana/pkg/services/alerting"
+	_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
+	_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
+	_ "github.com/grafana/grafana/pkg/tsdb/graphite"
+	_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
+	_ "github.com/grafana/grafana/pkg/tsdb/testdata"
 )
 
 var version = "3.1.0"
@@ -57,26 +66,41 @@ func main() {
 	setting.BuildCommit = commit
 	setting.BuildStamp = buildstampInt64
 
-	go listenToSystemSignals()
+	appContext, shutdownFn := context.WithCancel(context.Background())
+	grafanaGroup, appContext := errgroup.WithContext(appContext)
+
+	go listenToSystemSignals(shutdownFn, grafanaGroup)
 
 	flag.Parse()
 	writePIDFile()
+
 	initRuntime()
+	initSql()
 	metrics.Init()
 	search.Init()
 	login.Init()
 	social.NewOAuthService()
 	eventpublisher.Init()
 	plugins.Init()
-	alertingInit.Init()
-	backgroundtasks.Init()
+
+	// init alerting
+	if setting.AlertingEnabled {
+		engine := alerting.NewEngine()
+		grafanaGroup.Go(func() error { return engine.Run(appContext) })
+	}
+
+	// cleanup service
+	cleanUpService := cleanup.NewCleanUpService()
+	grafanaGroup.Go(func() error { return cleanUpService.Run(appContext) })
 
 	if err := notifications.Init(); err != nil {
 		log.Fatal(3, "Notification service failed to initialize", err)
 	}
 
-	StartServer()
-	exitChan <- 0
+	exitCode := StartServer()
+
+	grafanaGroup.Wait()
+	exitChan <- exitCode
 }
 
 func initRuntime() {
@@ -94,7 +118,9 @@ func initRuntime() {
 	logger.Info("Starting Grafana", "version", version, "commit", commit, "compiled", time.Unix(setting.BuildStamp, 0))
 
 	setting.LogConfigurationInfo()
+}
 
+func initSql() {
 	sqlstore.NewEngine()
 	sqlstore.EnsureAdminUser()
 }
@@ -117,7 +143,7 @@ func writePIDFile() {
 	}
 }
 
-func listenToSystemSignals() {
+func listenToSystemSignals(cancel context.CancelFunc, grafanaGroup *errgroup.Group) {
 	signalChan := make(chan os.Signal, 1)
 	code := 0
 
@@ -125,7 +151,7 @@ func listenToSystemSignals() {
 
 	select {
 	case sig := <-signalChan:
-		log.Info("Received signal %s. shutting down", sig)
+		log.Info2("Received system signal. Shutting down", "signal", sig)
 	case code = <-exitChan:
 		switch code {
 		case 0:
@@ -135,6 +161,8 @@ func listenToSystemSignals() {
 		}
 	}
 
+	cancel()
+	grafanaGroup.Wait()
 	log.Close()
 	os.Exit(code)
 }

+ 5 - 4
pkg/cmd/grafana-server/web.go

@@ -6,7 +6,6 @@ package main
 import (
 	"fmt"
 	"net/http"
-	"os"
 	"path"
 
 	"gopkg.in/macaron.v1"
@@ -79,7 +78,7 @@ func mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) {
 	))
 }
 
-func StartServer() {
+func StartServer() int {
 	logger = log.New("server")
 
 	var err error
@@ -95,11 +94,13 @@ func StartServer() {
 		err = http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m)
 	default:
 		logger.Error("Invalid protocol", "protocol", setting.Protocol)
-		os.Exit(1)
+		return 1
 	}
 
 	if err != nil {
 		logger.Error("Fail to start server", "error", err)
-		os.Exit(1)
+		return 1
 	}
+
+	return 0
 }

+ 3 - 0
pkg/models/dashboard_snapshot.go

@@ -63,6 +63,9 @@ type DeleteDashboardSnapshotCommand struct {
 	DeleteKey string `json:"-"`
 }
 
+type DeleteExpiredSnapshotsCommand struct {
+}
+
 type GetDashboardSnapshotQuery struct {
 	Key string
 

+ 0 - 7
pkg/models/timer.go

@@ -1,7 +0,0 @@
-package models
-
-import "time"
-
-type HourCommand struct {
-	Time time.Time
-}

+ 58 - 19
pkg/services/alerting/engine.go

@@ -1,10 +1,12 @@
 package alerting
 
 import (
+	"context"
 	"time"
 
 	"github.com/benbjohnson/clock"
 	"github.com/grafana/grafana/pkg/log"
+	"golang.org/x/sync/errgroup"
 )
 
 type Engine struct {
@@ -34,12 +36,19 @@ func NewEngine() *Engine {
 	return e
 }
 
-func (e *Engine) Start() {
-	e.log.Info("Starting Alerting Engine")
+func (e *Engine) Run(ctx context.Context) error {
+	e.log.Info("Initializing Alerting")
 
-	go e.alertingTicker()
-	go e.execDispatcher()
-	go e.resultDispatcher()
+	g, ctx := errgroup.WithContext(ctx)
+
+	g.Go(func() error { return e.alertingTicker(ctx) })
+	g.Go(func() error { return e.execDispatcher(ctx) })
+	g.Go(func() error { return e.resultDispatcher(ctx) })
+
+	err := g.Wait()
+
+	e.log.Info("Stopped Alerting", "reason", err)
+	return err
 }
 
 func (e *Engine) Stop() {
@@ -47,7 +56,7 @@ func (e *Engine) Stop() {
 	close(e.resultQueue)
 }
 
-func (e *Engine) alertingTicker() {
+func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
 	defer func() {
 		if err := recover(); err != nil {
 			e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
@@ -58,6 +67,8 @@ func (e *Engine) alertingTicker() {
 
 	for {
 		select {
+		case <-grafanaCtx.Done():
+			return grafanaCtx.Err()
 		case tick := <-e.ticker.C:
 			// TEMP SOLUTION update rules ever tenth tick
 			if tickIndex%10 == 0 {
@@ -70,31 +81,59 @@ func (e *Engine) alertingTicker() {
 	}
 }
 
-func (e *Engine) execDispatcher() {
-	for job := range e.execQueue {
-		e.log.Debug("Starting executing alert rule", "alert id", job.Rule.Id)
-		go e.executeJob(job)
+func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
+	for {
+		select {
+		case <-grafanaCtx.Done():
+			close(e.resultQueue)
+			return grafanaCtx.Err()
+		case job := <-e.execQueue:
+			go e.executeJob(grafanaCtx, job)
+		}
 	}
 }
 
-func (e *Engine) executeJob(job *Job) {
+func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error {
 	defer func() {
 		if err := recover(); err != nil {
 			e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
 		}
 	}()
 
-	job.Running = true
-	context := NewEvalContext(job.Rule)
-	e.evalHandler.Eval(context)
-	job.Running = false
+	done := make(chan *EvalContext, 1)
+	go func() {
+		job.Running = true
+		context := NewEvalContext(job.Rule)
+		e.evalHandler.Eval(context)
+		job.Running = false
+		done <- context
+		close(done)
+	}()
+
+	select {
 
-	e.resultQueue <- context
+	case <-grafanaCtx.Done():
+		return grafanaCtx.Err()
+	case evalContext := <-done:
+		e.resultQueue <- evalContext
+	}
+
+	return nil
 }
 
-func (e *Engine) resultDispatcher() {
-	for result := range e.resultQueue {
-		go e.handleResponse(result)
+func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
+	for {
+		select {
+		case <-grafanaCtx.Done():
+			//handle all responses before shutting down.
+			for result := range e.resultQueue {
+				e.handleResponse(result)
+			}
+
+			return grafanaCtx.Err()
+		case result := <-e.resultQueue:
+			e.handleResponse(result)
+		}
 	}
 }
 

+ 5 - 5
pkg/services/alerting/init/init.go

@@ -1,6 +1,8 @@
 package init
 
 import (
+	"context"
+
 	"github.com/grafana/grafana/pkg/services/alerting"
 	_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
 	_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
@@ -10,13 +12,11 @@ import (
 	_ "github.com/grafana/grafana/pkg/tsdb/testdata"
 )
 
-var engine *alerting.Engine
-
-func Init() {
+func Init(ctx context.Context) error {
 	if !setting.AlertingEnabled {
-		return
+		return nil
 	}
 
 	engine = alerting.NewEngine()
-	engine.Start()
+	return engine.Start(ctx)
 }

+ 0 - 39
pkg/services/backgroundtasks/background_tasks.go

@@ -1,39 +0,0 @@
-//"I want to be a cleaner, just like you," said Mathilda
-//"Okay," replied Leon
-
-package backgroundtasks
-
-import (
-	"time"
-
-	"github.com/grafana/grafana/pkg/bus"
-	"github.com/grafana/grafana/pkg/log"
-	"github.com/grafana/grafana/pkg/models"
-)
-
-var (
-	tlog log.Logger = log.New("ticker")
-)
-
-func Init() {
-	go start()
-}
-
-func start() {
-	go cleanup(time.Now())
-
-	ticker := time.NewTicker(time.Hour * 1)
-	for {
-		select {
-		case tick := <-ticker.C:
-			go cleanup(tick)
-		}
-	}
-}
-
-func cleanup(now time.Time) {
-	err := bus.Publish(&models.HourCommand{Time: now})
-	if err != nil {
-		tlog.Error("Cleanup job failed", "error", err)
-	}
-}

+ 0 - 38
pkg/services/backgroundtasks/remove_tmp_images.go

@@ -1,38 +0,0 @@
-package backgroundtasks
-
-import (
-	"io/ioutil"
-	"os"
-	"path"
-
-	"github.com/grafana/grafana/pkg/bus"
-	"github.com/grafana/grafana/pkg/models"
-	"github.com/grafana/grafana/pkg/setting"
-)
-
-func init() {
-	bus.AddEventListener(CleanTmpFiles)
-}
-
-func CleanTmpFiles(cmd *models.HourCommand) error {
-	files, err := ioutil.ReadDir(setting.ImagesDir)
-
-	var toDelete []os.FileInfo
-	for _, file := range files {
-		if file.ModTime().AddDate(0, 0, setting.RenderedImageTTLDays).Before(cmd.Time) {
-			toDelete = append(toDelete, file)
-		}
-	}
-
-	for _, file := range toDelete {
-		fullPath := path.Join(setting.ImagesDir, file.Name())
-		err := os.Remove(fullPath)
-		if err != nil {
-			return err
-		}
-	}
-
-	tlog.Debug("Found old rendered image to delete", "deleted", len(toDelete), "keept", len(files))
-
-	return err
-}

+ 81 - 0
pkg/services/cleanup/cleanup.go

@@ -0,0 +1,81 @@
+package cleanup
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+
+	"github.com/grafana/grafana/pkg/bus"
+	"github.com/grafana/grafana/pkg/log"
+	m "github.com/grafana/grafana/pkg/models"
+	"github.com/grafana/grafana/pkg/setting"
+)
+
+type CleanUpService struct {
+	log log.Logger
+}
+
+func NewCleanUpService() *CleanUpService {
+	return &CleanUpService{
+		log: log.New("cleanup"),
+	}
+}
+
+func (service *CleanUpService) Run(ctx context.Context) error {
+	service.log.Info("Initializing CleanUpService")
+
+	g, _ := errgroup.WithContext(ctx)
+	g.Go(func() error { return service.start(ctx) })
+
+	err := g.Wait()
+	service.log.Info("Stopped CleanUpService", "reason", err)
+	return err
+}
+
+func (service *CleanUpService) start(ctx context.Context) error {
+	service.cleanUpTmpFiles()
+
+	ticker := time.NewTicker(time.Hour * 1)
+	for {
+		select {
+		case <-ticker.C:
+			service.cleanUpTmpFiles()
+			service.deleteExpiredSnapshots()
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+func (service *CleanUpService) cleanUpTmpFiles() {
+	files, err := ioutil.ReadDir(setting.ImagesDir)
+	if err != nil {
+		service.log.Error("Problem reading image dir", "error", err)
+		return
+	}
+
+	var toDelete []os.FileInfo
+	for _, file := range files {
+		if file.ModTime().AddDate(0, 0, 1).Before(time.Now()) {
+			toDelete = append(toDelete, file)
+		}
+	}
+
+	for _, file := range toDelete {
+		fullPath := path.Join(setting.ImagesDir, file.Name())
+		err := os.Remove(fullPath)
+		if err != nil {
+			service.log.Error("Failed to delete temp file", "file", file.Name(), "error", err)
+		}
+	}
+
+	service.log.Debug("Found old rendered image to delete", "deleted", len(toDelete), "keept", len(files))
+}
+
+func (service *CleanUpService) deleteExpiredSnapshots() {
+	bus.Dispatch(&m.DeleteExpiredSnapshotsCommand{})
+}

+ 5 - 12
pkg/services/sqlstore/dashboard_snapshot.go

@@ -5,7 +5,6 @@ import (
 
 	"github.com/go-xorm/xorm"
 	"github.com/grafana/grafana/pkg/bus"
-	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
 )
@@ -15,30 +14,24 @@ func init() {
 	bus.AddHandler("sql", GetDashboardSnapshot)
 	bus.AddHandler("sql", DeleteDashboardSnapshot)
 	bus.AddHandler("sql", SearchDashboardSnapshots)
-	bus.AddEventListener(DeleteExpiredSnapshots)
+	bus.AddHandler("sql", DeleteExpiredSnapshots)
 }
 
-func DeleteExpiredSnapshots(cmd *m.HourCommand) error {
+func DeleteExpiredSnapshots(cmd *m.DeleteExpiredSnapshotsCommand) error {
 	return inTransaction(func(sess *xorm.Session) error {
 		var expiredCount int64 = 0
-		var oldCount int64 = 0
 
 		if setting.SnapShotRemoveExpired {
 			deleteExpiredSql := "DELETE FROM dashboard_snapshot WHERE expires < ?"
-			expiredResponse, err := x.Exec(deleteExpiredSql, cmd.Time)
+			expiredResponse, err := x.Exec(deleteExpiredSql, time.Now)
 			if err != nil {
 				return err
 			}
 			expiredCount, _ = expiredResponse.RowsAffected()
 		}
 
-		oldSnapshotsSql := "DELETE FROM dashboard_snapshot WHERE created < ?"
-		oldResponse, err := x.Exec(oldSnapshotsSql, cmd.Time.AddDate(0, 0, setting.SnapShotTTLDays*-1))
-		oldCount, _ = oldResponse.RowsAffected()
-
-		log.Debug2("Deleted old/expired snaphots", "to old", oldCount, "expired", expiredCount)
-
-		return err
+		sqlog.Debug("Deleted old/expired snaphots", "expired", expiredCount)
+		return nil
 	})
 }
 

+ 2 - 6
pkg/setting/setting.go

@@ -120,9 +120,8 @@ var (
 	IsWindows    bool
 
 	// PhantomJs Rendering
-	ImagesDir            string
-	PhantomDir           string
-	RenderedImageTTLDays int
+	ImagesDir  string
+	PhantomDir string
 
 	// for logging purposes
 	configFiles                  []string
@@ -543,9 +542,6 @@ func NewConfigContext(args *CommandLineArgs) error {
 	ImagesDir = filepath.Join(DataPath, "png")
 	PhantomDir = filepath.Join(HomePath, "vendor/phantomjs")
 
-	tmpFilesSection := Cfg.Section("tmp.files")
-	RenderedImageTTLDays = tmpFilesSection.Key("rendered_image_ttl_days").MustInt(14)
-
 	analytics := Cfg.Section("analytics")
 	ReportingEnabled = analytics.Key("reporting_enabled").MustBool(true)
 	CheckForUpdates = analytics.Key("check_for_updates").MustBool(true)

+ 27 - 0
vendor/golang.org/x/sync/LICENSE

@@ -0,0 +1,27 @@
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

+ 22 - 0
vendor/golang.org/x/sync/PATENTS

@@ -0,0 +1,22 @@
+Additional IP Rights Grant (Patents)
+
+"This implementation" means the copyrightable works distributed by
+Google as part of the Go project.
+
+Google hereby grants to You a perpetual, worldwide, non-exclusive,
+no-charge, royalty-free, irrevocable (except as stated in this section)
+patent license to make, have made, use, offer to sell, sell, import,
+transfer and otherwise run, modify and propagate the contents of this
+implementation of Go, where such license applies only to those patent
+claims, both currently owned or controlled by Google and acquired in
+the future, licensable by Google that are necessarily infringed by this
+implementation of Go.  This grant does not include claims that would be
+infringed only as a consequence of further modification of this
+implementation.  If you or your agent or exclusive licensee institute or
+order or agree to the institution of patent litigation against any
+entity (including a cross-claim or counterclaim in a lawsuit) alleging
+that this implementation of Go or any code incorporated within this
+implementation of Go constitutes direct or contributory patent
+infringement, or inducement of patent infringement, then any patent
+rights granted to you under this License for this implementation of Go
+shall terminate as of the date such litigation is filed.

+ 67 - 0
vendor/golang.org/x/sync/errgroup/errgroup.go

@@ -0,0 +1,67 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package errgroup provides synchronization, error propagation, and Context
+// cancelation for groups of goroutines working on subtasks of a common task.
+package errgroup
+
+import (
+	"sync"
+
+	"golang.org/x/net/context"
+)
+
+// A Group is a collection of goroutines working on subtasks that are part of
+// the same overall task.
+//
+// A zero Group is valid and does not cancel on error.
+type Group struct {
+	cancel func()
+
+	wg sync.WaitGroup
+
+	errOnce sync.Once
+	err     error
+}
+
+// WithContext returns a new Group and an associated Context derived from ctx.
+//
+// The derived Context is canceled the first time a function passed to Go
+// returns a non-nil error or the first time Wait returns, whichever occurs
+// first.
+func WithContext(ctx context.Context) (*Group, context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	return &Group{cancel: cancel}, ctx
+}
+
+// Wait blocks until all function calls from the Go method have returned, then
+// returns the first non-nil error (if any) from them.
+func (g *Group) Wait() error {
+	g.wg.Wait()
+	if g.cancel != nil {
+		g.cancel()
+	}
+	return g.err
+}
+
+// Go calls the given function in a new goroutine.
+//
+// The first call to return a non-nil error cancels the group; its error will be
+// returned by Wait.
+func (g *Group) Go(f func() error) {
+	g.wg.Add(1)
+
+	go func() {
+		defer g.wg.Done()
+
+		if err := f(); err != nil {
+			g.errOnce.Do(func() {
+				g.err = err
+				if g.cancel != nil {
+					g.cancel()
+				}
+			})
+		}
+	}()
+}

+ 6 - 0
vendor/vendor.json

@@ -57,6 +57,12 @@
 			"revisionTime": "2016-09-12T21:59:12Z"
 		},
 		{
+			"checksumSHA1": "S0DP7Pn7sZUmXc55IzZnNvERu6s=",
+			"path": "golang.org/x/sync/errgroup",
+			"revision": "316e794f7b5e3df4e95175a45a5fb8b12f85cb4f",
+			"revisionTime": "2016-07-15T18:54:39Z"
+    },
+    {
 			"checksumSHA1": "PoHLopxwkiXxa3uVhezeq/qJ/Vo=",
 			"path": "gopkg.in/guregu/null.v3",
 			"revision": "41961cea0328defc5f95c1c473f89ebf0d1813f6",