sql_engine.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package tsdb
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/grafana/grafana/pkg/components/null"
  8. "github.com/go-xorm/core"
  9. "github.com/go-xorm/xorm"
  10. "github.com/grafana/grafana/pkg/components/simplejson"
  11. "github.com/grafana/grafana/pkg/models"
  12. )
  13. // SqlEngine is a wrapper class around xorm for relational database data sources.
  14. type SqlEngine interface {
  15. InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
  16. Query(
  17. ctx context.Context,
  18. ds *models.DataSource,
  19. query *TsdbQuery,
  20. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  21. transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  22. ) (*Response, error)
  23. }
  24. // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
  25. // timeRange to be able to generate queries that use from and to.
  26. type SqlMacroEngine interface {
  27. Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
  28. }
  29. type DefaultSqlEngine struct {
  30. MacroEngine SqlMacroEngine
  31. XormEngine *xorm.Engine
  32. }
  33. type engineCacheType struct {
  34. cache map[int64]*xorm.Engine
  35. versions map[int64]int
  36. sync.Mutex
  37. }
  38. var engineCache = engineCacheType{
  39. cache: make(map[int64]*xorm.Engine),
  40. versions: make(map[int64]int),
  41. }
  42. // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
  43. func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
  44. engineCache.Lock()
  45. defer engineCache.Unlock()
  46. if engine, present := engineCache.cache[dsInfo.Id]; present {
  47. if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
  48. e.XormEngine = engine
  49. return nil
  50. }
  51. }
  52. engine, err := xorm.NewEngine(driverName, cnnstr)
  53. if err != nil {
  54. return err
  55. }
  56. engine.SetMaxOpenConns(10)
  57. engine.SetMaxIdleConns(10)
  58. engineCache.cache[dsInfo.Id] = engine
  59. e.XormEngine = engine
  60. return nil
  61. }
  62. // Query is a default implementation of the Query method for an SQL data source.
  63. // The caller of this function must implement transformToTimeSeries and transformToTable and
  64. // pass them in as parameters.
  65. func (e *DefaultSqlEngine) Query(
  66. ctx context.Context,
  67. dsInfo *models.DataSource,
  68. tsdbQuery *TsdbQuery,
  69. transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  70. transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
  71. ) (*Response, error) {
  72. result := &Response{
  73. Results: make(map[string]*QueryResult),
  74. }
  75. session := e.XormEngine.NewSession()
  76. defer session.Close()
  77. db := session.DB()
  78. for _, query := range tsdbQuery.Queries {
  79. rawSql := query.Model.Get("rawSql").MustString()
  80. if rawSql == "" {
  81. continue
  82. }
  83. queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
  84. result.Results[query.RefId] = queryResult
  85. rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
  86. if err != nil {
  87. queryResult.Error = err
  88. continue
  89. }
  90. queryResult.Meta.Set("sql", rawSql)
  91. rows, err := db.Query(rawSql)
  92. if err != nil {
  93. queryResult.Error = err
  94. continue
  95. }
  96. defer rows.Close()
  97. format := query.Model.Get("format").MustString("time_series")
  98. switch format {
  99. case "time_series":
  100. err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
  101. if err != nil {
  102. queryResult.Error = err
  103. continue
  104. }
  105. case "table":
  106. err := transformToTable(query, rows, queryResult, tsdbQuery)
  107. if err != nil {
  108. queryResult.Error = err
  109. continue
  110. }
  111. }
  112. }
  113. return result, nil
  114. }
  115. // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
  116. // to make native datetime types and epoch dates work in annotation and table queries.
  117. func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
  118. if timeIndex >= 0 {
  119. switch value := values[timeIndex].(type) {
  120. case time.Time:
  121. values[timeIndex] = EpochPrecisionToMs(float64(value.UnixNano()))
  122. case *time.Time:
  123. if value != nil {
  124. values[timeIndex] = EpochPrecisionToMs(float64((*value).UnixNano()))
  125. }
  126. case int64:
  127. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  128. case *int64:
  129. if value != nil {
  130. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  131. }
  132. case uint64:
  133. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  134. case *uint64:
  135. if value != nil {
  136. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  137. }
  138. case int32:
  139. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  140. case *int32:
  141. if value != nil {
  142. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  143. }
  144. case uint32:
  145. values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
  146. case *uint32:
  147. if value != nil {
  148. values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
  149. }
  150. case float64:
  151. values[timeIndex] = EpochPrecisionToMs(value)
  152. case *float64:
  153. if value != nil {
  154. values[timeIndex] = EpochPrecisionToMs(*value)
  155. }
  156. case float32:
  157. values[timeIndex] = EpochPrecisionToMs(float64(value))
  158. case *float32:
  159. if value != nil {
  160. values[timeIndex] = EpochPrecisionToMs(float64(*value))
  161. }
  162. }
  163. }
  164. }
  165. // ConvertSqlValueColumnToFloat converts timeseries value column to float.
  166. func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (null.Float, error) {
  167. var value null.Float
  168. switch typedValue := columnValue.(type) {
  169. case int:
  170. value = null.FloatFrom(float64(typedValue))
  171. case *int:
  172. if typedValue == nil {
  173. value.Valid = false
  174. } else {
  175. value = null.FloatFrom(float64(*typedValue))
  176. }
  177. case int64:
  178. value = null.FloatFrom(float64(typedValue))
  179. case *int64:
  180. if typedValue == nil {
  181. value.Valid = false
  182. } else {
  183. value = null.FloatFrom(float64(*typedValue))
  184. }
  185. case int32:
  186. value = null.FloatFrom(float64(typedValue))
  187. case *int32:
  188. if typedValue == nil {
  189. value.Valid = false
  190. } else {
  191. value = null.FloatFrom(float64(*typedValue))
  192. }
  193. case int16:
  194. value = null.FloatFrom(float64(typedValue))
  195. case *int16:
  196. if typedValue == nil {
  197. value.Valid = false
  198. } else {
  199. value = null.FloatFrom(float64(*typedValue))
  200. }
  201. case int8:
  202. value = null.FloatFrom(float64(typedValue))
  203. case *int8:
  204. if typedValue == nil {
  205. value.Valid = false
  206. } else {
  207. value = null.FloatFrom(float64(*typedValue))
  208. }
  209. case uint:
  210. value = null.FloatFrom(float64(typedValue))
  211. case *uint:
  212. if typedValue == nil {
  213. value.Valid = false
  214. } else {
  215. value = null.FloatFrom(float64(*typedValue))
  216. }
  217. case uint64:
  218. value = null.FloatFrom(float64(typedValue))
  219. case *uint64:
  220. if typedValue == nil {
  221. value.Valid = false
  222. } else {
  223. value = null.FloatFrom(float64(*typedValue))
  224. }
  225. case uint32:
  226. value = null.FloatFrom(float64(typedValue))
  227. case *uint32:
  228. if typedValue == nil {
  229. value.Valid = false
  230. } else {
  231. value = null.FloatFrom(float64(*typedValue))
  232. }
  233. case uint16:
  234. value = null.FloatFrom(float64(typedValue))
  235. case *uint16:
  236. if typedValue == nil {
  237. value.Valid = false
  238. } else {
  239. value = null.FloatFrom(float64(*typedValue))
  240. }
  241. case uint8:
  242. value = null.FloatFrom(float64(typedValue))
  243. case *uint8:
  244. if typedValue == nil {
  245. value.Valid = false
  246. } else {
  247. value = null.FloatFrom(float64(*typedValue))
  248. }
  249. case float64:
  250. value = null.FloatFrom(typedValue)
  251. case *float64:
  252. value = null.FloatFromPtr(typedValue)
  253. case float32:
  254. value = null.FloatFrom(float64(typedValue))
  255. case *float32:
  256. if typedValue == nil {
  257. value.Valid = false
  258. } else {
  259. value = null.FloatFrom(float64(*typedValue))
  260. }
  261. case nil:
  262. value.Valid = false
  263. default:
  264. return null.NewFloat(0, false), fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", columnName, typedValue, typedValue)
  265. }
  266. return value, nil
  267. }