|
|
@@ -99,40 +99,49 @@ func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, co
|
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
- result.QueryResults[query.RefId] = e.TransformToTimeSeries(query, rows)
|
|
|
+ res, err := e.TransformToTimeSeries(query, rows)
|
|
|
+ if err != nil {
|
|
|
+ result.Error = err
|
|
|
+ return result
|
|
|
+ }
|
|
|
+
|
|
|
+ result.QueryResults[query.RefId] = &tsdb.QueryResult{RefId: query.RefId, Series: res}
|
|
|
}
|
|
|
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
-func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows) *tsdb.QueryResult {
|
|
|
- result := &tsdb.QueryResult{RefId: query.RefId}
|
|
|
+func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows) (tsdb.TimeSeriesSlice, error) {
|
|
|
pointsBySeries := make(map[string]*tsdb.TimeSeries)
|
|
|
columnNames, err := rows.Columns()
|
|
|
|
|
|
if err != nil {
|
|
|
- result.Error = err
|
|
|
- return result
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
rowData := NewStringStringScan(columnNames)
|
|
|
- for rows.Next() {
|
|
|
+ rowLimit := 1000000
|
|
|
+ rowCount := 0
|
|
|
+
|
|
|
+ for ; rows.Next(); rowCount += 1 {
|
|
|
+ if rowCount > rowLimit {
|
|
|
+ return nil, fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
|
|
|
+ }
|
|
|
+
|
|
|
err := rowData.Update(rows.Rows)
|
|
|
if err != nil {
|
|
|
- e.log.Error("Mysql response parsing", "error", err)
|
|
|
- result.Error = err
|
|
|
- return result
|
|
|
+ e.log.Error("MySQL response parsing", "error", err)
|
|
|
+ return nil, fmt.Errorf("MySQL response parsing error %v", err)
|
|
|
}
|
|
|
|
|
|
if rowData.metric == "" {
|
|
|
rowData.metric = "Unknown"
|
|
|
}
|
|
|
|
|
|
- e.log.Info("Rows", "metric", rowData.metric, "time", rowData.time, "value", rowData.value)
|
|
|
+ //e.log.Debug("Rows", "metric", rowData.metric, "time", rowData.time, "value", rowData.value)
|
|
|
|
|
|
if !rowData.time.Valid {
|
|
|
- result.Error = fmt.Errorf("Found row with no time value")
|
|
|
- return result
|
|
|
+ return nil, fmt.Errorf("Found row with no time value")
|
|
|
}
|
|
|
|
|
|
if series, exist := pointsBySeries[rowData.metric]; exist {
|
|
|
@@ -144,11 +153,13 @@ func (e MysqlExecutor) TransformToTimeSeries(query *tsdb.Query, rows *core.Rows)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ seriesList := make(tsdb.TimeSeriesSlice, 0)
|
|
|
for _, value := range pointsBySeries {
|
|
|
- result.Series = append(result.Series, value)
|
|
|
+ seriesList = append(seriesList, value)
|
|
|
}
|
|
|
|
|
|
- return result
|
|
|
+ e.log.Debug("TransformToTimeSeries", "rowCount", rowCount, "timeSeriesCount", len(seriesList))
|
|
|
+ return seriesList, nil
|
|
|
}
|
|
|
|
|
|
type stringStringScan struct {
|