Browse Source

Merge remote-tracking branch 'upstream/master' into postgres-query-builder

Sven Klemm 7 years ago
parent
commit
6766028d7a

+ 0 - 2
docs/sources/guides/whats-new-in-v5.md

@@ -12,8 +12,6 @@ weight = -6
 
 # What's New in Grafana v5.0
 
-> Out in beta: [Download now!](https://grafana.com/grafana/download/beta)
-
 This is the most substantial update that Grafana has ever seen. This article will detail the major new features and enhancements.
 
 - [New Dashboard Layout Engine]({{< relref "#new-dashboard-layout-engine" >}}) enables a much easier drag, drop and resize experience and new types of layouts.

+ 24 - 4
pkg/tsdb/mysql/macros.go

@@ -3,6 +3,7 @@ package mysql
 import (
 	"fmt"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
@@ -15,19 +16,25 @@ const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 
 type MySqlMacroEngine struct {
 	TimeRange *tsdb.TimeRange
+	Query     *tsdb.Query
 }
 
 func NewMysqlMacroEngine() tsdb.SqlMacroEngine {
 	return &MySqlMacroEngine{}
 }
 
-func (m *MySqlMacroEngine) Interpolate(timeRange *tsdb.TimeRange, sql string) (string, error) {
+func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
 	m.TimeRange = timeRange
+	m.Query = query
 	rExp, _ := regexp.Compile(sExpr)
 	var macroError error
 
 	sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
-		res, err := m.evaluateMacro(groups[1], strings.Split(groups[2], ","))
+		args := strings.Split(groups[2], ",")
+		for i, arg := range args {
+			args[i] = strings.Trim(arg, " ")
+		}
+		res, err := m.evaluateMacro(groups[1], args)
 		if err != nil && macroError == nil {
 			macroError = err
 			return "macro_error()"
@@ -76,13 +83,26 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 	case "__timeTo":
 		return fmt.Sprintf("FROM_UNIXTIME(%d)", uint64(m.TimeRange.GetToAsMsEpoch()/1000)), nil
 	case "__timeGroup":
-		if len(args) != 2 {
+		if len(args) < 2 {
 			return "", fmt.Errorf("macro %v needs time column and interval", name)
 		}
-		interval, err := time.ParseDuration(strings.Trim(args[1], `'" `))
+		interval, err := time.ParseDuration(strings.Trim(args[1], `'"`))
 		if err != nil {
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 		}
+		if len(args) == 3 {
+			m.Query.Model.Set("fill", true)
+			m.Query.Model.Set("fillInterval", interval.Seconds())
+			if args[2] == "NULL" {
+				m.Query.Model.Set("fillNull", true)
+			} else {
+				floatVal, err := strconv.ParseFloat(args[2], 64)
+				if err != nil {
+					return "", fmt.Errorf("error parsing fill value %v", args[2])
+				}
+				m.Query.Model.Set("fillValue", floatVal)
+			}
+		}
 		return fmt.Sprintf("cast(cast(UNIX_TIMESTAMP(%s)/(%.0f) as signed)*%.0f as signed)", args[0], interval.Seconds(), interval.Seconds()), nil
 	case "__unixEpochFilter":
 		if len(args) == 0 {
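
With this change, $__timeGroup accepts an optional third argument that switches on gap filling and records the fill settings on the query model. A minimal usage sketch, not part of the commit; the Model field on tsdb.Query being a *simplejson.Json and the import paths are assumptions from the Grafana tree at this revision:

package mysql_test

import (
	"fmt"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	"github.com/grafana/grafana/pkg/tsdb/mysql"
)

func Example_timeGroupFill() {
	engine := mysql.NewMysqlMacroEngine()
	// Model must be non-nil so the macro can record the fill settings on it.
	query := &tsdb.Query{Model: simplejson.New()}
	timeRange := &tsdb.TimeRange{From: "5m", To: "now"}

	sql, err := engine.Interpolate(query, timeRange,
		"GROUP BY $__timeGroup(time_column, '5m', NULL)")
	if err != nil {
		panic(err)
	}

	// The generated SQL is the same as the two-argument form; the fill
	// settings travel on the query model: fill=true, fillInterval=300,
	// fillNull=true (a numeric third argument would set fillValue instead).
	fmt.Println(sql)
}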

+ 18 - 9
pkg/tsdb/mysql/macros_test.go

@@ -10,31 +10,32 @@ import (
 func TestMacroEngine(t *testing.T) {
 	Convey("MacroEngine", t, func() {
 		engine := &MySqlMacroEngine{}
+		query := &tsdb.Query{}
 		timeRange := &tsdb.TimeRange{From: "5m", To: "now"}
 
 		Convey("interpolate __time function", func() {
-			sql, err := engine.Interpolate(nil, "select $__time(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__time(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select UNIX_TIMESTAMP(time_column) as time_sec")
 		})
 
 		Convey("interpolate __time function wrapped in aggregation", func() {
-			sql, err := engine.Interpolate(nil, "select min($__time(time_column))")
+			sql, err := engine.Interpolate(query, timeRange, "select min($__time(time_column))")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select min(UNIX_TIMESTAMP(time_column) as time_sec)")
 		})
 
 		Convey("interpolate __timeFilter function", func() {
-			sql, err := engine.Interpolate(timeRange, "WHERE $__timeFilter(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "WHERE $__timeFilter(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "WHERE time_column >= FROM_UNIXTIME(18446744066914186738) AND time_column <= FROM_UNIXTIME(18446744066914187038)")
 		})
 
 		Convey("interpolate __timeFrom function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__timeFrom(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__timeFrom(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select FROM_UNIXTIME(18446744066914186738)")
@@ -42,35 +43,43 @@ func TestMacroEngine(t *testing.T) {
 
 		Convey("interpolate __timeGroup function", func() {
 
-			sql, err := engine.Interpolate(timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
+			sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
+			So(err, ShouldBeNil)
+
+			So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)")
+		})
+
+		Convey("interpolate __timeGroup function with spaces around arguments", func() {
+
+			sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column , '5m')")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "GROUP BY cast(cast(UNIX_TIMESTAMP(time_column)/(300) as signed)*300 as signed)")
 		})
 
 		Convey("interpolate __timeTo function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__timeTo(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select FROM_UNIXTIME(18446744066914187038)")
 		})
 
 		Convey("interpolate __unixEpochFilter function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__unixEpochFilter(18446744066914186738)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFilter(18446744066914186738)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select 18446744066914186738 >= 18446744066914186738 AND 18446744066914186738 <= 18446744066914187038")
 		})
 
 		Convey("interpolate __unixEpochFrom function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__unixEpochFrom()")
+			sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFrom()")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select 18446744066914186738")
 		})
 
 		Convey("interpolate __unixEpochTo function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__unixEpochTo()")
+			sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochTo()")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select 18446744066914187038")

+ 51 - 7
pkg/tsdb/mysql/mysql.go

@@ -5,6 +5,7 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"math"
 	"reflect"
 	"strconv"
 	"time"
@@ -56,7 +57,7 @@ func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSourc
 	return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
 }
 
-func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
+func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
 	columnNames, err := rows.Columns()
 	columnCount := len(columnNames)
 
@@ -175,7 +176,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er
 	return values, nil
 }
 
-func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
+func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
 	pointsBySeries := make(map[string]*tsdb.TimeSeries)
 	seriesByQueryOrder := list.New()
 
@@ -188,6 +189,18 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.
 	rowLimit := 1000000
 	rowCount := 0
 
+	fillMissing := query.Model.Get("fill").MustBool(false)
+	var fillInterval float64
+	fillValue := null.Float{}
+	if fillMissing {
+		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
+		if query.Model.Get("fillNull").MustBool(false) == false {
+			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
+			fillValue.Valid = true
+		}
+
+	}
+
 	for ; rows.Next(); rowCount++ {
 		if rowCount > rowLimit {
 			return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
@@ -207,19 +220,50 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.
 			return fmt.Errorf("Found row with no time value")
 		}
 
-		if series, exist := pointsBySeries[rowData.metric]; exist {
-			series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
-		} else {
-			series := &tsdb.TimeSeries{Name: rowData.metric}
-			series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
+		series, exist := pointsBySeries[rowData.metric]
+		if exist == false {
+			series = &tsdb.TimeSeries{Name: rowData.metric}
 			pointsBySeries[rowData.metric] = series
 			seriesByQueryOrder.PushBack(rowData.metric)
 		}
+
+		if fillMissing {
+			var intervalStart float64
+			if exist == false {
+				intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
+			} else {
+				intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
+			}
+
+			// align interval start
+			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+
+			for i := intervalStart; i < rowData.time.Float64; i += fillInterval {
+				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
+				rowCount++
+			}
+		}
+
+		series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
	}
 
 	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
 		key := elem.Value.(string)
 		result.Series = append(result.Series, pointsBySeries[key])
+
+		if fillMissing {
+			series := pointsBySeries[key]
+			// fill in values from last fetched value till interval end
+			intervalStart := series.Points[len(series.Points)-1][1].Float64
+			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
+
+			// align interval start
+			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
+				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
+				rowCount++
+			}
+		}
 	}
 
 	result.Meta.Set("rowCount", rowCount)
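
The new fill path inserts synthetic points at fillInterval steps: from the query's start up to a series' first row, between consecutive rows, and from the last row up to the query's end, aligning each run with math.Floor. A self-contained sketch of that interval arithmetic on bare millisecond timestamps (a hypothetical helper, not part of the commit):

package main

import (
	"fmt"
	"math"
)

// fillGaps mirrors the interval arithmetic in transformToTimeSeries:
// timestamps are ms epochs, fillInterval is ms, and each gap between
// consecutive points is padded at aligned fillInterval steps.
func fillGaps(points []float64, fillInterval float64) []float64 {
	var out []float64
	for i, ts := range points {
		if i > 0 {
			intervalStart := out[len(out)-1] + fillInterval
			// align interval start, as the commit does with math.Floor
			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
			for t := intervalStart; t < ts; t += fillInterval {
				out = append(out, t) // a synthetic point here would carry fillValue
			}
		}
		out = append(out, ts)
	}
	return out
}

func main() {
	// 5-minute interval in ms; the gap between 0 and 900000 gets two fill points.
	fmt.Println(fillGaps([]float64{0, 900000}, 300000))
	// [0 300000 600000 900000]
}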

+ 25 - 5
pkg/tsdb/postgres/macros.go

@@ -3,6 +3,7 @@ package postgres
 import (
 	"fmt"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
@@ -15,19 +16,25 @@ const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 
 type PostgresMacroEngine struct {
 	TimeRange *tsdb.TimeRange
+	Query     *tsdb.Query
 }
 
 func NewPostgresMacroEngine() tsdb.SqlMacroEngine {
 	return &PostgresMacroEngine{}
 }
 
-func (m *PostgresMacroEngine) Interpolate(timeRange *tsdb.TimeRange, sql string) (string, error) {
+func (m *PostgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
 	m.TimeRange = timeRange
+	m.Query = query
 	rExp, _ := regexp.Compile(sExpr)
 	var macroError error
 
 	sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
-		res, err := m.evaluateMacro(groups[1], strings.Split(groups[2], ","))
+		args := strings.Split(groups[2], ",")
+		for i, arg := range args {
+			args[i] = strings.Trim(arg, " ")
+		}
+		res, err := m.evaluateMacro(groups[1], args)
 		if err != nil && macroError == nil {
 			macroError = err
 			return "macro_error()"
@@ -82,13 +89,26 @@ func (m *PostgresMacroEngine) evaluateMacro(name string, args []string) (string,
 	case "__timeTo":
 		return fmt.Sprintf("to_timestamp(%d)", uint64(m.TimeRange.GetToAsMsEpoch()/1000)), nil
 	case "__timeGroup":
-		if len(args) != 2 {
-			return "", fmt.Errorf("macro %v needs time column and interval", name)
+		if len(args) < 2 {
+			return "", fmt.Errorf("macro %v needs time column and interval and optional fill value", name)
 		}
-		interval, err := time.ParseDuration(strings.Trim(args[1], `' `))
+		interval, err := time.ParseDuration(strings.Trim(args[1], `'`))
 		if err != nil {
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 		}
+		if len(args) == 3 {
+			m.Query.Model.Set("fill", true)
+			m.Query.Model.Set("fillInterval", interval.Seconds())
+			if args[2] == "NULL" {
+				m.Query.Model.Set("fillNull", true)
+			} else {
+				floatVal, err := strconv.ParseFloat(args[2], 64)
+				if err != nil {
+					return "", fmt.Errorf("error parsing fill value %v", args[2])
+				}
+				m.Query.Model.Set("fillValue", floatVal)
+			}
+		}
 		return fmt.Sprintf("(extract(epoch from %s)/%v)::bigint*%v AS time", args[0], interval.Seconds(), interval.Seconds()), nil
 	case "__unixEpochFilter":
 		if len(args) == 0 {

+ 18 - 9
pkg/tsdb/postgres/macros_test.go

@@ -10,31 +10,32 @@ import (
 func TestMacroEngine(t *testing.T) {
 	Convey("MacroEngine", t, func() {
 		engine := &PostgresMacroEngine{}
+		query := &tsdb.Query{}
 		timeRange := &tsdb.TimeRange{From: "5m", To: "now"}
 
 		Convey("interpolate __time function", func() {
-			sql, err := engine.Interpolate(nil, "select $__time(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__time(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select time_column AS \"time\"")
 		})
 
 		Convey("interpolate __time function wrapped in aggregation", func() {
-			sql, err := engine.Interpolate(nil, "select min($__time(time_column))")
+			sql, err := engine.Interpolate(query, timeRange, "select min($__time(time_column))")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select min(time_column AS \"time\")")
 		})
 
 		Convey("interpolate __timeFilter function", func() {
-			sql, err := engine.Interpolate(timeRange, "WHERE $__timeFilter(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "WHERE $__timeFilter(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "WHERE extract(epoch from time_column) BETWEEN 18446744066914186738 AND 18446744066914187038")
 		})
 
 		Convey("interpolate __timeFrom function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__timeFrom(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__timeFrom(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select to_timestamp(18446744066914186738)")
@@ -42,35 +43,43 @@ func TestMacroEngine(t *testing.T) {
 
 		Convey("interpolate __timeGroup function", func() {
 
-			sql, err := engine.Interpolate(timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
+			sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column,'5m')")
+			So(err, ShouldBeNil)
+
+			So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time")
+		})
+
+		Convey("interpolate __timeGroup function with spaces between args", func() {
+
+			sql, err := engine.Interpolate(query, timeRange, "GROUP BY $__timeGroup(time_column , '5m')")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "GROUP BY (extract(epoch from time_column)/300)::bigint*300 AS time")
 		})
 
 		Convey("interpolate __timeTo function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__timeTo(time_column)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__timeTo(time_column)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select to_timestamp(18446744066914187038)")
 		})
 
 		Convey("interpolate __unixEpochFilter function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__unixEpochFilter(18446744066914186738)")
+			sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFilter(18446744066914186738)")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select 18446744066914186738 >= 18446744066914186738 AND 18446744066914186738 <= 18446744066914187038")
 		})
 
 		Convey("interpolate __unixEpochFrom function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__unixEpochFrom()")
+			sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochFrom()")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select 18446744066914186738")
 		})
 
 		Convey("interpolate __unixEpochTo function", func() {
-			sql, err := engine.Interpolate(timeRange, "select $__unixEpochTo()")
+			sql, err := engine.Interpolate(query, timeRange, "select $__unixEpochTo()")
 			So(err, ShouldBeNil)
 
 			So(sql, ShouldEqual, "select 18446744066914187038")

+ 57 - 15
pkg/tsdb/postgres/postgres.go

@@ -4,6 +4,7 @@ import (
 	"container/list"
 	"context"
 	"fmt"
+	"math"
 	"net/url"
 	"strconv"
 	"time"
@@ -60,7 +61,7 @@ func (e *PostgresQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSo
 	return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
 }
 
-func (e PostgresQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
+func (e PostgresQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
 
 	columnNames, err := rows.Columns()
 	if err != nil {
@@ -157,7 +158,7 @@ func (e PostgresQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues,
 	return values, nil
 }
 
-func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
+func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
 	pointsBySeries := make(map[string]*tsdb.TimeSeries)
 	seriesByQueryOrder := list.New()
 
@@ -198,6 +199,18 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
 		return fmt.Errorf("Found no column named time")
 	}
 
+	fillMissing := query.Model.Get("fill").MustBool(false)
+	var fillInterval float64
+	fillValue := null.Float{}
+	if fillMissing {
+		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
+		if query.Model.Get("fillNull").MustBool(false) == false {
+			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
+			fillValue.Valid = true
+		}
+
+	}
+
 	for rows.Next() {
 		var timestamp float64
 		var value null.Float
@@ -249,7 +262,34 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
 			if metricIndex == -1 {
 				metric = col
 			}
-			e.appendTimePoint(pointsBySeries, seriesByQueryOrder, metric, timestamp, value)
+
+			series, exist := pointsBySeries[metric]
+			if exist == false {
+				series = &tsdb.TimeSeries{Name: metric}
+				pointsBySeries[metric] = series
+				seriesByQueryOrder.PushBack(metric)
+			}
+
+			if fillMissing {
+				var intervalStart float64
+				if exist == false {
+					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
+				} else {
+					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
+				}
+
+				// align interval start
+				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+
+				for i := intervalStart; i < timestamp; i += fillInterval {
+					series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
+					rowCount++
+				}
+			}
+
+			series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
+
+			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
 			rowCount++
 
 		}
@@ -258,20 +298,22 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
 	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
 		key := elem.Value.(string)
 		result.Series = append(result.Series, pointsBySeries[key])
+
+		if fillMissing {
+			series := pointsBySeries[key]
+			// fill in values from last fetched value till interval end
+			intervalStart := series.Points[len(series.Points)-1][1].Float64
+			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
+
+			// align interval start
+			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
+				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
+				rowCount++
+			}
+		}
 	}
 
 	result.Meta.Set("rowCount", rowCount)
 	return nil
 }
-
-func (e PostgresQueryEndpoint) appendTimePoint(pointsBySeries map[string]*tsdb.TimeSeries, seriesByQueryOrder *list.List, metric string, timestamp float64, value null.Float) {
-	if series, exist := pointsBySeries[metric]; exist {
-		series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-	} else {
-		series := &tsdb.TimeSeries{Name: metric}
-		series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-		pointsBySeries[metric] = series
-		seriesByQueryOrder.PushBack(metric)
-	}
-	e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
-}

+ 10 - 10
pkg/tsdb/sql_engine.go

@@ -17,15 +17,15 @@ type SqlEngine interface {
 		ctx context.Context,
 		ds *models.DataSource,
 		query *TsdbQuery,
-		transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
-		transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
+		transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
+		transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
 	) (*Response, error)
 }
 
-// SqlMacroEngine interpolates macros into sql. It takes in the timeRange to be able to
-// generate queries that use from and to.
+// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
+// timeRange to be able to generate queries that use from and to.
 type SqlMacroEngine interface {
-	Interpolate(timeRange *TimeRange, sql string) (string, error)
+	Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
 }
 
 type DefaultSqlEngine struct {
@@ -77,8 +77,8 @@ func (e *DefaultSqlEngine) Query(
 	ctx context.Context,
 	dsInfo *models.DataSource,
 	tsdbQuery *TsdbQuery,
-	transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
-	transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
+	transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
+	transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
 ) (*Response, error) {
 	result := &Response{
 		Results: make(map[string]*QueryResult),
@@ -97,7 +97,7 @@ func (e *DefaultSqlEngine) Query(
 		queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
 		result.Results[query.RefId] = queryResult
 
-		rawSql, err := e.MacroEngine.Interpolate(tsdbQuery.TimeRange, rawSql)
+		rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
 		if err != nil {
 			queryResult.Error = err
 			continue
@@ -117,13 +117,13 @@ func (e *DefaultSqlEngine) Query(
 
 		switch format {
 		case "time_series":
-			err := transformToTimeSeries(query, rows, queryResult)
+			err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
 			if err != nil {
 				queryResult.Error = err
 				continue
 			}
 		case "table":
-			err := transformToTable(query, rows, queryResult)
+			err := transformToTable(query, rows, queryResult, tsdbQuery)
 			if err != nil {
 				queryResult.Error = err
 				continue
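
With this signature change, every SqlMacroEngine implementation has to accept the originating query as well as the time range. A minimal conforming stub, written as if inside package tsdb (hypothetical, for illustration only):

// noopMacroEngine satisfies the updated SqlMacroEngine interface;
// it ignores the query context and returns the SQL untouched.
type noopMacroEngine struct{}

func (e *noopMacroEngine) Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error) {
	return sql, nil
}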

+ 6 - 14
public/app/plugins/datasource/prometheus/dashboards/prometheus_2_stats.json

@@ -348,7 +348,7 @@
           "tableColumn": "",
           "targets": [
             {
-              "expr": "tsdb_wal_corruptions_total{job=\"prometheus\"}",
+              "expr": "prometheus_tsdb_wal_corruptions_total{job=\"prometheus\"}",
               "format": "time_series",
               "intervalFactor": 2,
               "legendFormat": "",
@@ -1048,7 +1048,7 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "max(prometheus_evaluator_duration_seconds{job=\"prometheus\", quantile!=\"0.01\", quantile!=\"0.05\"}) by (quantile)",
+              "expr": "max(prometheus_rule_group_duration_seconds{job=\"prometheus\"}) by (quantile)",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -1060,7 +1060,7 @@
           "thresholds": [],
           "timeFrom": null,
           "timeShift": null,
-          "title": "Rule Eval Duration",
+          "title": "Rule Group Eval Duration",
           "tooltip": {
             "shared": true,
             "sort": 0,
@@ -1124,7 +1124,7 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "rate(prometheus_evaluator_iterations_missed_total{job=\"prometheus\"}[5m])",
+              "expr": "rate(prometheus_rule_group_iterations_missed_total{job=\"prometheus\"}[5m])",
               "format": "time_series",
               "intervalFactor": 2,
               "legendFormat": "missed",
@@ -1132,15 +1132,7 @@
               "step": 10
             },
             {
-              "expr": "rate(prometheus_evaluator_iterations_skipped_total{job=\"prometheus\"}[5m])",
-              "format": "time_series",
-              "intervalFactor": 2,
-              "legendFormat": "skipped",
-              "refId": "C",
-              "step": 10
-            },
-            {
-              "expr": "rate(prometheus_evaluator_iterations_total{job=\"prometheus\"}[5m])",
+              "expr": "rate(prometheus_rule_group_iterations_total{job=\"prometheus\"}[5m])",
               "format": "time_series",
               "intervalFactor": 2,
               "legendFormat": "iterations",
@@ -1151,7 +1143,7 @@
           "thresholds": [],
           "timeFrom": null,
           "timeShift": null,
-          "title": "Rule Eval Activity",
+          "title": "Rule Group Eval Activity",
           "tooltip": {
             "shared": true,
             "sort": 0,