sql_engine.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package tsdb
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/go-xorm/core"
  6. "github.com/go-xorm/xorm"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. "github.com/grafana/grafana/pkg/models"
  9. )
  10. // SqlEngine is a wrapper class around xorm for relational database data sources.
  11. type SqlEngine interface {
  12. InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
  13. Query(
  14. ctx context.Context,
  15. ds *models.DataSource,
  16. query *TsdbQuery,
  17. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
  18. transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
  19. ) (*Response, error)
  20. }
  21. // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
  22. // timeRange to be able to generate queries that use from and to.
  23. type SqlMacroEngine interface {
  24. Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
  25. }
  26. type DefaultSqlEngine struct {
  27. MacroEngine SqlMacroEngine
  28. XormEngine *xorm.Engine
  29. }
  30. type engineCacheType struct {
  31. cache map[int64]*xorm.Engine
  32. versions map[int64]int
  33. sync.Mutex
  34. }
  35. var engineCache = engineCacheType{
  36. cache: make(map[int64]*xorm.Engine),
  37. versions: make(map[int64]int),
  38. }
  39. // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
  40. func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
  41. engineCache.Lock()
  42. defer engineCache.Unlock()
  43. if engine, present := engineCache.cache[dsInfo.Id]; present {
  44. if version, _ := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
  45. e.XormEngine = engine
  46. return nil
  47. }
  48. }
  49. engine, err := xorm.NewEngine(driverName, cnnstr)
  50. if err != nil {
  51. return err
  52. }
  53. engine.SetMaxOpenConns(10)
  54. engine.SetMaxIdleConns(10)
  55. engineCache.cache[dsInfo.Id] = engine
  56. e.XormEngine = engine
  57. return nil
  58. }
  59. // Query is a default implementation of the Query method for an SQL data source.
  60. // The caller of this function must implement transformToTimeSeries and transformToTable and
  61. // pass them in as parameters.
  62. func (e *DefaultSqlEngine) Query(
  63. ctx context.Context,
  64. dsInfo *models.DataSource,
  65. tsdbQuery *TsdbQuery,
  66. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult) error,
  67. transformToTable func(query *Query, rows *core.Rows, result *QueryResult) error,
  68. ) (*Response, error) {
  69. result := &Response{
  70. Results: make(map[string]*QueryResult),
  71. }
  72. session := e.XormEngine.NewSession()
  73. defer session.Close()
  74. db := session.DB()
  75. for _, query := range tsdbQuery.Queries {
  76. rawSql := query.Model.Get("rawSql").MustString()
  77. if rawSql == "" {
  78. continue
  79. }
  80. queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
  81. result.Results[query.RefId] = queryResult
  82. rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
  83. if err != nil {
  84. queryResult.Error = err
  85. continue
  86. }
  87. queryResult.Meta.Set("sql", rawSql)
  88. rows, err := db.Query(rawSql)
  89. if err != nil {
  90. queryResult.Error = err
  91. continue
  92. }
  93. defer rows.Close()
  94. format := query.Model.Get("format").MustString("time_series")
  95. switch format {
  96. case "time_series":
  97. err := transformToTimeSeries(query, rows, queryResult)
  98. if err != nil {
  99. queryResult.Error = err
  100. continue
  101. }
  102. case "table":
  103. err := transformToTable(query, rows, queryResult)
  104. if err != nil {
  105. queryResult.Error = err
  106. continue
  107. }
  108. }
  109. }
  110. return result, nil
  111. }