|
|
@@ -7,6 +7,7 @@ import (
|
|
|
"strconv"
|
|
|
"sync"
|
|
|
|
|
|
+ "github.com/go-sql-driver/mysql"
|
|
|
"github.com/go-xorm/core"
|
|
|
"github.com/go-xorm/xorm"
|
|
|
"github.com/grafana/grafana/pkg/components/null"
|
|
|
@@ -114,34 +115,97 @@ func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, co
|
|
|
|
|
|
format := query.Model.Get("format").MustString("time_series")
|
|
|
|
|
|
- if format == "time_series" {
|
|
|
- res, err := e.TransformToTimeSeries(query, rows)
|
|
|
+ switch format {
|
|
|
+ case "time_series":
|
|
|
+ err := e.TransformToTimeSeries(query, rows, queryResult)
|
|
|
if err != nil {
|
|
|
queryResult.Error = err
|
|
|
- return result
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ case "table":
|
|
|
+ err := e.TransformToTable(query, rows, queryResult)
|
|
|
+ if err != nil {
|
|
|
+ queryResult.Error = err
|
|
|
+ continue
|
|
|
}
|
|
|
-
|
|
|
- queryResult.Series = res
|
|
|
- queryResult.Meta.Set("rowCount", countPointsInAllSeries(res))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
-func countPointsInAllSeries(seriesList tsdb.TimeSeriesSlice) (count int) {
|
|
|
- for _, series := range seriesList {
|
|
|
- count += len(series.Points)
|
|
|
+func (e MysqlExecutor) TransformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
|
|
|
+ columnNames, err := rows.Columns()
|
|
|
+ columnCount := len(columnNames)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
- return count
|
|
|
+
|
|
|
+ table := &tsdb.Table{
|
|
|
+ Columns: make([]tsdb.TableColumn, columnCount),
|
|
|
+ Rows: make([]tsdb.RowValues, 0),
|
|
|
+ }
|
|
|
+
|
|
|
+ for i, name := range columnNames {
|
|
|
+ table.Columns[i].Text = name
|
|
|
+ }
|
|
|
+
|
|
|
+ columnTypes, err := rows.ColumnTypes()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ rowLimit := 1000000
|
|
|
+ rowCount := 0
|
|
|
+
|
|
|
+ for ; rows.Next(); rowCount += 1 {
|
|
|
+ if rowCount > rowLimit {
|
|
|
+ return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
|
|
|
+ }
|
|
|
+
|
|
|
+ values, err := e.getTypedRowData(columnTypes, rows)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ table.Rows = append(table.Rows, values)
|
|
|
+ }
|
|
|
+
|
|
|
+ result.Tables = append(result.Tables, table)
|
|
|
+ result.Meta.Set("rowCount", rowCount)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (e MysqlExecutor) getTypedRowData(types []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
|
|
|
+ values := make([]interface{}, len(types))
|
|
|
+
|
|
|
+ for i, stype := range types {
|
|
|
+ switch stype.DatabaseTypeName() {
|
|
|
+ case mysql.FieldTypeNameVarString:
|
|
|
+ values[i] = new(string)
|
|
|
+ case mysql.FieldTypeNameLongLong:
|
|
|
+ values[i] = new(int64)
|
|
|
+ case mysql.FieldTypeNameDouble:
|
|
|
+ values[i] = new(float64)
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("Database type %s not supported", stype)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := rows.Scan(values...); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return values, nil
|
|
|
}
|
|
|
|
|
|
-func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows) (tsdb.TimeSeriesSlice, error) {
|
|
|
+func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
|
|
|
pointsBySeries := make(map[string]*tsdb.TimeSeries)
|
|
|
columnNames, err := rows.Columns()
|
|
|
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
rowData := NewStringStringScan(columnNames)
|
|
|
@@ -150,13 +214,13 @@ func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows)
|
|
|
|
|
|
for ; rows.Next(); rowCount += 1 {
|
|
|
if rowCount > rowLimit {
|
|
|
- return nil, fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
|
|
|
+ return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
|
|
|
}
|
|
|
|
|
|
err := rowData.Update(rows.Rows)
|
|
|
if err != nil {
|
|
|
e.log.Error("MySQL response parsing", "error", err)
|
|
|
- return nil, fmt.Errorf("MySQL response parsing error %v", err)
|
|
|
+ return fmt.Errorf("MySQL response parsing error %v", err)
|
|
|
}
|
|
|
|
|
|
if rowData.metric == "" {
|
|
|
@@ -166,7 +230,7 @@ func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows)
|
|
|
//e.log.Debug("Rows", "metric", rowData.metric, "time", rowData.time, "value", rowData.value)
|
|
|
|
|
|
if !rowData.time.Valid {
|
|
|
- return nil, fmt.Errorf("Found row with no time value")
|
|
|
+ return fmt.Errorf("Found row with no time value")
|
|
|
}
|
|
|
|
|
|
if series, exist := pointsBySeries[rowData.metric]; exist {
|
|
|
@@ -178,13 +242,12 @@ func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- seriesList := make(tsdb.TimeSeriesSlice, 0)
|
|
|
for _, value := range pointsBySeries {
|
|
|
- seriesList = append(seriesList, value)
|
|
|
+ result.Series = append(result.Series, value)
|
|
|
}
|
|
|
|
|
|
- e.log.Debug("TransformToTimeSeries", "rowCount", rowCount, "timeSeriesCount", len(seriesList))
|
|
|
- return seriesList, nil
|
|
|
+ result.Meta.Set("rowCount", rowCount)
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
type stringStringScan struct {
|