@@ -1,11 +1,17 @@
 package tsdb

 import (
+	"container/list"
 	"context"
+	"database/sql"
 	"fmt"
+	"math"
+	"strings"
 	"sync"
 	"time"

+	"github.com/grafana/grafana/pkg/log"
+
 	"github.com/grafana/grafana/pkg/components/null"

 	"github.com/go-xorm/core"
@@ -14,27 +20,15 @@ import (
 	"github.com/grafana/grafana/pkg/models"
 )

-// SqlEngine is a wrapper class around xorm for relational database data sources.
-type SqlEngine interface {
-	InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
-	Query(
-		ctx context.Context,
-		ds *models.DataSource,
-		query *TsdbQuery,
-		transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-		transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-	) (*Response, error)
-}
-
 // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
 // timeRange to be able to generate queries that use from and to.
 type SqlMacroEngine interface {
 	Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
 }

-type DefaultSqlEngine struct {
-	MacroEngine SqlMacroEngine
-	XormEngine  *xorm.Engine
+// SqlTableRowTransformer transforms a query result row to RowValues with proper types.
+type SqlTableRowTransformer interface {
+	Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (RowValues, error)
 }

 type engineCacheType struct {
@@ -48,69 +42,94 @@ var engineCache = engineCacheType{
 	versions: make(map[int64]int),
 }

-// InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
-func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
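+// NewXormEngine creates a new xorm database engine for the given driver and
+// connection string. It is a package-level variable so the engine construction
+// can be swapped out, e.g. in tests.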
+var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engine, error) {
+	return xorm.NewEngine(driverName, connectionString)
+}
+
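+// sqlQueryEndpoint is a TsdbQueryEndpoint implementation shared by SQL data sources.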
+type sqlQueryEndpoint struct {
+	macroEngine       SqlMacroEngine
+	rowTransformer    SqlTableRowTransformer
+	engine            *xorm.Engine
+	timeColumnNames   []string
+	metricColumnTypes []string
+	log               log.Logger
+}
+
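+// SqlQueryEndpointConfiguration holds the driver, datasource, connection string
+// and column settings used to build a SQL query endpoint.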
+type SqlQueryEndpointConfiguration struct {
+	DriverName        string
+	Datasource        *models.DataSource
+	ConnectionString  string
+	TimeColumnNames   []string
+	MetricColumnTypes []string
+}
+
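+// NewSqlQueryEndpoint creates a TsdbQueryEndpoint for a SQL data source,
+// reusing a cached xorm engine when the datasource configuration is unchanged.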
+var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, rowTransformer SqlTableRowTransformer, macroEngine SqlMacroEngine, log log.Logger) (TsdbQueryEndpoint, error) {
+	queryEndpoint := sqlQueryEndpoint{
+		rowTransformer:  rowTransformer,
+		macroEngine:     macroEngine,
+		timeColumnNames: []string{"time"},
+		log:             log,
+	}
+
+	if len(config.TimeColumnNames) > 0 {
+		queryEndpoint.timeColumnNames = config.TimeColumnNames
+	}
+
 	engineCache.Lock()
 	defer engineCache.Unlock()

-	if engine, present := engineCache.cache[dsInfo.Id]; present {
-		if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
-			e.XormEngine = engine
-			return nil
+	if engine, present := engineCache.cache[config.Datasource.Id]; present {
+		if version := engineCache.versions[config.Datasource.Id]; version == config.Datasource.Version {
+			queryEndpoint.engine = engine
+			return &queryEndpoint, nil
 		}
 	}

-	engine, err := xorm.NewEngine(driverName, cnnstr)
+	engine, err := NewXormEngine(config.DriverName, config.ConnectionString)
 	if err != nil {
-		return err
+		return nil, err
 	}

 	engine.SetMaxOpenConns(10)
 	engine.SetMaxIdleConns(10)

-	engineCache.versions[dsInfo.Id] = dsInfo.Version
-	engineCache.cache[dsInfo.Id] = engine
-	e.XormEngine = engine
+	engineCache.versions[config.Datasource.Id] = config.Datasource.Version
+	engineCache.cache[config.Datasource.Id] = engine
+	queryEndpoint.engine = engine

-	return nil
+	return &queryEndpoint, nil
 }

-// Query is a default implementation of the Query method for an SQL data source.
-// The caller of this function must implement transformToTimeSeries and transformToTable and
-// pass them in as parameters.
-func (e *DefaultSqlEngine) Query(
-	ctx context.Context,
-	dsInfo *models.DataSource,
-	tsdbQuery *TsdbQuery,
-	transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-	transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-) (*Response, error) {
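+// rowLimit caps the number of rows processed for a single query.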
+const rowLimit = 1000000
+
+// Query executes the queries in tsdbQuery against the SQL data source and
+// transforms the results into time series or tables.
+func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *TsdbQuery) (*Response, error) {
 	result := &Response{
 		Results: make(map[string]*QueryResult),
 	}

-	session := e.XormEngine.NewSession()
+	session := e.engine.NewSession()
 	defer session.Close()
 	db := session.DB()

 	for _, query := range tsdbQuery.Queries {
-		rawSql := query.Model.Get("rawSql").MustString()
-		if rawSql == "" {
+		rawSQL := query.Model.Get("rawSql").MustString()
+		if rawSQL == "" {
 			continue
 		}

 		queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
 		result.Results[query.RefId] = queryResult

-		rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
+		rawSQL, err := e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL)
 		if err != nil {
 			queryResult.Error = err
 			continue
 		}

-		queryResult.Meta.Set("sql", rawSql)
+		queryResult.Meta.Set("sql", rawSQL)

-		rows, err := db.Query(rawSql)
+		rows, err := db.Query(rawSQL)
 		if err != nil {
 			queryResult.Error = err
 			continue
@@ -122,13 +141,13 @@ func (e *DefaultSqlEngine) Query(

 		switch format {
 		case "time_series":
-			err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
+			err := e.transformToTimeSeries(query, rows, queryResult, tsdbQuery)
 			if err != nil {
 				queryResult.Error = err
 				continue
 			}
 		case "table":
-			err := transformToTable(query, rows, queryResult, tsdbQuery)
+			err := e.transformToTable(query, rows, queryResult, tsdbQuery)
 			if err != nil {
 				queryResult.Error = err
 				continue
@@ -139,6 +158,221 @@ func (e *DefaultSqlEngine) Query(
 	return result, nil
 }

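+// transformToTable converts the query result rows into a single Table,
+// converting the time column to epoch milliseconds along the way.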
+func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
+	columnNames, err := rows.Columns()
+	columnCount := len(columnNames)
+
+	if err != nil {
+		return err
+	}
+
+	rowCount := 0
+	timeIndex := -1
+
+	table := &Table{
+		Columns: make([]TableColumn, columnCount),
+		Rows:    make([]RowValues, 0),
+	}
+
+	for i, name := range columnNames {
+		table.Columns[i].Text = name
+
+		for _, tc := range e.timeColumnNames {
+			if name == tc {
+				timeIndex = i
+				break
+			}
+		}
+	}
+
+	columnTypes, err := rows.ColumnTypes()
+	if err != nil {
+		return err
+	}
+
+	for ; rows.Next(); rowCount++ {
+		if rowCount > rowLimit {
+			return fmt.Errorf("query row limit exceeded, limit %d", rowLimit)
+		}
+
+		values, err := e.rowTransformer.Transform(columnTypes, rows)
+		if err != nil {
+			return err
+		}
+
+		// converts the column named time to a unix timestamp in milliseconds
+		// to make native datetime types and epoch dates work in
+		// annotation and table queries.
+		ConvertSqlTimeColumnToEpochMs(values, timeIndex)
+		table.Rows = append(table.Rows, values)
+	}
+
+	result.Tables = append(result.Tables, table)
+	result.Meta.Set("rowCount", rowCount)
+	return nil
+}
+
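+// transformToTimeSeries groups the query result rows into time series keyed by
+// metric name, optionally filling gaps between data points.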
+func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
+	pointsBySeries := make(map[string]*TimeSeries)
+	seriesByQueryOrder := list.New()
+
+	columnNames, err := rows.Columns()
+	if err != nil {
+		return err
+	}
+
+	columnTypes, err := rows.ColumnTypes()
+	if err != nil {
+		return err
+	}
+
+	rowCount := 0
+	timeIndex := -1
+	metricIndex := -1
+
+	// check the columns of the resultset: a column named time is mandatory;
+	// the first text column is treated as the metric name unless a column named metric is present
+	for i, col := range columnNames {
+		for _, tc := range e.timeColumnNames {
+			if col == tc {
+				timeIndex = i
+				continue
+			}
+		}
+		switch col {
+		case "metric":
+			metricIndex = i
+		default:
+			if metricIndex == -1 {
+				columnType := columnTypes[i].DatabaseTypeName()
+
+				for _, mct := range e.metricColumnTypes {
+					if columnType == mct {
+						metricIndex = i
+						continue
+					}
+				}
+			}
+		}
+	}
+
+	if timeIndex == -1 {
+		return fmt.Errorf("Found no column named %s", strings.Join(e.timeColumnNames, " or "))
+	}
+
+	fillMissing := query.Model.Get("fill").MustBool(false)
+	var fillInterval float64
+	fillValue := null.Float{}
+	if fillMissing {
+		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
+		if !query.Model.Get("fillNull").MustBool(false) {
+			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
+			fillValue.Valid = true
+		}
+	}
+
+	for rows.Next() {
+		var timestamp float64
+		var value null.Float
+		var metric string
+
+		if rowCount > rowLimit {
+			return fmt.Errorf("query row limit exceeded, limit %d", rowLimit)
+		}
+
+		values, err := e.rowTransformer.Transform(columnTypes, rows)
+		if err != nil {
+			return err
+		}
+
+		// converts the column named time to a unix timestamp in milliseconds to make
+		// native datetime types and epoch dates work in
+		// annotation and table queries.
+		ConvertSqlTimeColumnToEpochMs(values, timeIndex)
+
+		switch columnValue := values[timeIndex].(type) {
+		case int64:
+			timestamp = float64(columnValue)
+		case float64:
+			timestamp = columnValue
+		default:
+			return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
+		}
+
+		if metricIndex >= 0 {
+			if columnValue, ok := values[metricIndex].(string); ok {
+				metric = columnValue
+			} else {
+				return fmt.Errorf("Column metric must be of type %s. metric column name: %s type: %s but datatype is %T", strings.Join(e.metricColumnTypes, ", "), columnNames[metricIndex], columnTypes[metricIndex].DatabaseTypeName(), values[metricIndex])
+			}
+		}
+
+		for i, col := range columnNames {
+			if i == timeIndex || i == metricIndex {
+				continue
+			}
+
+			if value, err = ConvertSqlValueColumnToFloat(col, values[i]); err != nil {
+				return err
+			}
+
+			if metricIndex == -1 {
+				metric = col
+			}
+
+			series, exist := pointsBySeries[metric]
+			if !exist {
+				series = &TimeSeries{Name: metric}
+				pointsBySeries[metric] = series
+				seriesByQueryOrder.PushBack(metric)
+			}
+
+			if fillMissing {
+				var intervalStart float64
+				if !exist {
+					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
+				} else {
+					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
+				}
+
+				// align interval start
+				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+
+				for i := intervalStart; i < timestamp; i += fillInterval {
+					series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
+					rowCount++
+				}
+			}
+
+			series.Points = append(series.Points, TimePoint{value, null.FloatFrom(timestamp)})
+
+			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
+		}
+	}
+
+	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
+		key := elem.Value.(string)
+		result.Series = append(result.Series, pointsBySeries[key])
+
+		if fillMissing {
+			series := pointsBySeries[key]
+			// fill in values from last fetched value till interval end
+			intervalStart := series.Points[len(series.Points)-1][1].Float64
+			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
+
+			// align interval start
+			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
+				series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
+				rowCount++
+			}
+		}
+	}
+
+	result.Meta.Set("rowCount", rowCount)
+	return nil
+}
+
 // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
 // to make native datetime types and epoch dates work in annotation and table queries.
 func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {