mysql.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package mysql
  2. import (
  3. "container/list"
  4. "context"
  5. "database/sql"
  6. "fmt"
  7. "math"
  8. "reflect"
  9. "strconv"
  10. "time"
  11. "github.com/go-sql-driver/mysql"
  12. "github.com/go-xorm/core"
  13. "github.com/grafana/grafana/pkg/components/null"
  14. "github.com/grafana/grafana/pkg/log"
  15. "github.com/grafana/grafana/pkg/models"
  16. "github.com/grafana/grafana/pkg/tsdb"
  17. )
  18. type MysqlQueryEndpoint struct {
  19. sqlEngine tsdb.SqlEngine
  20. log log.Logger
  21. }
  22. func init() {
  23. tsdb.RegisterTsdbQueryEndpoint("mysql", NewMysqlQueryEndpoint)
  24. }
  25. func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  26. endpoint := &MysqlQueryEndpoint{
  27. log: log.New("tsdb.mysql"),
  28. }
  29. endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
  30. MacroEngine: NewMysqlMacroEngine(),
  31. }
  32. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  33. datasource.User,
  34. datasource.Password,
  35. "tcp",
  36. datasource.Url,
  37. datasource.Database,
  38. )
  39. endpoint.log.Debug("getEngine", "connection", cnnstr)
  40. if err := endpoint.sqlEngine.InitEngine("mysql", datasource, cnnstr); err != nil {
  41. return nil, err
  42. }
  43. return endpoint, nil
  44. }
  45. // Query is the main function for the MysqlExecutor
  46. func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
  47. return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
  48. }
  49. func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
  50. columnNames, err := rows.Columns()
  51. columnCount := len(columnNames)
  52. if err != nil {
  53. return err
  54. }
  55. table := &tsdb.Table{
  56. Columns: make([]tsdb.TableColumn, columnCount),
  57. Rows: make([]tsdb.RowValues, 0),
  58. }
  59. for i, name := range columnNames {
  60. table.Columns[i].Text = name
  61. }
  62. rowLimit := 1000000
  63. rowCount := 0
  64. timeIndex := -1
  65. // check if there is a column named time
  66. for i, col := range columnNames {
  67. switch col {
  68. case "time", "time_sec":
  69. timeIndex = i
  70. }
  71. }
  72. for ; rows.Next(); rowCount++ {
  73. if rowCount > rowLimit {
  74. return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
  75. }
  76. values, err := e.getTypedRowData(rows)
  77. if err != nil {
  78. return err
  79. }
  80. // converts column named time to unix timestamp in milliseconds to make
  81. // native mysql datetime types and epoch dates work in
  82. // annotation and table queries.
  83. tsdb.ConvertSqlTimeColumnToEpochMs(values, timeIndex)
  84. table.Rows = append(table.Rows, values)
  85. }
  86. result.Tables = append(result.Tables, table)
  87. result.Meta.Set("rowCount", rowCount)
  88. return nil
  89. }
  90. func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) {
  91. types, err := rows.ColumnTypes()
  92. if err != nil {
  93. return nil, err
  94. }
  95. values := make([]interface{}, len(types))
  96. for i := range values {
  97. scanType := types[i].ScanType()
  98. values[i] = reflect.New(scanType).Interface()
  99. if types[i].DatabaseTypeName() == "BIT" {
  100. values[i] = new([]byte)
  101. }
  102. }
  103. if err := rows.Scan(values...); err != nil {
  104. return nil, err
  105. }
  106. for i := 0; i < len(types); i++ {
  107. typeName := reflect.ValueOf(values[i]).Type().String()
  108. switch typeName {
  109. case "*sql.RawBytes":
  110. values[i] = string(*values[i].(*sql.RawBytes))
  111. case "*mysql.NullTime":
  112. sqlTime := (*values[i].(*mysql.NullTime))
  113. if sqlTime.Valid {
  114. values[i] = sqlTime.Time
  115. } else {
  116. values[i] = nil
  117. }
  118. case "*sql.NullInt64":
  119. nullInt64 := (*values[i].(*sql.NullInt64))
  120. if nullInt64.Valid {
  121. values[i] = nullInt64.Int64
  122. } else {
  123. values[i] = nil
  124. }
  125. case "*sql.NullFloat64":
  126. nullFloat64 := (*values[i].(*sql.NullFloat64))
  127. if nullFloat64.Valid {
  128. values[i] = nullFloat64.Float64
  129. } else {
  130. values[i] = nil
  131. }
  132. }
  133. if types[i].DatabaseTypeName() == "DECIMAL" {
  134. f, err := strconv.ParseFloat(values[i].(string), 64)
  135. if err == nil {
  136. values[i] = f
  137. } else {
  138. values[i] = nil
  139. }
  140. }
  141. }
  142. return values, nil
  143. }
  144. func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
  145. pointsBySeries := make(map[string]*tsdb.TimeSeries)
  146. seriesByQueryOrder := list.New()
  147. columnNames, err := rows.Columns()
  148. if err != nil {
  149. return err
  150. }
  151. columnTypes, err := rows.ColumnTypes()
  152. if err != nil {
  153. return err
  154. }
  155. rowLimit := 1000000
  156. rowCount := 0
  157. timeIndex := -1
  158. metricIndex := -1
  159. // check columns of resultset: a column named time is mandatory
  160. // the first text column is treated as metric name unless a column named metric is present
  161. for i, col := range columnNames {
  162. switch col {
  163. case "time", "time_sec":
  164. timeIndex = i
  165. case "metric":
  166. metricIndex = i
  167. default:
  168. if metricIndex == -1 {
  169. switch columnTypes[i].DatabaseTypeName() {
  170. case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT":
  171. metricIndex = i
  172. }
  173. }
  174. }
  175. }
  176. if timeIndex == -1 {
  177. return fmt.Errorf("Found no column named time or time_sec")
  178. }
  179. fillMissing := query.Model.Get("fill").MustBool(false)
  180. var fillInterval float64
  181. fillValue := null.Float{}
  182. if fillMissing {
  183. fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
  184. if query.Model.Get("fillNull").MustBool(false) == false {
  185. fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
  186. fillValue.Valid = true
  187. }
  188. }
  189. for rows.Next() {
  190. var timestamp float64
  191. var value null.Float
  192. var metric string
  193. if rowCount > rowLimit {
  194. return fmt.Errorf("PostgreSQL query row limit exceeded, limit %d", rowLimit)
  195. }
  196. values, err := e.getTypedRowData(rows)
  197. if err != nil {
  198. return err
  199. }
  200. switch columnValue := values[timeIndex].(type) {
  201. case int64:
  202. timestamp = float64(columnValue * 1000)
  203. case float64:
  204. timestamp = columnValue * 1000
  205. case time.Time:
  206. timestamp = float64(columnValue.UnixNano() / 1e6)
  207. default:
  208. return fmt.Errorf("Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v", columnValue, columnValue)
  209. }
  210. if metricIndex >= 0 {
  211. if columnValue, ok := values[metricIndex].(string); ok == true {
  212. metric = columnValue
  213. } else {
  214. return fmt.Errorf("Column metric must be of type char,varchar or text, got: %T %v", values[metricIndex], values[metricIndex])
  215. }
  216. }
  217. for i, col := range columnNames {
  218. if i == timeIndex || i == metricIndex {
  219. continue
  220. }
  221. switch columnValue := values[i].(type) {
  222. case int64:
  223. value = null.FloatFrom(float64(columnValue))
  224. case float64:
  225. value = null.FloatFrom(columnValue)
  226. case nil:
  227. value.Valid = false
  228. default:
  229. return fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", col, columnValue, columnValue)
  230. }
  231. if metricIndex == -1 {
  232. metric = col
  233. }
  234. series, exist := pointsBySeries[metric]
  235. if exist == false {
  236. series = &tsdb.TimeSeries{Name: metric}
  237. pointsBySeries[metric] = series
  238. seriesByQueryOrder.PushBack(metric)
  239. }
  240. if fillMissing {
  241. var intervalStart float64
  242. if exist == false {
  243. intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
  244. } else {
  245. intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
  246. }
  247. // align interval start
  248. intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
  249. for i := intervalStart; i < timestamp; i += fillInterval {
  250. series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
  251. rowCount++
  252. }
  253. }
  254. series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
  255. e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
  256. rowCount++
  257. }
  258. }
  259. for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
  260. key := elem.Value.(string)
  261. result.Series = append(result.Series, pointsBySeries[key])
  262. if fillMissing {
  263. series := pointsBySeries[key]
  264. // fill in values from last fetched value till interval end
  265. intervalStart := series.Points[len(series.Points)-1][1].Float64
  266. intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
  267. // align interval start
  268. intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
  269. for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
  270. series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
  271. rowCount++
  272. }
  273. }
  274. }
  275. result.Meta.Set("rowCount", rowCount)
  276. return nil
  277. }