Marcus Efraimsson 7 лет назад
Родитель
Сommit
4f7882cda2
4 измененных файлов с 64 добавлено и 274 удалено
  1. 19 19
      pkg/tsdb/mssql/macros.go
  2. 1 1
      pkg/tsdb/mssql/macros_test.go
  3. 25 243
      pkg/tsdb/mssql/mssql.go
  4. 19 11
      pkg/tsdb/mssql/mssql_test.go

+ 19 - 19
pkg/tsdb/mssql/macros.go

@@ -14,18 +14,18 @@ import (
 const rsIdentifier = `([_a-zA-Z0-9]+)`
 const rsIdentifier = `([_a-zA-Z0-9]+)`
 const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 
 
-type MsSqlMacroEngine struct {
-	TimeRange *tsdb.TimeRange
-	Query     *tsdb.Query
+type msSqlMacroEngine struct {
+	timeRange *tsdb.TimeRange
+	query     *tsdb.Query
 }
 }
 
 
-func NewMssqlMacroEngine() tsdb.SqlMacroEngine {
-	return &MsSqlMacroEngine{}
+func newMssqlMacroEngine() tsdb.SqlMacroEngine {
+	return &msSqlMacroEngine{}
 }
 }
 
 
-func (m *MsSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
-	m.TimeRange = timeRange
-	m.Query = query
+func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
+	m.timeRange = timeRange
+	m.query = query
 	rExp, _ := regexp.Compile(sExpr)
 	rExp, _ := regexp.Compile(sExpr)
 	var macroError error
 	var macroError error
 
 
@@ -66,7 +66,7 @@ func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]str
 	return result + str[lastIndex:]
 	return result + str[lastIndex:]
 }
 }
 
 
-func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
+func (m *msSqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
 	switch name {
 	switch name {
 	case "__time":
 	case "__time":
 		if len(args) == 0 {
 		if len(args) == 0 {
@@ -83,11 +83,11 @@ func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 		}
 		}
 
 
-		return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeFrom":
 	case "__timeFrom":
-		return fmt.Sprintf("'%s'", m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("'%s'", m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeTo":
 	case "__timeTo":
-		return fmt.Sprintf("'%s'", m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("'%s'", m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeGroup":
 	case "__timeGroup":
 		if len(args) < 2 {
 		if len(args) < 2 {
 			return "", fmt.Errorf("macro %v needs time column and interval", name)
 			return "", fmt.Errorf("macro %v needs time column and interval", name)
@@ -97,16 +97,16 @@ func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 		}
 		}
 		if len(args) == 3 {
 		if len(args) == 3 {
-			m.Query.Model.Set("fill", true)
-			m.Query.Model.Set("fillInterval", interval.Seconds())
+			m.query.Model.Set("fill", true)
+			m.query.Model.Set("fillInterval", interval.Seconds())
 			if args[2] == "NULL" {
 			if args[2] == "NULL" {
-				m.Query.Model.Set("fillNull", true)
+				m.query.Model.Set("fillNull", true)
 			} else {
 			} else {
 				floatVal, err := strconv.ParseFloat(args[2], 64)
 				floatVal, err := strconv.ParseFloat(args[2], 64)
 				if err != nil {
 				if err != nil {
 					return "", fmt.Errorf("error parsing fill value %v", args[2])
 					return "", fmt.Errorf("error parsing fill value %v", args[2])
 				}
 				}
-				m.Query.Model.Set("fillValue", floatVal)
+				m.query.Model.Set("fillValue", floatVal)
 			}
 			}
 		}
 		}
 		return fmt.Sprintf("FLOOR(DATEDIFF(second, '1970-01-01', %s)/%.0f)*%.0f", args[0], interval.Seconds(), interval.Seconds()), nil
 		return fmt.Sprintf("FLOOR(DATEDIFF(second, '1970-01-01', %s)/%.0f)*%.0f", args[0], interval.Seconds(), interval.Seconds()), nil
@@ -114,11 +114,11 @@ func (m *MsSqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 		if len(args) == 0 {
 		if len(args) == 0 {
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 		}
 		}
-		return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.TimeRange.GetFromAsSecondsEpoch(), args[0], m.TimeRange.GetToAsSecondsEpoch()), nil
+		return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.timeRange.GetFromAsSecondsEpoch(), args[0], m.timeRange.GetToAsSecondsEpoch()), nil
 	case "__unixEpochFrom":
 	case "__unixEpochFrom":
-		return fmt.Sprintf("%d", m.TimeRange.GetFromAsSecondsEpoch()), nil
+		return fmt.Sprintf("%d", m.timeRange.GetFromAsSecondsEpoch()), nil
 	case "__unixEpochTo":
 	case "__unixEpochTo":
-		return fmt.Sprintf("%d", m.TimeRange.GetToAsSecondsEpoch()), nil
+		return fmt.Sprintf("%d", m.timeRange.GetToAsSecondsEpoch()), nil
 	default:
 	default:
 		return "", fmt.Errorf("Unknown macro %v", name)
 		return "", fmt.Errorf("Unknown macro %v", name)
 	}
 	}

+ 1 - 1
pkg/tsdb/mssql/macros_test.go

@@ -14,7 +14,7 @@ import (
 
 
 func TestMacroEngine(t *testing.T) {
 func TestMacroEngine(t *testing.T) {
 	Convey("MacroEngine", t, func() {
 	Convey("MacroEngine", t, func() {
-		engine := &MsSqlMacroEngine{}
+		engine := &msSqlMacroEngine{}
 		query := &tsdb.Query{
 		query := &tsdb.Query{
 			Model: simplejson.New(),
 			Model: simplejson.New(),
 		}
 		}

+ 25 - 243
pkg/tsdb/mssql/mssql.go

@@ -1,49 +1,40 @@
 package mssql
 package mssql
 
 
 import (
 import (
-	"container/list"
-	"context"
 	"database/sql"
 	"database/sql"
 	"fmt"
 	"fmt"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 
 
-	"math"
-
 	_ "github.com/denisenkom/go-mssqldb"
 	_ "github.com/denisenkom/go-mssqldb"
 	"github.com/go-xorm/core"
 	"github.com/go-xorm/core"
-	"github.com/grafana/grafana/pkg/components/null"
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/tsdb"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
 )
 
 
-type MssqlQueryEndpoint struct {
-	sqlEngine tsdb.SqlEngine
-	log       log.Logger
-}
-
 func init() {
 func init() {
-	tsdb.RegisterTsdbQueryEndpoint("mssql", NewMssqlQueryEndpoint)
+	tsdb.RegisterTsdbQueryEndpoint("mssql", newMssqlQueryEndpoint)
 }
 }
 
 
-func NewMssqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
-	endpoint := &MssqlQueryEndpoint{
-		log: log.New("tsdb.mssql"),
-	}
-
-	endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
-		MacroEngine: NewMssqlMacroEngine(),
-	}
+func newMssqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
+	logger := log.New("tsdb.mssql")
 
 
 	cnnstr := generateConnectionString(datasource)
 	cnnstr := generateConnectionString(datasource)
-	endpoint.log.Debug("getEngine", "connection", cnnstr)
+	logger.Debug("getEngine", "connection", cnnstr)
 
 
-	if err := endpoint.sqlEngine.InitEngine("mssql", datasource, cnnstr); err != nil {
-		return nil, err
+	config := tsdb.SqlQueryEndpointConfiguration{
+		DriverName:        "mssql",
+		ConnectionString:  cnnstr,
+		Datasource:        datasource,
+		MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
+	}
+
+	rowTransformer := mssqlRowTransformer{
+		log: logger,
 	}
 	}
 
 
-	return endpoint, nil
+	return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMssqlMacroEngine(), logger)
 }
 }
 
 
 func generateConnectionString(datasource *models.DataSource) string {
 func generateConnectionString(datasource *models.DataSource) string {
@@ -70,71 +61,16 @@ func generateConnectionString(datasource *models.DataSource) string {
 	)
 	)
 }
 }
 
 
-// Query is the main function for the MssqlQueryEndpoint
-func (e *MssqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
-	return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
-}
-
-func (e MssqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
-	columnNames, err := rows.Columns()
-	columnCount := len(columnNames)
-
-	if err != nil {
-		return err
-	}
-
-	rowLimit := 1000000
-	rowCount := 0
-	timeIndex := -1
-
-	table := &tsdb.Table{
-		Columns: make([]tsdb.TableColumn, columnCount),
-		Rows:    make([]tsdb.RowValues, 0),
-	}
-
-	for i, name := range columnNames {
-		table.Columns[i].Text = name
-
-		// check if there is a column named time
-		switch name {
-		case "time":
-			timeIndex = i
-		}
-	}
-
-	columnTypes, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-
-	for ; rows.Next(); rowCount++ {
-		if rowCount > rowLimit {
-			return fmt.Errorf("MsSQL query row limit exceeded, limit %d", rowLimit)
-		}
-
-		values, err := e.getTypedRowData(columnTypes, rows)
-		if err != nil {
-			return err
-		}
-
-		// converts column named time to unix timestamp in milliseconds
-		// to make native mssql datetime types and epoch dates work in
-		// annotation and table queries.
-		tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
-		table.Rows = append(table.Rows, values)
-	}
-
-	result.Tables = append(result.Tables, table)
-	result.Meta.Set("rowCount", rowCount)
-	return nil
+type mssqlRowTransformer struct {
+	log log.Logger
 }
 }
 
 
-func (e MssqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
-	values := make([]interface{}, len(types))
-	valuePtrs := make([]interface{}, len(types))
+func (t *mssqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
+	values := make([]interface{}, len(columnTypes))
+	valuePtrs := make([]interface{}, len(columnTypes))
 
 
-	for i, stype := range types {
-		e.log.Debug("type", "type", stype)
+	for i, stype := range columnTypes {
+		t.log.Debug("type", "type", stype)
 		valuePtrs[i] = &values[i]
 		valuePtrs[i] = &values[i]
 	}
 	}
 
 
@@ -144,17 +80,17 @@ func (e MssqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.
 
 
 	// convert types not handled by denisenkom/go-mssqldb
 	// convert types not handled by denisenkom/go-mssqldb
 	// unhandled types are returned as []byte
 	// unhandled types are returned as []byte
-	for i := 0; i < len(types); i++ {
+	for i := 0; i < len(columnTypes); i++ {
 		if value, ok := values[i].([]byte); ok {
 		if value, ok := values[i].([]byte); ok {
-			switch types[i].DatabaseTypeName() {
+			switch columnTypes[i].DatabaseTypeName() {
 			case "MONEY", "SMALLMONEY", "DECIMAL":
 			case "MONEY", "SMALLMONEY", "DECIMAL":
 				if v, err := strconv.ParseFloat(string(value), 64); err == nil {
 				if v, err := strconv.ParseFloat(string(value), 64); err == nil {
 					values[i] = v
 					values[i] = v
 				} else {
 				} else {
-					e.log.Debug("Rows", "Error converting numeric to float", value)
+					t.log.Debug("Rows", "Error converting numeric to float", value)
 				}
 				}
 			default:
 			default:
-				e.log.Debug("Rows", "Unknown database type", types[i].DatabaseTypeName(), "value", value)
+				t.log.Debug("Rows", "Unknown database type", columnTypes[i].DatabaseTypeName(), "value", value)
 				values[i] = string(value)
 				values[i] = string(value)
 			}
 			}
 		}
 		}
@@ -162,157 +98,3 @@ func (e MssqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.
 
 
 	return values, nil
 	return values, nil
 }
 }
-
-func (e MssqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
-	pointsBySeries := make(map[string]*tsdb.TimeSeries)
-	seriesByQueryOrder := list.New()
-
-	columnNames, err := rows.Columns()
-	if err != nil {
-		return err
-	}
-
-	columnTypes, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-
-	rowLimit := 1000000
-	rowCount := 0
-	timeIndex := -1
-	metricIndex := -1
-
-	// check columns of resultset: a column named time is mandatory
-	// the first text column is treated as metric name unless a column named metric is present
-	for i, col := range columnNames {
-		switch col {
-		case "time":
-			timeIndex = i
-		case "metric":
-			metricIndex = i
-		default:
-			if metricIndex == -1 {
-				switch columnTypes[i].DatabaseTypeName() {
-				case "VARCHAR", "CHAR", "NVARCHAR", "NCHAR":
-					metricIndex = i
-				}
-			}
-		}
-	}
-
-	if timeIndex == -1 {
-		return fmt.Errorf("Found no column named time")
-	}
-
-	fillMissing := query.Model.Get("fill").MustBool(false)
-	var fillInterval float64
-	fillValue := null.Float{}
-	if fillMissing {
-		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
-		if !query.Model.Get("fillNull").MustBool(false) {
-			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
-			fillValue.Valid = true
-		}
-	}
-
-	for rows.Next() {
-		var timestamp float64
-		var value null.Float
-		var metric string
-
-		if rowCount > rowLimit {
-			return fmt.Errorf("MSSQL query row limit exceeded, limit %d", rowLimit)
-		}
-
-		values, err := e.getTypedRowData(columnTypes, rows)
-		if err != nil {
-			return err
-		}
-
-		// converts column named time to unix timestamp in milliseconds to make
-		// native mysql datetime types and epoch dates work in
-		// annotation and table queries.
-		tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
-
-		switch columnValue := values[timeIndex].(type) {
-		case int64:
-			timestamp = float64(columnValue)
-		case float64:
-			timestamp = columnValue
-		default:
-			return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
-		}
-
-		if metricIndex >= 0 {
-			if columnValue, ok := values[metricIndex].(string); ok {
-				metric = columnValue
-			} else {
-				return fmt.Errorf("Column metric must be of type CHAR, VARCHAR, NCHAR or NVARCHAR. metric column name: %s type: %s but datatype is %T", columnNames[metricIndex], columnTypes[metricIndex].DatabaseTypeName(), values[metricIndex])
-			}
-		}
-
-		for i, col := range columnNames {
-			if i == timeIndex || i == metricIndex {
-				continue
-			}
-
-			if value, err = tsdb.ConvertSqlValueColumnToFloat(col, values[i]); err != nil {
-				return err
-			}
-
-			if metricIndex == -1 {
-				metric = col
-			}
-
-			series, exist := pointsBySeries[metric]
-			if !exist {
-				series = &tsdb.TimeSeries{Name: metric}
-				pointsBySeries[metric] = series
-				seriesByQueryOrder.PushBack(metric)
-			}
-
-			if fillMissing {
-				var intervalStart float64
-				if !exist {
-					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
-				} else {
-					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
-				}
-
-				// align interval start
-				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
-
-				for i := intervalStart; i < timestamp; i += fillInterval {
-					series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
-					rowCount++
-				}
-			}
-
-			series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-
-			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
-		}
-	}
-
-	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
-		key := elem.Value.(string)
-		result.Series = append(result.Series, pointsBySeries[key])
-
-		if fillMissing {
-			series := pointsBySeries[key]
-			// fill in values from last fetched value till interval end
-			intervalStart := series.Points[len(series.Points)-1][1].Float64
-			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
-
-			// align interval start
-			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
-			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
-				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
-				rowCount++
-			}
-		}
-	}
-
-	result.Meta.Set("rowCount", rowCount)
-	return nil
-}

+ 19 - 11
pkg/tsdb/mssql/mssql_test.go

@@ -8,8 +8,9 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/go-xorm/xorm"
 	"github.com/go-xorm/xorm"
+	"github.com/grafana/grafana/pkg/components/securejsondata"
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/components/simplejson"
-	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
 	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
 	"github.com/grafana/grafana/pkg/tsdb"
 	"github.com/grafana/grafana/pkg/tsdb"
 	. "github.com/smartystreets/goconvey/convey"
 	. "github.com/smartystreets/goconvey/convey"
@@ -19,8 +20,9 @@ import (
 // The tests require a MSSQL db named grafanatest and a user/password grafana/Password!
 // The tests require a MSSQL db named grafanatest and a user/password grafana/Password!
 // Use the docker/blocks/mssql_tests/docker-compose.yaml to spin up a
 // Use the docker/blocks/mssql_tests/docker-compose.yaml to spin up a
 // preconfigured MSSQL server suitable for running these tests.
 // preconfigured MSSQL server suitable for running these tests.
-// There is also a dashboard.json in same directory that you can import to Grafana
-// once you've created a datasource for the test server/database.
+// There is also a datasource and dashboard provisioned by devenv scripts that you can
+// use to verify that the generated data are vizualized as expected, see
+// devenv/README.md for setup instructions.
 // If needed, change the variable below to the IP address of the database.
 // If needed, change the variable below to the IP address of the database.
 var serverIP = "localhost"
 var serverIP = "localhost"
 
 
@@ -28,19 +30,25 @@ func TestMSSQL(t *testing.T) {
 	SkipConvey("MSSQL", t, func() {
 	SkipConvey("MSSQL", t, func() {
 		x := InitMSSQLTestDB(t)
 		x := InitMSSQLTestDB(t)
 
 
-		endpoint := &MssqlQueryEndpoint{
-			sqlEngine: &tsdb.DefaultSqlEngine{
-				MacroEngine: NewMssqlMacroEngine(),
-				XormEngine:  x,
-			},
-			log: log.New("tsdb.mssql"),
+		origXormEngine := tsdb.NewXormEngine
+		tsdb.NewXormEngine = func(d, c string) (*xorm.Engine, error) {
+			return x, nil
 		}
 		}
 
 
-		sess := x.NewSession()
-		defer sess.Close()
+		endpoint, err := newMssqlQueryEndpoint(&models.DataSource{
+			JsonData:       simplejson.New(),
+			SecureJsonData: securejsondata.SecureJsonData{},
+		})
+		So(err, ShouldBeNil)
 
 
+		sess := x.NewSession()
 		fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)
 		fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)
 
 
+		Reset(func() {
+			sess.Close()
+			tsdb.NewXormEngine = origXormEngine
+		})
+
 		Convey("Given a table with different native data types", func() {
 		Convey("Given a table with different native data types", func() {
 			sql := `
 			sql := `
 					IF OBJECT_ID('dbo.[mssql_types]', 'U') IS NOT NULL
 					IF OBJECT_ID('dbo.[mssql_types]', 'U') IS NOT NULL