sql_engine.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package tsdb
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/grafana/grafana/pkg/components/null"
  8. "github.com/go-xorm/core"
  9. "github.com/go-xorm/xorm"
  10. "github.com/grafana/grafana/pkg/components/simplejson"
  11. "github.com/grafana/grafana/pkg/models"
  12. )
  13. // SqlEngine is a wrapper class around xorm for relational database data sources.
  14. type SqlEngine interface {
  15. InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
  16. Query(
  17. ctx context.Context,
  18. ds *models.DataSource,
  19. query *TsdbQuery,
  20. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  21. transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  22. ) (*Response, error)
  23. }
  24. // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
  25. // timeRange to be able to generate queries that use from and to.
  26. type SqlMacroEngine interface {
  27. Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
  28. }
  29. type DefaultSqlEngine struct {
  30. MacroEngine SqlMacroEngine
  31. XormEngine *xorm.Engine
  32. }
  33. type engineCacheType struct {
  34. cache map[int64]*xorm.Engine
  35. versions map[int64]int
  36. sync.Mutex
  37. }
  38. var engineCache = engineCacheType{
  39. cache: make(map[int64]*xorm.Engine),
  40. versions: make(map[int64]int),
  41. }
  42. // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
  43. func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
  44. engineCache.Lock()
  45. defer engineCache.Unlock()
  46. if engine, present := engineCache.cache[dsInfo.Id]; present {
  47. if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
  48. e.XormEngine = engine
  49. return nil
  50. }
  51. }
  52. engine, err := xorm.NewEngine(driverName, cnnstr)
  53. if err != nil {
  54. return err
  55. }
  56. engine.SetMaxOpenConns(10)
  57. engine.SetMaxIdleConns(10)
  58. engineCache.versions[dsInfo.Id] = dsInfo.Version
  59. engineCache.cache[dsInfo.Id] = engine
  60. e.XormEngine = engine
  61. return nil
  62. }
  63. // Query is a default implementation of the Query method for an SQL data source.
  64. // The caller of this function must implement transformToTimeSeries and transformToTable and
  65. // pass them in as parameters.
  66. func (e *DefaultSqlEngine) Query(
  67. ctx context.Context,
  68. dsInfo *models.DataSource,
  69. tsdbQuery *TsdbQuery,
  70. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  71. transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  72. ) (*Response, error) {
  73. result := &Response{
  74. Results: make(map[string]*QueryResult),
  75. }
  76. session := e.XormEngine.NewSession()
  77. defer session.Close()
  78. db := session.DB()
  79. for _, query := range tsdbQuery.Queries {
  80. rawSql := query.Model.Get("rawSql").MustString()
  81. if rawSql == "" {
  82. continue
  83. }
  84. queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
  85. result.Results[query.RefId] = queryResult
  86. rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
  87. if err != nil {
  88. queryResult.Error = err
  89. continue
  90. }
  91. queryResult.Meta.Set("sql", rawSql)
  92. rows, err := db.Query(rawSql)
  93. if err != nil {
  94. queryResult.Error = err
  95. continue
  96. }
  97. defer rows.Close()
  98. format := query.Model.Get("format").MustString("time_series")
  99. switch format {
  100. case "time_series":
  101. err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
  102. if err != nil {
  103. queryResult.Error = err
  104. continue
  105. }
  106. case "table":
  107. err := transformToTable(query, rows, queryResult, tsdbQuery)
  108. if err != nil {
  109. queryResult.Error = err
  110. continue
  111. }
  112. }
  113. }
  114. return result, nil
  115. }
  116. // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
  117. // to make native datetime types and epoch dates work in annotation and table queries.
  118. func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
  119. if timeIndex >= 0 {
  120. switch value := values[timeIndex].(type) {
  121. case time.Time:
  122. values[timeIndex] = float64(value.UnixNano()) / float64(time.Millisecond)
  123. case *time.Time:
  124. if value != nil {
  125. values[timeIndex] = float64((*value).UnixNano()) / float64(time.Millisecond)
  126. }
  127. case int64:
  128. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  129. case *int64:
  130. if value != nil {
  131. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  132. }
  133. case uint64:
  134. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  135. case *uint64:
  136. if value != nil {
  137. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  138. }
  139. case int32:
  140. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  141. case *int32:
  142. if value != nil {
  143. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  144. }
  145. case uint32:
  146. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  147. case *uint32:
  148. if value != nil {
  149. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  150. }
  151. case float64:
  152. values[timeIndex] = EpochPrecisionToMs(value)
  153. case *float64:
  154. if value != nil {
  155. values[timeIndex] = EpochPrecisionToMs(*value)
  156. }
  157. case float32:
  158. values[timeIndex] = EpochPrecisionToMs(float64(value))
  159. case *float32:
  160. if value != nil {
  161. values[timeIndex] = EpochPrecisionToMs(float64(*value))
  162. }
  163. }
  164. }
  165. }
  166. // ConvertSqlValueColumnToFloat converts timeseries value column to float.
  167. func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (null.Float, error) {
  168. var value null.Float
  169. switch typedValue := columnValue.(type) {
  170. case int:
  171. value = null.FloatFrom(float64(typedValue))
  172. case *int:
  173. if typedValue == nil {
  174. value.Valid = false
  175. } else {
  176. value = null.FloatFrom(float64(*typedValue))
  177. }
  178. case int64:
  179. value = null.FloatFrom(float64(typedValue))
  180. case *int64:
  181. if typedValue == nil {
  182. value.Valid = false
  183. } else {
  184. value = null.FloatFrom(float64(*typedValue))
  185. }
  186. case int32:
  187. value = null.FloatFrom(float64(typedValue))
  188. case *int32:
  189. if typedValue == nil {
  190. value.Valid = false
  191. } else {
  192. value = null.FloatFrom(float64(*typedValue))
  193. }
  194. case int16:
  195. value = null.FloatFrom(float64(typedValue))
  196. case *int16:
  197. if typedValue == nil {
  198. value.Valid = false
  199. } else {
  200. value = null.FloatFrom(float64(*typedValue))
  201. }
  202. case int8:
  203. value = null.FloatFrom(float64(typedValue))
  204. case *int8:
  205. if typedValue == nil {
  206. value.Valid = false
  207. } else {
  208. value = null.FloatFrom(float64(*typedValue))
  209. }
  210. case uint:
  211. value = null.FloatFrom(float64(typedValue))
  212. case *uint:
  213. if typedValue == nil {
  214. value.Valid = false
  215. } else {
  216. value = null.FloatFrom(float64(*typedValue))
  217. }
  218. case uint64:
  219. value = null.FloatFrom(float64(typedValue))
  220. case *uint64:
  221. if typedValue == nil {
  222. value.Valid = false
  223. } else {
  224. value = null.FloatFrom(float64(*typedValue))
  225. }
  226. case uint32:
  227. value = null.FloatFrom(float64(typedValue))
  228. case *uint32:
  229. if typedValue == nil {
  230. value.Valid = false
  231. } else {
  232. value = null.FloatFrom(float64(*typedValue))
  233. }
  234. case uint16:
  235. value = null.FloatFrom(float64(typedValue))
  236. case *uint16:
  237. if typedValue == nil {
  238. value.Valid = false
  239. } else {
  240. value = null.FloatFrom(float64(*typedValue))
  241. }
  242. case uint8:
  243. value = null.FloatFrom(float64(typedValue))
  244. case *uint8:
  245. if typedValue == nil {
  246. value.Valid = false
  247. } else {
  248. value = null.FloatFrom(float64(*typedValue))
  249. }
  250. case float64:
  251. value = null.FloatFrom(typedValue)
  252. case *float64:
  253. value = null.FloatFromPtr(typedValue)
  254. case float32:
  255. value = null.FloatFrom(float64(typedValue))
  256. case *float32:
  257. if typedValue == nil {
  258. value.Valid = false
  259. } else {
  260. value = null.FloatFrom(float64(*typedValue))
  261. }
  262. case nil:
  263. value.Valid = false
  264. default:
  265. return null.NewFloat(0, false), fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", columnName, typedValue, typedValue)
  266. }
  267. return value, nil
  268. }