Browse Source

mysql: use new sql engine

Marcus Efraimsson 7 years ago
parent
commit
27db454012
4 changed files with 61 additions and 274 deletions
  1. 19 19
      pkg/tsdb/mysql/macros.go
  2. 1 1
      pkg/tsdb/mysql/macros_test.go
  3. 22 243
      pkg/tsdb/mysql/mysql.go
  4. 19 11
      pkg/tsdb/mysql/mysql_test.go

+ 19 - 19
pkg/tsdb/mysql/macros.go

@@ -14,18 +14,18 @@ import (
 const rsIdentifier = `([_a-zA-Z0-9]+)`
 const rsIdentifier = `([_a-zA-Z0-9]+)`
 const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
 
 
-type MySqlMacroEngine struct {
-	TimeRange *tsdb.TimeRange
-	Query     *tsdb.Query
+type mySqlMacroEngine struct {
+	timeRange *tsdb.TimeRange
+	query     *tsdb.Query
 }
 }
 
 
-func NewMysqlMacroEngine() tsdb.SqlMacroEngine {
-	return &MySqlMacroEngine{}
+func newMysqlMacroEngine() tsdb.SqlMacroEngine {
+	return &mySqlMacroEngine{}
 }
 }
 
 
-func (m *MySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
-	m.TimeRange = timeRange
-	m.Query = query
+func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
+	m.timeRange = timeRange
+	m.query = query
 	rExp, _ := regexp.Compile(sExpr)
 	rExp, _ := regexp.Compile(sExpr)
 	var macroError error
 	var macroError error
 
 
@@ -66,7 +66,7 @@ func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]str
 	return result + str[lastIndex:]
 	return result + str[lastIndex:]
 }
 }
 
 
-func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
+func (m *mySqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
 	switch name {
 	switch name {
 	case "__timeEpoch", "__time":
 	case "__timeEpoch", "__time":
 		if len(args) == 0 {
 		if len(args) == 0 {
@@ -78,11 +78,11 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 		}
 		}
 
 
-		return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("%s BETWEEN '%s' AND '%s'", args[0], m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339), m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeFrom":
 	case "__timeFrom":
-		return fmt.Sprintf("'%s'", m.TimeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("'%s'", m.timeRange.GetFromAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeTo":
 	case "__timeTo":
-		return fmt.Sprintf("'%s'", m.TimeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
+		return fmt.Sprintf("'%s'", m.timeRange.GetToAsTimeUTC().Format(time.RFC3339)), nil
 	case "__timeGroup":
 	case "__timeGroup":
 		if len(args) < 2 {
 		if len(args) < 2 {
 			return "", fmt.Errorf("macro %v needs time column and interval", name)
 			return "", fmt.Errorf("macro %v needs time column and interval", name)
@@ -92,16 +92,16 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 			return "", fmt.Errorf("error parsing interval %v", args[1])
 		}
 		}
 		if len(args) == 3 {
 		if len(args) == 3 {
-			m.Query.Model.Set("fill", true)
-			m.Query.Model.Set("fillInterval", interval.Seconds())
+			m.query.Model.Set("fill", true)
+			m.query.Model.Set("fillInterval", interval.Seconds())
 			if args[2] == "NULL" {
 			if args[2] == "NULL" {
-				m.Query.Model.Set("fillNull", true)
+				m.query.Model.Set("fillNull", true)
 			} else {
 			} else {
 				floatVal, err := strconv.ParseFloat(args[2], 64)
 				floatVal, err := strconv.ParseFloat(args[2], 64)
 				if err != nil {
 				if err != nil {
 					return "", fmt.Errorf("error parsing fill value %v", args[2])
 					return "", fmt.Errorf("error parsing fill value %v", args[2])
 				}
 				}
-				m.Query.Model.Set("fillValue", floatVal)
+				m.query.Model.Set("fillValue", floatVal)
 			}
 			}
 		}
 		}
 		return fmt.Sprintf("UNIX_TIMESTAMP(%s) DIV %.0f * %.0f", args[0], interval.Seconds(), interval.Seconds()), nil
 		return fmt.Sprintf("UNIX_TIMESTAMP(%s) DIV %.0f * %.0f", args[0], interval.Seconds(), interval.Seconds()), nil
@@ -109,11 +109,11 @@ func (m *MySqlMacroEngine) evaluateMacro(name string, args []string) (string, er
 		if len(args) == 0 {
 		if len(args) == 0 {
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 			return "", fmt.Errorf("missing time column argument for macro %v", name)
 		}
 		}
-		return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.TimeRange.GetFromAsSecondsEpoch(), args[0], m.TimeRange.GetToAsSecondsEpoch()), nil
+		return fmt.Sprintf("%s >= %d AND %s <= %d", args[0], m.timeRange.GetFromAsSecondsEpoch(), args[0], m.timeRange.GetToAsSecondsEpoch()), nil
 	case "__unixEpochFrom":
 	case "__unixEpochFrom":
-		return fmt.Sprintf("%d", m.TimeRange.GetFromAsSecondsEpoch()), nil
+		return fmt.Sprintf("%d", m.timeRange.GetFromAsSecondsEpoch()), nil
 	case "__unixEpochTo":
 	case "__unixEpochTo":
-		return fmt.Sprintf("%d", m.TimeRange.GetToAsSecondsEpoch()), nil
+		return fmt.Sprintf("%d", m.timeRange.GetToAsSecondsEpoch()), nil
 	default:
 	default:
 		return "", fmt.Errorf("Unknown macro %v", name)
 		return "", fmt.Errorf("Unknown macro %v", name)
 	}
 	}

+ 1 - 1
pkg/tsdb/mysql/macros_test.go

@@ -12,7 +12,7 @@ import (
 
 
 func TestMacroEngine(t *testing.T) {
 func TestMacroEngine(t *testing.T) {
 	Convey("MacroEngine", t, func() {
 	Convey("MacroEngine", t, func() {
-		engine := &MySqlMacroEngine{}
+		engine := &mySqlMacroEngine{}
 		query := &tsdb.Query{}
 		query := &tsdb.Query{}
 
 
 		Convey("Given a time range between 2018-04-12 00:00 and 2018-04-12 00:05", func() {
 		Convey("Given a time range between 2018-04-12 00:00 and 2018-04-12 00:05", func() {

+ 22 - 243
pkg/tsdb/mysql/mysql.go

@@ -1,39 +1,24 @@
 package mysql
 package mysql
 
 
 import (
 import (
-	"container/list"
-	"context"
 	"database/sql"
 	"database/sql"
 	"fmt"
 	"fmt"
-	"math"
 	"reflect"
 	"reflect"
 	"strconv"
 	"strconv"
 
 
 	"github.com/go-sql-driver/mysql"
 	"github.com/go-sql-driver/mysql"
 	"github.com/go-xorm/core"
 	"github.com/go-xorm/core"
-	"github.com/grafana/grafana/pkg/components/null"
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/tsdb"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
 )
 
 
-type MysqlQueryEndpoint struct {
-	sqlEngine tsdb.SqlEngine
-	log       log.Logger
-}
-
 func init() {
 func init() {
-	tsdb.RegisterTsdbQueryEndpoint("mysql", NewMysqlQueryEndpoint)
+	tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint)
 }
 }
 
 
-func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
-	endpoint := &MysqlQueryEndpoint{
-		log: log.New("tsdb.mysql"),
-	}
-
-	endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
-		MacroEngine: NewMysqlMacroEngine(),
-	}
+func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
+	logger := log.New("tsdb.mysql")
 
 
 	cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
 	cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
 		datasource.User,
 		datasource.User,
@@ -42,85 +27,35 @@ func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoin
 		datasource.Url,
 		datasource.Url,
 		datasource.Database,
 		datasource.Database,
 	)
 	)
-	endpoint.log.Debug("getEngine", "connection", cnnstr)
+	logger.Debug("getEngine", "connection", cnnstr)
 
 
-	if err := endpoint.sqlEngine.InitEngine("mysql", datasource, cnnstr); err != nil {
-		return nil, err
+	config := tsdb.SqlQueryEndpointConfiguration{
+		DriverName:        "mysql",
+		ConnectionString:  cnnstr,
+		Datasource:        datasource,
+		TimeColumnNames:   []string{"time", "time_sec"},
+		MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
 	}
 	}
 
 
-	return endpoint, nil
-}
-
-// Query is the main function for the MysqlExecutor
-func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
-	return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
-}
-
-func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
-	columnNames, err := rows.Columns()
-	columnCount := len(columnNames)
-
-	if err != nil {
-		return err
+	rowTransformer := mysqlRowTransformer{
+		log: logger,
 	}
 	}
 
 
-	table := &tsdb.Table{
-		Columns: make([]tsdb.TableColumn, columnCount),
-		Rows:    make([]tsdb.RowValues, 0),
-	}
-
-	for i, name := range columnNames {
-		table.Columns[i].Text = name
-	}
-
-	rowLimit := 1000000
-	rowCount := 0
-	timeIndex := -1
-
-	// check if there is a column named time
-	for i, col := range columnNames {
-		switch col {
-		case "time", "time_sec":
-			timeIndex = i
-		}
-	}
-
-	for ; rows.Next(); rowCount++ {
-		if rowCount > rowLimit {
-			return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
-		}
-
-		values, err := e.getTypedRowData(rows)
-		if err != nil {
-			return err
-		}
-
-		// converts column named time to unix timestamp in milliseconds to make
-		// native mysql datetime types and epoch dates work in
-		// annotation and table queries.
-		tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
-
-		table.Rows = append(table.Rows, values)
-	}
-
-	result.Tables = append(result.Tables, table)
-	result.Meta.Set("rowCount", rowCount)
-	return nil
+	return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger)
 }
 }
 
 
-func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) {
-	types, err := rows.ColumnTypes()
-	if err != nil {
-		return nil, err
-	}
+type mysqlRowTransformer struct {
+	log log.Logger
+}
 
 
-	values := make([]interface{}, len(types))
+func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
+	values := make([]interface{}, len(columnTypes))
 
 
 	for i := range values {
 	for i := range values {
-		scanType := types[i].ScanType()
+		scanType := columnTypes[i].ScanType()
 		values[i] = reflect.New(scanType).Interface()
 		values[i] = reflect.New(scanType).Interface()
 
 
-		if types[i].DatabaseTypeName() == "BIT" {
+		if columnTypes[i].DatabaseTypeName() == "BIT" {
 			values[i] = new([]byte)
 			values[i] = new([]byte)
 		}
 		}
 	}
 	}
@@ -129,7 +64,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	for i := 0; i < len(types); i++ {
+	for i := 0; i < len(columnTypes); i++ {
 		typeName := reflect.ValueOf(values[i]).Type().String()
 		typeName := reflect.ValueOf(values[i]).Type().String()
 
 
 		switch typeName {
 		switch typeName {
@@ -158,7 +93,7 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er
 			}
 			}
 		}
 		}
 
 
-		if types[i].DatabaseTypeName() == "DECIMAL" {
+		if columnTypes[i].DatabaseTypeName() == "DECIMAL" {
 			f, err := strconv.ParseFloat(values[i].(string), 64)
 			f, err := strconv.ParseFloat(values[i].(string), 64)
 
 
 			if err == nil {
 			if err == nil {
@@ -171,159 +106,3 @@ func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, er
 
 
 	return values, nil
 	return values, nil
 }
 }
-
-func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
-	pointsBySeries := make(map[string]*tsdb.TimeSeries)
-	seriesByQueryOrder := list.New()
-
-	columnNames, err := rows.Columns()
-	if err != nil {
-		return err
-	}
-
-	columnTypes, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-
-	rowLimit := 1000000
-	rowCount := 0
-	timeIndex := -1
-	metricIndex := -1
-
-	// check columns of resultset: a column named time is mandatory
-	// the first text column is treated as metric name unless a column named metric is present
-	for i, col := range columnNames {
-		switch col {
-		case "time", "time_sec":
-			timeIndex = i
-		case "metric":
-			metricIndex = i
-		default:
-			if metricIndex == -1 {
-				switch columnTypes[i].DatabaseTypeName() {
-				case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT":
-					metricIndex = i
-				}
-			}
-		}
-	}
-
-	if timeIndex == -1 {
-		return fmt.Errorf("Found no column named time or time_sec")
-	}
-
-	fillMissing := query.Model.Get("fill").MustBool(false)
-	var fillInterval float64
-	fillValue := null.Float{}
-	if fillMissing {
-		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
-		if !query.Model.Get("fillNull").MustBool(false) {
-			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
-			fillValue.Valid = true
-		}
-	}
-
-	for rows.Next() {
-		var timestamp float64
-		var value null.Float
-		var metric string
-
-		if rowCount > rowLimit {
-			return fmt.Errorf("PostgreSQL query row limit exceeded, limit %d", rowLimit)
-		}
-
-		values, err := e.getTypedRowData(rows)
-		if err != nil {
-			return err
-		}
-
-		// converts column named time to unix timestamp in milliseconds to make
-		// native mysql datetime types and epoch dates work in
-		// annotation and table queries.
-		tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
-
-		switch columnValue := values[timeIndex].(type) {
-		case int64:
-			timestamp = float64(columnValue)
-		case float64:
-			timestamp = columnValue
-		default:
-			return fmt.Errorf("Invalid type for column time/time_sec, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
-		}
-
-		if metricIndex >= 0 {
-			if columnValue, ok := values[metricIndex].(string); ok {
-				metric = columnValue
-			} else {
-				return fmt.Errorf("Column metric must be of type char,varchar or text, got: %T %v", values[metricIndex], values[metricIndex])
-			}
-		}
-
-		for i, col := range columnNames {
-			if i == timeIndex || i == metricIndex {
-				continue
-			}
-
-			if value, err = tsdb.ConvertSqlValueColumnToFloat(col, values[i]); err != nil {
-				return err
-			}
-
-			if metricIndex == -1 {
-				metric = col
-			}
-
-			series, exist := pointsBySeries[metric]
-			if !exist {
-				series = &tsdb.TimeSeries{Name: metric}
-				pointsBySeries[metric] = series
-				seriesByQueryOrder.PushBack(metric)
-			}
-
-			if fillMissing {
-				var intervalStart float64
-				if !exist {
-					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
-				} else {
-					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
-				}
-
-				// align interval start
-				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
-
-				for i := intervalStart; i < timestamp; i += fillInterval {
-					series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
-					rowCount++
-				}
-			}
-
-			series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-
-			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
-			rowCount++
-
-		}
-	}
-
-	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
-		key := elem.Value.(string)
-		result.Series = append(result.Series, pointsBySeries[key])
-
-		if fillMissing {
-			series := pointsBySeries[key]
-			// fill in values from last fetched value till interval end
-			intervalStart := series.Points[len(series.Points)-1][1].Float64
-			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
-
-			// align interval start
-			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
-			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
-				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
-				rowCount++
-			}
-		}
-	}
-
-	result.Meta.Set("rowCount", rowCount)
-	return nil
-}

+ 19 - 11
pkg/tsdb/mysql/mysql_test.go

@@ -8,8 +8,9 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/go-xorm/xorm"
 	"github.com/go-xorm/xorm"
+	"github.com/grafana/grafana/pkg/components/securejsondata"
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/components/simplejson"
-	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/services/sqlstore"
 	"github.com/grafana/grafana/pkg/services/sqlstore"
 	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
 	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
 	"github.com/grafana/grafana/pkg/tsdb"
 	"github.com/grafana/grafana/pkg/tsdb"
@@ -21,8 +22,9 @@ import (
 // The tests require a MySQL db named grafana_ds_tests and a user/password grafana/password
 // The tests require a MySQL db named grafana_ds_tests and a user/password grafana/password
 // Use the docker/blocks/mysql_tests/docker-compose.yaml to spin up a
 // Use the docker/blocks/mysql_tests/docker-compose.yaml to spin up a
 // preconfigured MySQL server suitable for running these tests.
 // preconfigured MySQL server suitable for running these tests.
-// There is also a dashboard.json in same directory that you can import to Grafana
-// once you've created a datasource for the test server/database.
+// There is also a datasource and dashboard provisioned by devenv scripts that you can
+// use to verify that the generated data are vizualized as expected, see
+// devenv/README.md for setup instructions.
 func TestMySQL(t *testing.T) {
 func TestMySQL(t *testing.T) {
 	// change to true to run the MySQL tests
 	// change to true to run the MySQL tests
 	runMySqlTests := false
 	runMySqlTests := false
@@ -35,19 +37,25 @@ func TestMySQL(t *testing.T) {
 	Convey("MySQL", t, func() {
 	Convey("MySQL", t, func() {
 		x := InitMySQLTestDB(t)
 		x := InitMySQLTestDB(t)
 
 
-		endpoint := &MysqlQueryEndpoint{
-			sqlEngine: &tsdb.DefaultSqlEngine{
-				MacroEngine: NewMysqlMacroEngine(),
-				XormEngine:  x,
-			},
-			log: log.New("tsdb.mysql"),
+		origXormEngine := tsdb.NewXormEngine
+		tsdb.NewXormEngine = func(d, c string) (*xorm.Engine, error) {
+			return x, nil
 		}
 		}
 
 
-		sess := x.NewSession()
-		defer sess.Close()
+		endpoint, err := newMysqlQueryEndpoint(&models.DataSource{
+			JsonData:       simplejson.New(),
+			SecureJsonData: securejsondata.SecureJsonData{},
+		})
+		So(err, ShouldBeNil)
 
 
+		sess := x.NewSession()
 		fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC)
 		fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC)
 
 
+		Reset(func() {
+			sess.Close()
+			tsdb.NewXormEngine = origXormEngine
+		})
+
 		Convey("Given a table with different native data types", func() {
 		Convey("Given a table with different native data types", func() {
 			if exists, err := sess.IsTableExist("mysql_types"); err != nil || exists {
 			if exists, err := sess.IsTableExist("mysql_types"); err != nil || exists {
 				So(err, ShouldBeNil)
 				So(err, ShouldBeNil)