mysql.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package mysql
  2. import (
  3. "container/list"
  4. "context"
  5. "database/sql"
  6. "fmt"
  7. "math"
  8. "reflect"
  9. "strconv"
  10. "time"
  11. "github.com/go-sql-driver/mysql"
  12. "github.com/go-xorm/core"
  13. "github.com/grafana/grafana/pkg/components/null"
  14. "github.com/grafana/grafana/pkg/log"
  15. "github.com/grafana/grafana/pkg/models"
  16. "github.com/grafana/grafana/pkg/tsdb"
  17. )
  18. type MysqlQueryEndpoint struct {
  19. sqlEngine tsdb.SqlEngine
  20. log log.Logger
  21. }
  22. func init() {
  23. tsdb.RegisterTsdbQueryEndpoint("mysql", NewMysqlQueryEndpoint)
  24. }
  25. func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  26. endpoint := &MysqlQueryEndpoint{
  27. log: log.New("tsdb.mysql"),
  28. }
  29. endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
  30. MacroEngine: NewMysqlMacroEngine(),
  31. }
  32. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  33. datasource.User,
  34. datasource.Password,
  35. "tcp",
  36. datasource.Url,
  37. datasource.Database,
  38. )
  39. endpoint.log.Debug("getEngine", "connection", cnnstr)
  40. if err := endpoint.sqlEngine.InitEngine("mysql", datasource, cnnstr); err != nil {
  41. return nil, err
  42. }
  43. return endpoint, nil
  44. }
  45. // Query is the main function for the MysqlExecutor
  46. func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
  47. return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
  48. }
  49. func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
  50. columnNames, err := rows.Columns()
  51. columnCount := len(columnNames)
  52. if err != nil {
  53. return err
  54. }
  55. table := &tsdb.Table{
  56. Columns: make([]tsdb.TableColumn, columnCount),
  57. Rows: make([]tsdb.RowValues, 0),
  58. }
  59. for i, name := range columnNames {
  60. table.Columns[i].Text = name
  61. }
  62. rowLimit := 1000000
  63. rowCount := 0
  64. timeIndex := -1
  65. // check if there is a column named time
  66. for i, col := range columnNames {
  67. switch col {
  68. case "time_sec":
  69. timeIndex = i
  70. }
  71. }
  72. for ; rows.Next(); rowCount++ {
  73. if rowCount > rowLimit {
  74. return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
  75. }
  76. values, err := e.getTypedRowData(rows)
  77. if err != nil {
  78. return err
  79. }
  80. // for annotations, convert to epoch
  81. if timeIndex != -1 {
  82. switch value := values[timeIndex].(type) {
  83. case time.Time:
  84. values[timeIndex] = float64(value.UnixNano() / 1e9)
  85. }
  86. }
  87. table.Rows = append(table.Rows, values)
  88. }
  89. result.Tables = append(result.Tables, table)
  90. result.Meta.Set("rowCount", rowCount)
  91. return nil
  92. }
  93. func (e MysqlQueryEndpoint) getTypedRowData(rows *core.Rows) (tsdb.RowValues, error) {
  94. types, err := rows.ColumnTypes()
  95. if err != nil {
  96. return nil, err
  97. }
  98. values := make([]interface{}, len(types))
  99. for i := range values {
  100. scanType := types[i].ScanType()
  101. values[i] = reflect.New(scanType).Interface()
  102. if types[i].DatabaseTypeName() == "BIT" {
  103. values[i] = new([]byte)
  104. }
  105. }
  106. if err := rows.Scan(values...); err != nil {
  107. return nil, err
  108. }
  109. for i := 0; i < len(types); i++ {
  110. typeName := reflect.ValueOf(values[i]).Type().String()
  111. switch typeName {
  112. case "*sql.RawBytes":
  113. values[i] = string(*values[i].(*sql.RawBytes))
  114. case "*mysql.NullTime":
  115. sqlTime := (*values[i].(*mysql.NullTime))
  116. if sqlTime.Valid {
  117. values[i] = sqlTime.Time
  118. } else {
  119. values[i] = nil
  120. }
  121. case "*sql.NullInt64":
  122. nullInt64 := (*values[i].(*sql.NullInt64))
  123. if nullInt64.Valid {
  124. values[i] = nullInt64.Int64
  125. } else {
  126. values[i] = nil
  127. }
  128. case "*sql.NullFloat64":
  129. nullFloat64 := (*values[i].(*sql.NullFloat64))
  130. if nullFloat64.Valid {
  131. values[i] = nullFloat64.Float64
  132. } else {
  133. values[i] = nil
  134. }
  135. }
  136. if types[i].DatabaseTypeName() == "DECIMAL" {
  137. f, err := strconv.ParseFloat(values[i].(string), 64)
  138. if err == nil {
  139. values[i] = f
  140. } else {
  141. values[i] = nil
  142. }
  143. }
  144. }
  145. return values, nil
  146. }
  147. func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult, tsdbQuery *tsdb.TsdbQuery) error {
  148. pointsBySeries := make(map[string]*tsdb.TimeSeries)
  149. seriesByQueryOrder := list.New()
  150. columnNames, err := rows.Columns()
  151. if err != nil {
  152. return err
  153. }
  154. rowData := NewStringStringScan(columnNames)
  155. rowLimit := 1000000
  156. rowCount := 0
  157. fillMissing := query.Model.Get("fill").MustBool(false)
  158. var fillInterval float64
  159. fillValue := null.Float{}
  160. if fillMissing {
  161. fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
  162. if query.Model.Get("fillNull").MustBool(false) == false {
  163. fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
  164. fillValue.Valid = true
  165. }
  166. }
  167. for ; rows.Next(); rowCount++ {
  168. if rowCount > rowLimit {
  169. return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
  170. }
  171. err := rowData.Update(rows.Rows)
  172. if err != nil {
  173. e.log.Error("MySQL response parsing", "error", err)
  174. return fmt.Errorf("MySQL response parsing error %v", err)
  175. }
  176. if rowData.metric == "" {
  177. rowData.metric = "Unknown"
  178. }
  179. if !rowData.time.Valid {
  180. return fmt.Errorf("Found row with no time value")
  181. }
  182. series, exist := pointsBySeries[rowData.metric]
  183. if exist == false {
  184. series = &tsdb.TimeSeries{Name: rowData.metric}
  185. pointsBySeries[rowData.metric] = series
  186. seriesByQueryOrder.PushBack(rowData.metric)
  187. }
  188. if fillMissing {
  189. var intervalStart float64
  190. if exist == false {
  191. intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
  192. } else {
  193. intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
  194. }
  195. // align interval start
  196. intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
  197. for i := intervalStart; i < rowData.time.Float64; i += fillInterval {
  198. series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
  199. rowCount++
  200. }
  201. }
  202. series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
  203. }
  204. for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
  205. key := elem.Value.(string)
  206. result.Series = append(result.Series, pointsBySeries[key])
  207. if fillMissing {
  208. series := pointsBySeries[key]
  209. // fill in values from last fetched value till interval end
  210. intervalStart := series.Points[len(series.Points)-1][1].Float64
  211. intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
  212. // align interval start
  213. intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
  214. for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
  215. series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
  216. rowCount++
  217. }
  218. }
  219. }
  220. result.Meta.Set("rowCount", rowCount)
  221. return nil
  222. }
  223. type stringStringScan struct {
  224. rowPtrs []interface{}
  225. rowValues []string
  226. columnNames []string
  227. columnCount int
  228. time null.Float
  229. value null.Float
  230. metric string
  231. }
  232. func NewStringStringScan(columnNames []string) *stringStringScan {
  233. s := &stringStringScan{
  234. columnCount: len(columnNames),
  235. columnNames: columnNames,
  236. rowPtrs: make([]interface{}, len(columnNames)),
  237. rowValues: make([]string, len(columnNames)),
  238. }
  239. for i := 0; i < s.columnCount; i++ {
  240. s.rowPtrs[i] = new(sql.RawBytes)
  241. }
  242. return s
  243. }
  244. func (s *stringStringScan) Update(rows *sql.Rows) error {
  245. if err := rows.Scan(s.rowPtrs...); err != nil {
  246. return err
  247. }
  248. s.time = null.FloatFromPtr(nil)
  249. s.value = null.FloatFromPtr(nil)
  250. for i := 0; i < s.columnCount; i++ {
  251. if rb, ok := s.rowPtrs[i].(*sql.RawBytes); ok {
  252. s.rowValues[i] = string(*rb)
  253. switch s.columnNames[i] {
  254. case "time_sec":
  255. if sec, err := strconv.ParseInt(s.rowValues[i], 10, 64); err == nil {
  256. s.time = null.FloatFrom(float64(sec * 1000))
  257. }
  258. case "value":
  259. if value, err := strconv.ParseFloat(s.rowValues[i], 64); err == nil {
  260. s.value = null.FloatFrom(value)
  261. }
  262. case "metric":
  263. s.metric = s.rowValues[i]
  264. }
  265. *rb = nil // reset pointer to discard current value to avoid a bug
  266. } else {
  267. return fmt.Errorf("Cannot convert index %d column %s to type *sql.RawBytes", i, s.columnNames[i])
  268. }
  269. }
  270. return nil
  271. }