sql_engine.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package tsdb
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/go-xorm/core"
  7. "github.com/go-xorm/xorm"
  8. "github.com/grafana/grafana/pkg/components/simplejson"
  9. "github.com/grafana/grafana/pkg/models"
  10. )
  11. // SqlEngine is a wrapper class around xorm for relational database data sources.
  12. type SqlEngine interface {
  13. InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
  14. Query(
  15. ctx context.Context,
  16. ds *models.DataSource,
  17. query *TsdbQuery,
  18. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  19. transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  20. ) (*Response, error)
  21. }
  22. // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
  23. // timeRange to be able to generate queries that use from and to.
  24. type SqlMacroEngine interface {
  25. Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
  26. }
  27. type DefaultSqlEngine struct {
  28. MacroEngine SqlMacroEngine
  29. XormEngine *xorm.Engine
  30. }
  31. type engineCacheType struct {
  32. cache map[int64]*xorm.Engine
  33. versions map[int64]int
  34. sync.Mutex
  35. }
  36. var engineCache = engineCacheType{
  37. cache: make(map[int64]*xorm.Engine),
  38. versions: make(map[int64]int),
  39. }
  40. // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
  41. func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
  42. engineCache.Lock()
  43. defer engineCache.Unlock()
  44. if engine, present := engineCache.cache[dsInfo.Id]; present {
  45. if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
  46. e.XormEngine = engine
  47. return nil
  48. }
  49. }
  50. engine, err := xorm.NewEngine(driverName, cnnstr)
  51. if err != nil {
  52. return err
  53. }
  54. engine.SetMaxOpenConns(10)
  55. engine.SetMaxIdleConns(10)
  56. engineCache.cache[dsInfo.Id] = engine
  57. e.XormEngine = engine
  58. return nil
  59. }
  60. // Query is a default implementation of the Query method for an SQL data source.
  61. // The caller of this function must implement transformToTimeSeries and transformToTable and
  62. // pass them in as parameters.
  63. func (e *DefaultSqlEngine) Query(
  64. ctx context.Context,
  65. dsInfo *models.DataSource,
  66. tsdbQuery *TsdbQuery,
  67. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  68. transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  69. ) (*Response, error) {
  70. result := &Response{
  71. Results: make(map[string]*QueryResult),
  72. }
  73. session := e.XormEngine.NewSession()
  74. defer session.Close()
  75. db := session.DB()
  76. for _, query := range tsdbQuery.Queries {
  77. rawSql := query.Model.Get("rawSql").MustString()
  78. if rawSql == "" {
  79. continue
  80. }
  81. queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
  82. result.Results[query.RefId] = queryResult
  83. rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
  84. if err != nil {
  85. queryResult.Error = err
  86. continue
  87. }
  88. queryResult.Meta.Set("sql", rawSql)
  89. rows, err := db.Query(rawSql)
  90. if err != nil {
  91. queryResult.Error = err
  92. continue
  93. }
  94. defer rows.Close()
  95. format := query.Model.Get("format").MustString("time_series")
  96. switch format {
  97. case "time_series":
  98. err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
  99. if err != nil {
  100. queryResult.Error = err
  101. continue
  102. }
  103. case "table":
  104. err := transformToTable(query, rows, queryResult, tsdbQuery)
  105. if err != nil {
  106. queryResult.Error = err
  107. continue
  108. }
  109. }
  110. }
  111. return result, nil
  112. }
  113. // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
  114. // to make native datetime types and epoch dates work in annotation and table queries.
  115. func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
  116. if timeIndex >= 0 {
  117. switch value := values[timeIndex].(type) {
  118. case time.Time:
  119. values[timeIndex] = EpochPrecisionToMs(float64(value.UnixNano()))
  120. case *time.Time:
  121. if value != nil {
  122. values[timeIndex] = EpochPrecisionToMs(float64((*value).UnixNano()))
  123. }
  124. case int64:
  125. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  126. case *int64:
  127. if value != nil {
  128. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  129. }
  130. case uint64:
  131. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  132. case *uint64:
  133. if value != nil {
  134. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  135. }
  136. case int32:
  137. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  138. case *int32:
  139. if value != nil {
  140. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  141. }
  142. case uint32:
  143. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  144. case *uint32:
  145. if value != nil {
  146. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  147. }
  148. case float64:
  149. values[timeIndex] = EpochPrecisionToMs(value)
  150. case *float64:
  151. if value != nil {
  152. values[timeIndex] = EpochPrecisionToMs(*value)
  153. }
  154. case float32:
  155. values[timeIndex] = EpochPrecisionToMs(float64(value))
  156. case *float32:
  157. if value != nil {
  158. values[timeIndex] = EpochPrecisionToMs(float64(*value))
  159. }
  160. }
  161. }
  162. }