| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package tsdb
- import (
- "context"
- "sync"
- "github.com/go-xorm/core"
- "github.com/go-xorm/xorm"
- "github.com/grafana/grafana/pkg/components/simplejson"
- "github.com/grafana/grafana/pkg/models"
- )
- // SqlEngine is a wrapper class around xorm for relational database data sources.
- type SqlEngine interface {
- InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
- Query(
- ctx context.Context,
- ds *models.DataSource,
- query *TsdbQuery,
- transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
- transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
- ) (*Response, error)
- }
- // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
- // timeRange to be able to generate queries that use from and to.
- type SqlMacroEngine interface {
- Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
- }
- type DefaultSqlEngine struct {
- MacroEngine SqlMacroEngine
- XormEngine *xorm.Engine
- }
- type engineCacheType struct {
- cache map[int64]*xorm.Engine
- versions map[int64]int
- sync.Mutex
- }
- var engineCache = engineCacheType{
- cache: make(map[int64]*xorm.Engine),
- versions: make(map[int64]int),
- }
- // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
- func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
- engineCache.Lock()
- defer engineCache.Unlock()
- if engine, present := engineCache.cache[dsInfo.Id]; present {
- if version, _ := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
- e.XormEngine = engine
- return nil
- }
- }
- engine, err := xorm.NewEngine(driverName, cnnstr)
- if err != nil {
- return err
- }
- engine.SetMaxOpenConns(10)
- engine.SetMaxIdleConns(10)
- engineCache.cache[dsInfo.Id] = engine
- e.XormEngine = engine
- return nil
- }
- // Query is a default implementation of the Query method for an SQL data source.
- // The caller of this function must implement transformToTimeSeries and transformToTable and
- // pass them in as parameters.
- func (e *DefaultSqlEngine) Query(
- ctx context.Context,
- dsInfo *models.DataSource,
- tsdbQuery *TsdbQuery,
- transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
- transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
- ) (*Response, error) {
- result := &Response{
- Results: make(map[string]*QueryResult),
- }
- session := e.XormEngine.NewSession()
- defer session.Close()
- db := session.DB()
- for _, query := range tsdbQuery.Queries {
- rawSql := query.Model.Get("rawSql").MustString()
- if rawSql == "" {
- continue
- }
- queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
- result.Results[query.RefId] = queryResult
- rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
- if err != nil {
- queryResult.Error = err
- continue
- }
- queryResult.Meta.Set("sql", rawSql)
- rows, err := db.Query(rawSql)
- if err != nil {
- queryResult.Error = err
- continue
- }
- defer rows.Close()
- format := query.Model.Get("format").MustString("time_series")
- switch format {
- case "time_series":
- err := transformToTimeSeries(query, rows, queryResult)
- if err != nil {
- queryResult.Error = err
- continue
- }
- case "table":
- err := transformToTable(query, rows, queryResult)
- if err != nil {
- queryResult.Error = err
- continue
- }
- }
- }
- return result, nil
- }
|