mysql.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package mysql
  2. import (
  3. "container/list"
  4. "context"
  5. "database/sql"
  6. "fmt"
  7. "math"
  8. "reflect"
  9. "strconv"
  10. "github.com/go-sql-driver/mysql"
  11. "github.com/go-xorm/core"
  12. "github.com/grafana/grafana/pkg/components/null"
  13. "github.com/grafana/grafana/pkg/log"
  14. "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/tsdb"
  16. )
  17. type MysqlQueryEndpoint struct {
  18. sqlEngine tsdb.SqlEngine
  19. log log.Logger
  20. }
  21. func init() {
  22. tsdb.RegisterTsdbQueryEndpoint("mysql", NewMysqlQueryEndpoint)
  23. }
  24. func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  25. endpoint := &MysqlQueryEndpoint{
  26. log: log.New("tsdb.mysql"),
  27. }
  28. endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
  29. MacroEngine: NewMysqlMacroEngine(),
  30. }
  31. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  32. datasource.User,
  33. datasource.Password,
  34. "tcp",
  35. datasource.Url,
  36. datasource.Database,
  37. )
  38. endpoint.log.Debug("getEngine", "connection", cnnstr)
  39. if err := endpoint.sqlEngine.InitEngine("mysql", datasource, cnnstr); err != nil {
  40. return nil, err
  41. }
  42. return endpoint, nil
  43. }
  44. // Query is the main function for the MysqlExecutor
  45. func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
  46. return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
  47. }
  48. func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
  49. columnNames, err := rows.Columns()
  50. columnCount := len(columnNames)
  51. if err != nil {
  52. return err
  53. }
  54. table := &tsdb.Table{
  55. Columns: make([]tsdb.TableColumn, columnCount),
  56. Rows: make([]tsdb.RowValues, 0),
  57. }
  58. for i, name := range columnNames {
  59. table.Columns[i].Text = name
  60. }
  61. rowLimit := 1000000
  62. rowCount := 0
  63. timeIndex := -1
  64. // check if there is a column named time
  65. for i, col := range columnNames {
  66. switch col {
  67. case "time", "time_sec":
  68. timeIndex = i
  69. }
  70. }
  71. for ; rows.Next(); rowCount++ {
  72. if rowCount > rowLimit {
  73. return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
  74. }
  75. values, err := e.getTypedRowData(rows)
  76. if err != nil {
  77. return err
  78. }
  79. // converts column named time to unix timestamp in milliseconds to make
  80. // native mysql datetime types and epoch dates work in
  81. // annotation and table queries.
  82. tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
  83. table.Rows = append(table.Rows, values)
  84. }
  85. result.Tables = append(result.Tables, table)
  86. result.Meta.Set("rowCount", rowCount)
  87. return nil
  88. }
  89. func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) {
  90. types, err := rows.ColumnTypes()
  91. if err != nil {
  92. return nil, err
  93. }
  94. values := make([]interface{}, len(types))
  95. for i := range values {
  96. scanType := types[i].ScanType()
  97. values[i] = reflect.New(scanType).Interface()
  98. if types[i].DatabaseTypeName() == "BIT" {
  99. values[i] = new([]byte)
  100. }
  101. }
  102. if err := rows.Scan(values...); err != nil {
  103. return nil, err
  104. }
  105. for i := 0; i < len(types); i++ {
  106. typeName := reflect.ValueOf(values[i]).Type().String()
  107. switch typeName {
  108. case "*sql.RawBytes":
  109. values[i] = string(*values[i].(*sql.RawBytes))
  110. case "*mysql.NullTime":
  111. sqlTime := (*values[i].(*mysql.NullTime))
  112. if sqlTime.Valid {
  113. values[i] = sqlTime.Time
  114. } else {
  115. values[i] = nil
  116. }
  117. case "*sql.NullInt64":
  118. nullInt64 := (*values[i].(*sql.NullInt64))
  119. if nullInt64.Valid {
  120. values[i] = nullInt64.Int64
  121. } else {
  122. values[i] = nil
  123. }
  124. case "*sql.NullFloat64":
  125. nullFloat64 := (*values[i].(*sql.NullFloat64))
  126. if nullFloat64.Valid {
  127. values[i] = nullFloat64.Float64
  128. } else {
  129. values[i] = nil
  130. }
  131. }
  132. if types[i].DatabaseTypeName() == "DECIMAL" {
  133. f, err := strconv.ParseFloat(values[i].(string), 64)
  134. if err == nil {
  135. values[i] = f
  136. } else {
  137. values[i] = nil
  138. }
  139. }
  140. }
  141. return values, nil
  142. }
  143. func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
  144. pointsBySeries := make(map[string]*tsdb.TimeSeries)
  145. seriesByQueryOrder := list.New()
  146. columnNames, err := rows.Columns()
  147. if err != nil {
  148. return err
  149. }
  150. columnTypes, err := rows.ColumnTypes()
  151. if err != nil {
  152. return err
  153. }
  154. rowLimit := 1000000
  155. rowCount := 0
  156. timeIndex := -1
  157. metricIndex := -1
  158. // check columns of resultset: a column named time is mandatory
  159. // the first text column is treated as metric name unless a column named metric is present
  160. for i, col := range columnNames {
  161. switch col {
  162. case "time", "time_sec":
  163. timeIndex = i
  164. case "metric":
  165. metricIndex = i
  166. default:
  167. if metricIndex == -1 {
  168. switch columnTypes[i].DatabaseTypeName() {
  169. case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT":
  170. metricIndex = i
  171. }
  172. }
  173. }
  174. }
  175. if timeIndex == -1 {
  176. return fmt.Errorf("Found no column named time or time_sec")
  177. }
  178. fillMissing := query.Model.Get("fill").MustBool(false)
  179. var fillInterval float64
  180. fillValue := null.Float{}
  181. if fillMissing {
  182. fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
  183. if !query.Model.Get("fillNull").MustBool(false) {
  184. fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
  185. fillValue.Valid = true
  186. }
  187. }
  188. for rows.Next() {
  189. var timestamp float64
  190. var value null.Float
  191. var metric string
  192. if rowCount > rowLimit {
  193. return fmt.Errorf("PostgreSQL query row limit exceeded, limit %d", rowLimit)
  194. }
  195. values, err := e.getTypedRowData(rows)
  196. if err != nil {
  197. return err
  198. }
  199. // converts column named time to unix timestamp in milliseconds to make
  200. // native mysql datetime types and epoch dates work in
  201. // annotation and table queries.
  202. tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
  203. switch columnValue := values[timeIndex].(type) {
  204. case int64:
  205. timestamp = float64(columnValue)
  206. case float64:
  207. timestamp = columnValue
  208. default:
  209. return fmt.Errorf("Invalid type for column time/time_sec, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
  210. }
  211. if metricIndex >= 0 {
  212. if columnValue, ok := values[metricIndex].(string); ok {
  213. metric = columnValue
  214. } else {
  215. return fmt.Errorf("Column metric must be of type char,varchar or text, got: %T %v", values[metricIndex], values[metricIndex])
  216. }
  217. }
  218. for i, col := range columnNames {
  219. if i == timeIndex || i == metricIndex {
  220. continue
  221. }
  222. switch columnValue := values[i].(type) {
  223. case int64:
  224. value = null.FloatFrom(float64(columnValue))
  225. case float64:
  226. value = null.FloatFrom(columnValue)
  227. case nil:
  228. value.Valid = false
  229. default:
  230. return fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", col, columnValue, columnValue)
  231. }
  232. if metricIndex == -1 {
  233. metric = col
  234. }
  235. series, exist := pointsBySeries[metric]
  236. if !exist {
  237. series = &tsdb.TimeSeries{Name: metric}
  238. pointsBySeries[metric] = series
  239. seriesByQueryOrder.PushBack(metric)
  240. }
  241. if fillMissing {
  242. var intervalStart float64
  243. if !exist {
  244. intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
  245. } else {
  246. intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
  247. }
  248. // align interval start
  249. intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
  250. for i := intervalStart; i < timestamp; i += fillInterval {
  251. series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
  252. rowCount++
  253. }
  254. }
  255. series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
  256. e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
  257. rowCount++
  258. }
  259. }
  260. for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
  261. key := elem.Value.(string)
  262. result.Series = append(result.Series, pointsBySeries[key])
  263. if fillMissing {
  264. series := pointsBySeries[key]
  265. // fill in values from last fetched value till interval end
  266. intervalStart := series.Points[len(series.Points)-1][1].Float64
  267. intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
  268. // align interval start
  269. intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
  270. for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
  271. series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
  272. rowCount++
  273. }
  274. }
  275. }
  276. result.Meta.Set("rowCount", rowCount)
  277. return nil
  278. }