|
|
@@ -1,4 +1,4 @@
|
|
|
-package tsdb
|
|
|
+package sqleng
|
|
|
|
|
|
import (
|
|
|
"container/list"
|
|
|
@@ -13,6 +13,7 @@ import (
|
|
|
"time"
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
|
+ "github.com/grafana/grafana/pkg/tsdb"
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/components/null"
|
|
|
|
|
|
@@ -25,12 +26,12 @@ import (
|
|
|
// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
|
|
|
// timeRange to be able to generate queries that use from and to.
|
|
|
type SqlMacroEngine interface {
|
|
|
- Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
|
|
|
+ Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error)
|
|
|
}
|
|
|
|
|
|
// SqlTableRowTransformer transforms a query result row to RowValues with proper types.
|
|
|
type SqlTableRowTransformer interface {
|
|
|
- Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (RowValues, error)
|
|
|
+ Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error)
|
|
|
}
|
|
|
|
|
|
type engineCacheType struct {
|
|
|
@@ -44,7 +45,7 @@ var engineCache = engineCacheType{
|
|
|
versions: make(map[int64]int),
|
|
|
}
|
|
|
|
|
|
-var sqlIntervalCalculator = NewIntervalCalculator(nil)
|
|
|
+var sqlIntervalCalculator = tsdb.NewIntervalCalculator(nil)
|
|
|
|
|
|
var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engine, error) {
|
|
|
return xorm.NewEngine(driverName, connectionString)
|
|
|
@@ -67,7 +68,7 @@ type SqlQueryEndpointConfiguration struct {
|
|
|
MetricColumnTypes []string
|
|
|
}
|
|
|
|
|
|
-var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, rowTransformer SqlTableRowTransformer, macroEngine SqlMacroEngine, log log.Logger) (TsdbQueryEndpoint, error) {
|
|
|
+var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, rowTransformer SqlTableRowTransformer, macroEngine SqlMacroEngine, log log.Logger) (tsdb.TsdbQueryEndpoint, error) {
|
|
|
queryEndpoint := sqlQueryEndpoint{
|
|
|
rowTransformer: rowTransformer,
|
|
|
macroEngine: macroEngine,
|
|
|
@@ -115,9 +116,9 @@ var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, rowTransfo
|
|
|
const rowLimit = 1000000
|
|
|
|
|
|
// Query is the main function for the SqlQueryEndpoint
|
|
|
-func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *TsdbQuery) (*Response, error) {
|
|
|
- result := &Response{
|
|
|
- Results: make(map[string]*QueryResult),
|
|
|
+func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
|
+ result := &tsdb.Response{
|
|
|
+ Results: make(map[string]*tsdb.QueryResult),
|
|
|
}
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
@@ -128,7 +129,7 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
|
|
|
+ queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefId}
|
|
|
result.Results[query.RefId] = queryResult
|
|
|
|
|
|
// global substitutions
|
|
|
@@ -149,7 +150,7 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
|
- go func(rawSQL string, query *Query, queryResult *QueryResult) {
|
|
|
+ go func(rawSQL string, query *tsdb.Query, queryResult *tsdb.QueryResult) {
|
|
|
defer wg.Done()
|
|
|
session := e.engine.NewSession()
|
|
|
defer session.Close()
|
|
|
@@ -187,8 +188,8 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
|
|
|
}
|
|
|
|
|
|
// global macros/substitutions for all sql datasources
|
|
|
-var Interpolate = func(query *Query, timeRange *TimeRange, sql string) (string, error) {
|
|
|
- minInterval, err := GetIntervalFrom(query.DataSource, query.Model, time.Second*60)
|
|
|
+var Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
|
|
|
+ minInterval, err := tsdb.GetIntervalFrom(query.DataSource, query.Model, time.Second*60)
|
|
|
if err != nil {
|
|
|
return sql, nil
|
|
|
}
|
|
|
@@ -202,7 +203,7 @@ var Interpolate = func(query *Query, timeRange *TimeRange, sql string) (string,
|
|
|
return sql, nil
|
|
|
}
|
|
|
|
|
|
-func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
|
|
|
+func (e *sqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
|
|
|
columnNames, err := rows.Columns()
|
|
|
columnCount := len(columnNames)
|
|
|
|
|
|
@@ -213,9 +214,9 @@ func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, resul
|
|
|
rowCount := 0
|
|
|
timeIndex := -1
|
|
|
|
|
|
- table := &Table{
|
|
|
- Columns: make([]TableColumn, columnCount),
|
|
|
- Rows: make([]RowValues, 0),
|
|
|
+ table := &tsdb.Table{
|
|
|
+ Columns: make([]tsdb.TableColumn, columnCount),
|
|
|
+ Rows: make([]tsdb.RowValues, 0),
|
|
|
}
|
|
|
|
|
|
for i, name := range columnNames {
|
|
|
@@ -256,8 +257,8 @@ func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, resul
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
|
|
|
- pointsBySeries := make(map[string]*TimeSeries)
|
|
|
+func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
|
|
|
+ pointsBySeries := make(map[string]*tsdb.TimeSeries)
|
|
|
seriesByQueryOrder := list.New()
|
|
|
|
|
|
columnNames, err := rows.Columns()
|
|
|
@@ -385,7 +386,7 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows,
|
|
|
|
|
|
series, exist := pointsBySeries[metric]
|
|
|
if !exist {
|
|
|
- series = &TimeSeries{Name: metric}
|
|
|
+ series = &tsdb.TimeSeries{Name: metric}
|
|
|
pointsBySeries[metric] = series
|
|
|
seriesByQueryOrder.PushBack(metric)
|
|
|
}
|
|
|
@@ -410,12 +411,12 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows,
|
|
|
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
|
|
|
|
|
|
for i := intervalStart; i < timestamp; i += fillInterval {
|
|
|
- series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
|
|
|
+ series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
|
|
|
rowCount++
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- series.Points = append(series.Points, TimePoint{value, null.FloatFrom(timestamp)})
|
|
|
+ series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
|
|
|
|
|
|
e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
|
|
|
}
|
|
|
@@ -442,7 +443,7 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows,
|
|
|
// align interval start
|
|
|
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
|
|
|
for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
|
|
|
- series.Points = append(series.Points, TimePoint{fillValue, null.FloatFrom(i)})
|
|
|
+ series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
|
|
|
rowCount++
|
|
|
}
|
|
|
}
|
|
|
@@ -454,7 +455,7 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *Query, rows *core.Rows,
|
|
|
|
|
|
// ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
|
|
|
// to make native datetime types and epoch dates work in annotation and table queries.
|
|
|
-func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
|
|
|
+func ConvertSqlTimeColumnToEpochMs(values tsdb.RowValues, timeIndex int) {
|
|
|
if timeIndex >= 0 {
|
|
|
switch value := values[timeIndex].(type) {
|
|
|
case time.Time:
|
|
|
@@ -464,40 +465,40 @@ func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
|
|
|
values[timeIndex] = float64((*value).UnixNano()) / float64(time.Millisecond)
|
|
|
}
|
|
|
case int64:
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
|
|
|
case *int64:
|
|
|
if value != nil {
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
|
|
|
}
|
|
|
case uint64:
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
|
|
|
case *uint64:
|
|
|
if value != nil {
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
|
|
|
}
|
|
|
case int32:
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
|
|
|
case *int32:
|
|
|
if value != nil {
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
|
|
|
}
|
|
|
case uint32:
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(value)))
|
|
|
case *uint32:
|
|
|
if value != nil {
|
|
|
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
|
|
|
+ values[timeIndex] = int64(tsdb.EpochPrecisionToMs(float64(*value)))
|
|
|
}
|
|
|
case float64:
|
|
|
- values[timeIndex] = EpochPrecisionToMs(value)
|
|
|
+ values[timeIndex] = tsdb.EpochPrecisionToMs(value)
|
|
|
case *float64:
|
|
|
if value != nil {
|
|
|
- values[timeIndex] = EpochPrecisionToMs(*value)
|
|
|
+ values[timeIndex] = tsdb.EpochPrecisionToMs(*value)
|
|
|
}
|
|
|
case float32:
|
|
|
- values[timeIndex] = EpochPrecisionToMs(float64(value))
|
|
|
+ values[timeIndex] = tsdb.EpochPrecisionToMs(float64(value))
|
|
|
case *float32:
|
|
|
if value != nil {
|
|
|
- values[timeIndex] = EpochPrecisionToMs(float64(*value))
|
|
|
+ values[timeIndex] = tsdb.EpochPrecisionToMs(float64(*value))
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -609,7 +610,7 @@ func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (n
|
|
|
return value, nil
|
|
|
}
|
|
|
|
|
|
-func SetupFillmode(query *Query, interval time.Duration, fillmode string) error {
|
|
|
+func SetupFillmode(query *tsdb.Query, interval time.Duration, fillmode string) error {
|
|
|
query.Model.Set("fill", true)
|
|
|
query.Model.Set("fillInterval", interval.Seconds())
|
|
|
switch fillmode {
|