| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- package tsdb
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "github.com/grafana/grafana/pkg/components/null"
- "github.com/go-xorm/core"
- "github.com/go-xorm/xorm"
- "github.com/grafana/grafana/pkg/components/simplejson"
- "github.com/grafana/grafana/pkg/models"
- )
- // SqlEngine is a wrapper class around xorm for relational database data sources.
- type SqlEngine interface {
- InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
- Query(
- ctx context.Context,
- ds *models.DataSource,
- query *TsdbQuery,
- transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
- transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
- ) (*Response, error)
- }
- // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
- // timeRange to be able to generate queries that use from and to.
- type SqlMacroEngine interface {
- Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
- }
- type DefaultSqlEngine struct {
- MacroEngine SqlMacroEngine
- XormEngine *xorm.Engine
- }
- type engineCacheType struct {
- cache map[int64]*xorm.Engine
- versions map[int64]int
- sync.Mutex
- }
- var engineCache = engineCacheType{
- cache: make(map[int64]*xorm.Engine),
- versions: make(map[int64]int),
- }
- // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
- func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
- engineCache.Lock()
- defer engineCache.Unlock()
- if engine, present := engineCache.cache[dsInfo.Id]; present {
- if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
- e.XormEngine = engine
- return nil
- }
- }
- engine, err := xorm.NewEngine(driverName, cnnstr)
- if err != nil {
- return err
- }
- engine.SetMaxOpenConns(10)
- engine.SetMaxIdleConns(10)
- engineCache.versions[dsInfo.Id] = dsInfo.Version
- engineCache.cache[dsInfo.Id] = engine
- e.XormEngine = engine
- return nil
- }
- // Query is a default implementation of the Query method for an SQL data source.
- // The caller of this function must implement transformToTimeSeries and transformToTable and
- // pass them in as parameters.
- func (e *DefaultSqlEngine) Query(
- ctx context.Context,
- dsInfo *models.DataSource,
- tsdbQuery *TsdbQuery,
- transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
- transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
- ) (*Response, error) {
- result := &Response{
- Results: make(map[string]*QueryResult),
- }
- session := e.XormEngine.NewSession()
- defer session.Close()
- db := session.DB()
- for _, query := range tsdbQuery.Queries {
- rawSql := query.Model.Get("rawSql").MustString()
- if rawSql == "" {
- continue
- }
- queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
- result.Results[query.RefId] = queryResult
- rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
- if err != nil {
- queryResult.Error = err
- continue
- }
- queryResult.Meta.Set("sql", rawSql)
- rows, err := db.Query(rawSql)
- if err != nil {
- queryResult.Error = err
- continue
- }
- defer rows.Close()
- format := query.Model.Get("format").MustString("time_series")
- switch format {
- case "time_series":
- err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
- if err != nil {
- queryResult.Error = err
- continue
- }
- case "table":
- err := transformToTable(query, rows, queryResult, tsdbQuery)
- if err != nil {
- queryResult.Error = err
- continue
- }
- }
- }
- return result, nil
- }
- // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
- // to make native datetime types and epoch dates work in annotation and table queries.
- func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
- if timeIndex >= 0 {
- switch value := values[timeIndex].(type) {
- case time.Time:
- values[timeIndex] = float64(value.UnixNano()) / float64(time.Millisecond)
- case *time.Time:
- if value != nil {
- values[timeIndex] = float64((*value).UnixNano()) / float64(time.Millisecond)
- }
- case int64:
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
- case *int64:
- if value != nil {
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
- }
- case uint64:
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
- case *uint64:
- if value != nil {
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
- }
- case int32:
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
- case *int32:
- if value != nil {
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
- }
- case uint32:
- values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
- case *uint32:
- if value != nil {
- values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
- }
- case float64:
- values[timeIndex] = EpochPrecisionToMs(value)
- case *float64:
- if value != nil {
- values[timeIndex] = EpochPrecisionToMs(*value)
- }
- case float32:
- values[timeIndex] = EpochPrecisionToMs(float64(value))
- case *float32:
- if value != nil {
- values[timeIndex] = EpochPrecisionToMs(float64(*value))
- }
- }
- }
- }
- // ConvertSqlValueColumnToFloat converts timeseries value column to float.
- func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (null.Float, error) {
- var value null.Float
- switch typedValue := columnValue.(type) {
- case int:
- value = null.FloatFrom(float64(typedValue))
- case *int:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case int64:
- value = null.FloatFrom(float64(typedValue))
- case *int64:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case int32:
- value = null.FloatFrom(float64(typedValue))
- case *int32:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case int16:
- value = null.FloatFrom(float64(typedValue))
- case *int16:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case int8:
- value = null.FloatFrom(float64(typedValue))
- case *int8:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case uint:
- value = null.FloatFrom(float64(typedValue))
- case *uint:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case uint64:
- value = null.FloatFrom(float64(typedValue))
- case *uint64:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case uint32:
- value = null.FloatFrom(float64(typedValue))
- case *uint32:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case uint16:
- value = null.FloatFrom(float64(typedValue))
- case *uint16:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case uint8:
- value = null.FloatFrom(float64(typedValue))
- case *uint8:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case float64:
- value = null.FloatFrom(typedValue)
- case *float64:
- value = null.FloatFromPtr(typedValue)
- case float32:
- value = null.FloatFrom(float64(typedValue))
- case *float32:
- if typedValue == nil {
- value.Valid = false
- } else {
- value = null.FloatFrom(float64(*typedValue))
- }
- case nil:
- value.Valid = false
- default:
- return null.NewFloat(0, false), fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", columnName, typedValue, typedValue)
- }
- return value, nil
- }
|