Przeglądaj źródła

refactor sql engine to make it hold all common code for sql datasources

Marcus Efraimsson 7 lat temu
rodzic
commit
d42cea5d42
1 zmienionych plików z 279 dodań i 45 usunięć
  1. 279 45
      pkg/tsdb/sql_engine.go

+ 279 - 45
pkg/tsdb/sql_engine.go

@@ -1,11 +1,17 @@
 package tsdb
 
 import (
+	"container/list"
 	"context"
+	"database/sql"
 	"fmt"
+	"math"
+	"strings"
 	"sync"
 	"time"
 
+	"github.com/grafana/grafana/pkg/log"
+
 	"github.com/grafana/grafana/pkg/components/null"
 
 	"github.com/go-xorm/core"
@@ -14,27 +20,15 @@ import (
 	"github.com/grafana/grafana/pkg/models"
 )
 
-// SqlEngine is a wrapper class around xorm for relational database data sources.
-type SqlEngine interface {
-	InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
-	Query(
-		ctx context.Context,
-		ds *models.DataSource,
-		query *TsdbQuery,
-		transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-		transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-	) (*Response, error)
-}
-
 // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
 // timeRange to be able to generate queries that use from and to.
 type SqlMacroEngine interface {
 	Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
 }
 
-type DefaultSqlEngine struct {
-	MacroEngine SqlMacroEngine
-	XormEngine  *xorm.Engine
+// SqlTableRowTransformer transforms a query result row to RowValues with proper types.
+type SqlTableRowTransformer interface {
+	Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (RowValues, error)
 }
 
 type engineCacheType struct {
@@ -48,69 +42,92 @@ var engineCache = engineCacheType{
 	versions: make(map[int64]int),
 }
 
-// InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
-func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
+var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engine, error) {
+	return xorm.NewEngine(driverName, connectionString)
+}
+
+type sqlQueryEndpoint struct {
+	macroEngine       SqlMacroEngine
+	rowTransformer    SqlTableRowTransformer
+	engine            *xorm.Engine
+	timeColumnNames   []string
+	metricColumnTypes []string
+	log               log.Logger
+}
+
+type SqlQueryEndpointConfiguration struct {
+	DriverName        string
+	Datasource        *models.DataSource
+	ConnectionString  string
+	TimeColumnNames   []string
+	MetricColumnTypes []string
+}
+
+var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, rowTransformer SqlTableRowTransformer, macroEngine SqlMacroEngine, log log.Logger) (TsdbQueryEndpoint, error) {
+	queryEndpoint := sqlQueryEndpoint{
+		rowTransformer:  rowTransformer,
+		macroEngine:     macroEngine,
+		timeColumnNames: []string{"time"},
+		log:             log,
+	}
+
+	if len(config.TimeColumnNames) > 0 {
+		queryEndpoint.timeColumnNames = config.TimeColumnNames
+	}
+
 	engineCache.Lock()
 	defer engineCache.Unlock()
 
-	if engine, present := engineCache.cache[dsInfo.Id]; present {
-		if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
-			e.XormEngine = engine
-			return nil
+	if engine, present := engineCache.cache[config.Datasource.Id]; present {
+		if version := engineCache.versions[config.Datasource.Id]; version == config.Datasource.Version {
+			queryEndpoint.engine = engine
+			return &queryEndpoint, nil
 		}
 	}
 
-	engine, err := xorm.NewEngine(driverName, cnnstr)
+	engine, err := NewXormEngine(config.DriverName, config.ConnectionString)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	engine.SetMaxOpenConns(10)
 	engine.SetMaxIdleConns(10)
 
-	engineCache.versions[dsInfo.Id] = dsInfo.Version
-	engineCache.cache[dsInfo.Id] = engine
-	e.XormEngine = engine
+	engineCache.versions[config.Datasource.Id] = config.Datasource.Version
+	engineCache.cache[config.Datasource.Id] = engine
+	queryEndpoint.engine = engine
 
-	return nil
+	return &queryEndpoint, nil
 }
 
-// Query is a default implementation of the Query method for an SQL data source.
-// The caller of this function must implement transformToTimeSeries and transformToTable and
-// pass them in as parameters.
-func (e *DefaultSqlEngine) Query(
-	ctx context.Context,
-	dsInfo *models.DataSource,
-	tsdbQuery *TsdbQuery,
-	transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-	transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
-) (*Response, error) {
+// Query is the main function for the SqlQueryEndpoint
+func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *TsdbQuery) (*Response, error) {
 	result := &Response{
 		Results: make(map[string]*QueryResult),
 	}
 
-	session := e.XormEngine.NewSession()
+	session := e.engine.NewSession()
 	defer session.Close()
 	db := session.DB()
 
 	for _, query := range tsdbQuery.Queries {
-		rawSql := query.Model.Get("rawSql").MustString()
-		if rawSql == "" {
+		rawSQL := query.Model.Get("rawSql").MustString()
+		if rawSQL == "" {
 			continue
 		}
 
 		queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
 		result.Results[query.RefId] = queryResult
 
-		rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
+		rawSQL, err := e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL)
 		if err != nil {
 			queryResult.Error = err
 			continue
 		}
 
-		queryResult.Meta.Set("sql", rawSql)
+		queryResult.Meta.Set("sql", rawSQL)
 
-		rows, err := db.Query(rawSql)
+		rows, err := db.Query(rawSQL)
 		if err != nil {
 			queryResult.Error = err
 			continue
@@ -122,13 +139,13 @@ func (e *DefaultSqlEngine) Query(
 
 		switch format {
 		case "time_series":
-			err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
+			err := e.transformToTimeSeries(query, rows, queryResult, tsdbQuery)
 			if err != nil {
 				queryResult.Error = err
 				continue
 			}
 		case "table":
-			err := transformToTable(query, rows, queryResult, tsdbQuery)
+			err := e.transformToTable(query, rows, queryResult, tsdbQuery)
 			if err != nil {
 				queryResult.Error = err
 				continue
@@ -139,6 +156,223 @@ func (e *DefaultSqlEngine) Query(
 	return result, nil
 }
 
+func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
+	columnNames, err := rows.Columns()
+	columnCount := len(columnNames)
+
+	if err != nil {
+		return err
+	}
+
+	rowLimit := 1000000
+	rowCount := 0
+	timeIndex := -1
+
+	table := &Table{
+		Columns: make([]TableColumn, columnCount),
+		Rows:    make([]RowValues, 0),
+	}
+
+	for i, name := range columnNames {
+		table.Columns[i].Text = name
+
+		for _, tc := range e.timeColumnNames {
+			if name == tc {
+				timeIndex = i
+				break
+			}
+		}
+	}
+
+	columnTypes, err := rows.ColumnTypes()
+	if err != nil {
+		return err
+	}
+
+	for ; rows.Next(); rowCount++ {
+		if rowCount > rowLimit {
+			return fmt.Errorf("query row limit exceeded, limit %d", rowLimit)
+		}
+
+		values, err := e.rowTransformer.Transform(columnTypes, rows)
+		if err != nil {
+			return err
+		}
+
+		// converts column named time to unix timestamp in milliseconds
+		// to make native mssql datetime types and epoch dates work in
+		// annotation and table queries.
+		ConvertSqlTimeColumnToEpochMs(values, timeIndex)
+		table.Rows = append(table.Rows, values)
+	}
+
+	result.Tables = append(result.Tables, table)
+	result.Meta.Set("rowCount", rowCount)
+	return nil
+}
+
+func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
+	pointsBySeries := make(map[string]*TimeSeries)
+	seriesByQueryOrder := list.New()
+
+	columnNames, err := rows.Columns()
+	if err != nil {
+		return err
+	}
+
+	columnTypes, err := rows.ColumnTypes()
+	if err != nil {
+		return err
+	}
+
+	rowLimit := 1000000
+	rowCount := 0
+	timeIndex := -1
+	metricIndex := -1
+
+	// check columns of resultset: a column named time is mandatory
+	// the first text column is treated as metric name unless a column named metric is present
+	for i, col := range columnNames {
+		for _, tc := range e.timeColumnNames {
+			if col == tc {
+				timeIndex = i
+				continue
+			}
+		}
+		switch col {
+		case "metric":
+			metricIndex = i
+		default:
+			if metricIndex == -1 {
+				columnType := columnTypes[i].DatabaseTypeName()
+
+				for _, mct := range e.metricColumnTypes {
+					if columnType == mct {
+						metricIndex = i
+						continue
+					}
+				}
+			}
+		}
+	}
+
+	if timeIndex == -1 {
+		return fmt.Errorf("Found no column named %s", strings.Join(e.timeColumnNames, " or "))
+	}
+
+	fillMissing := query.Model.Get("fill").MustBool(false)
+	var fillInterval float64
+	fillValue := null.Float{}
+	if fillMissing {
+		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
+		if !query.Model.Get("fillNull").MustBool(false) {
+			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
+			fillValue.Valid = true
+		}
+	}
+
+	for rows.Next() {
+		var timestamp float64
+		var value null.Float
+		var metric string
+
+		if rowCount > rowLimit {
+			return fmt.Errorf("query row limit exceeded, limit %d", rowLimit)
+		}
+
+		values, err := e.rowTransformer.Transform(columnTypes, rows)
+		if err != nil {
+			return err
+		}
+
+		// converts column named time to unix timestamp in milliseconds to make
+		// native mysql datetime types and epoch dates work in
+		// annotation and table queries.
+		ConvertSqlTimeColumnToEpochMs(values, timeIndex)
+
+		switch columnValue := values[timeIndex].(type) {
+		case int64:
+			timestamp = float64(columnValue)
+		case float64:
+			timestamp = columnValue
+		default:
+			return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
+		}
+
+		if metricIndex >= 0 {
+			if columnValue, ok := values[metricIndex].(string); ok {
+				metric = columnValue
+			} else {
+				return fmt.Errorf("Column metric must be of type %s. metric column name: %s type: %s but datatype is %T", strings.Join(e.metricColumnTypes, ", "), columnNames[metricIndex], columnTypes[metricIndex].DatabaseTypeName(), values[metricIndex])
+			}
+		}
+
+		for i, col := range columnNames {
+			if i == timeIndex || i == metricIndex {
+				continue
+			}
+
+			if value, err = ConvertSqlValueColumnToFloat(col, values[i]); err != nil {
+				return err
+			}
+
+			if metricIndex == -1 {
+				metric = col
+			}
+
+			series, exist := pointsBySeries[metric]
+			if !exist {
+				series = &TimeSeries{Name: metric}
+				pointsBySeries[metric] = series
+				seriesByQueryOrder.PushBack(metric)
+			}
+
+			if fillMissing {
+				var intervalStart float64
+				if !exist {
+					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
+				} else {
+					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
+				}
+
+				// align interval start
+				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+
+				for i := intervalStart; i < timestamp; i += fillInterval {
+					series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
+					rowCount++
+				}
+			}
+
+			series.Points = append(series.Points, TimePoint{value, null.FloatFrom(timestamp)})
+
+			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
+		}
+	}
+
+	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
+		key := elem.Value.(string)
+		result.Series = append(result.Series, pointsBySeries[key])
+
+		if fillMissing {
+			series := pointsBySeries[key]
+			// fill in values from last fetched value till interval end
+			intervalStart := series.Points[len(series.Points)-1][1].Float64
+			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
+
+			// align interval start
+			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
+				series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
+				rowCount++
+			}
+		}
+	}
+
+	result.Meta.Set("rowCount", rowCount)
+	return nil
+}
+
 // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
 // to make native datetime types and epoch dates work in annotation and table queries.
 func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {