@@ -4,9 +4,12 @@ import (
	"context"
	"database/sql"
	"fmt"
+	"strconv"
	"sync"

+	"github.com/go-xorm/core"
	"github.com/go-xorm/xorm"
+	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/log"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/tsdb"
@@ -74,19 +77,14 @@ func (e *MysqlExecutor) initEngine() error {
}

func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
-	result := &tsdb.BatchResult{}
+	result := &tsdb.BatchResult{
+		QueryResults: make(map[string]*tsdb.QueryResult),
+	}

	session := e.engine.NewSession()
	defer session.Close()
-
	db := session.DB()

-	// queries := strings.Split(req.Query, ";")
-	//
-	// data := dataStruct{}
-	// data.Results = make([]resultsStruct, 1)
-	// data.Results[0].Series = make([]seriesStruct, 0)
-
	for _, query := range queries {
		rawSql := query.Model.Get("rawSql").MustString()
		if rawSql == "" {
@@ -100,118 +98,119 @@ func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, co
		}
		defer rows.Close()

-		columnNames, err := rows.Columns()
+		result.QueryResults[query.RefId] = e.TransformToTimeSeries(query, rows)
+	}
+
+	for _, value := range result.QueryResults {
+		if value.Error != nil {
+			e.log.Error("error", "error", value.Error)
+		}
+	}
+
+	return result
+}
+
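+// TransformToTimeSeries converts the rows returned for a single query into
+// tsdb time series, grouping the points by the value of the metric column.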
+func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows) *tsdb.QueryResult {
+	result := &tsdb.QueryResult{RefId: query.RefId}
+	pointsBySeries := make(map[string]*tsdb.TimeSeries)
+	columnNames, err := rows.Columns()
+
+	if err != nil {
+		result.Error = err
+		return result
+	}
+
+	rowData := NewStringStringScan(columnNames)
+	for rows.Next() {
+		err := rowData.Update(rows.Rows)
		if err != nil {
+			e.log.Error("Mysql response parsing", "error", err)
			result.Error = err
			return result
		}

-		rc := NewStringStringScan(columnNames)
-		for rows.Next() {
-			err := rc.Update(rows.Rows)
-			if err != nil {
-				e.log.Error("Mysql response parsing", "error", err)
-				result.Error = err
-				return result
-			}
+		if rowData.metric == "" {
+			rowData.metric = "Unknown"
+		}
+
+		e.log.Info("Rows", "metric", rowData.metric, "time", rowData.time, "value", rowData.value)
+
+		if !rowData.time.Valid {
+			result.Error = fmt.Errorf("Found row with no time value")
+			return result
+		}

-		rowValues := rc.Get()
-		e.log.Info("Rows", "row", rowValues)
+		if series, exist := pointsBySeries[rowData.metric]; exist {
+			series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
+		} else {
+			series := &tsdb.TimeSeries{Name: rowData.metric}
+			series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
+			pointsBySeries[rowData.metric] = series
		}
+	}

-		// for rows.Next() {
-		// 	columnValues := make([]interface{}, len(columnNames))
-		//
-		// 	err = rows.ScanSlice(&columnValues)
-		// 	if err != nil {
-		// 		result.Error = err
-		// 		return result
-		// 	}
-		//
-		// 	// bytes -> string
-		// 	for i := range columnValues {
-		// 		rowType := reflect.TypeOf(columnValues[i])
-		// 		e.log.Info("row", "type", rowType)
-		//
-		// 		rawValue := reflect.Indirect(reflect.ValueOf(columnValues[i]))
-		//
-		// 		// if rawValue is null then ignore
-		// 		if rawValue.Interface() == nil {
-		// 			continue
-		// 		}
-		//
-		// 		rawValueType := reflect.TypeOf(rawValue.Interface())
-		// 		vv := reflect.ValueOf(rawValue.Interface())
-		// 		e.log.Info("column type", "name", columnNames[i], "type", rawValueType, "vv", vv)
-		// 	}
-		// }
+	for _, value := range pointsBySeries {
+		result.Series = append(result.Series, value)
	}

	return result
}

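+// stringStringScan scans every column of a row as a raw string and keeps the
+// parsed time, value and metric of the most recently scanned row.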
type stringStringScan struct {
-	// cp are the column pointers
-	cp []interface{}
-	// row contains the final result
-	row      []string
-	colCount int
-	colNames []string
+	rowPtrs     []interface{}
+	rowValues   []string
+	columnNames []string
+	columnCount int
+
+	time   null.Float
+	value  null.Float
+	metric string
}

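+// NewStringStringScan allocates one *sql.RawBytes destination per column so
+// rows can be scanned without knowing the column types up front.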
func NewStringStringScan(columnNames []string) *stringStringScan {
-	lenCN := len(columnNames)
	s := &stringStringScan{
-		cp:       make([]interface{}, lenCN),
-		row:      make([]string, lenCN*2),
-		colCount: lenCN,
-		colNames: columnNames,
+		columnCount: len(columnNames),
+		columnNames: columnNames,
+		rowPtrs:     make([]interface{}, len(columnNames)),
+		rowValues:   make([]string, len(columnNames)),
	}
-	j := 0
-	for i := 0; i < lenCN; i++ {
-		s.cp[i] = new(sql.RawBytes)
-		s.row[j] = s.colNames[i]
-		j = j + 2
+
+	for i := 0; i < s.columnCount; i++ {
+		s.rowPtrs[i] = new(sql.RawBytes)
	}
+
	return s
}

func (s *stringStringScan) Update(rows *sql.Rows) error {
-	if err := rows.Scan(s.cp...); err != nil {
+	if err := rows.Scan(s.rowPtrs...); err != nil {
		return err
	}
-	j := 0
-	for i := 0; i < s.colCount; i++ {
-		if rb, ok := s.cp[i].(*sql.RawBytes); ok {
-			s.row[j+1] = string(*rb)
+
+	for i := 0; i < s.columnCount; i++ {
+		if rb, ok := s.rowPtrs[i].(*sql.RawBytes); ok {
+			s.rowValues[i] = string(*rb)
+			fmt.Printf("column %s = %s", s.columnNames[i], s.rowValues[i])
+
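+			// Columns named time_sec, value and metric drive the series output;
+			// time_sec (epoch seconds) is converted to epoch milliseconds.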
+			switch s.columnNames[i] {
+			case "time_sec":
+				if sec, err := strconv.ParseInt(s.rowValues[i], 10, 64); err == nil {
+					s.time = null.FloatFrom(float64(sec * 1000))
+				}
+			case "value":
+				if value, err := strconv.ParseFloat(s.rowValues[i], 64); err == nil {
+					s.value = null.FloatFrom(value)
+				}
+			case "metric":
+				s.metric = s.rowValues[i]
+			}
+
			*rb = nil // reset pointer to discard current value to avoid a bug
		} else {
-			return fmt.Errorf("Cannot convert index %d column %s to type *sql.RawBytes", i, s.colNames[i])
+			return fmt.Errorf("Cannot convert index %d column %s to type *sql.RawBytes", i, s.columnNames[i])
		}
-		j = j + 2
	}
	return nil
}
-
-func (s *stringStringScan) Get() []string {
-	return s.row
-}
-
-// type sqlDataRequest struct {
-// 	Query string `json:"query"`
-// 	Body  []byte `json:"-"`
-// }
-//
-// type seriesStruct struct {
-// 	Columns []string        `json:"columns"`
-// 	Name    string          `json:"name"`
-// 	Values  [][]interface{} `json:"values"`
-// }
-//
-// type resultsStruct struct {
-// 	Series []seriesStruct `json:"series"`
-// }
-//
-// type dataStruct struct {
-// 	Results []resultsStruct `json:"results"`
-// }