mysql.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package mysql
  2. import (
  3. "container/list"
  4. "context"
  5. "database/sql"
  6. "fmt"
  7. "strconv"
  8. "time"
  9. "github.com/go-sql-driver/mysql"
  10. "github.com/go-xorm/core"
  11. "github.com/grafana/grafana/pkg/components/null"
  12. "github.com/grafana/grafana/pkg/log"
  13. "github.com/grafana/grafana/pkg/models"
  14. "github.com/grafana/grafana/pkg/tsdb"
  15. )
  16. type MysqlQueryEndpoint struct {
  17. sqlEngine tsdb.SqlEngine
  18. log log.Logger
  19. }
  20. func init() {
  21. tsdb.RegisterTsdbQueryEndpoint("mysql", NewMysqlQueryEndpoint)
  22. }
  23. func NewMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  24. endpoint := &MysqlQueryEndpoint{
  25. log: log.New("tsdb.mysql"),
  26. }
  27. endpoint.sqlEngine = &tsdb.DefaultSqlEngine{
  28. MacroEngine: NewMysqlMacroEngine(),
  29. }
  30. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC",
  31. datasource.User,
  32. datasource.Password,
  33. "tcp",
  34. datasource.Url,
  35. datasource.Database,
  36. )
  37. endpoint.log.Debug("getEngine", "connection", cnnstr)
  38. if err := endpoint.sqlEngine.InitEngine("mysql", datasource, cnnstr); err != nil {
  39. return nil, err
  40. }
  41. return endpoint, nil
  42. }
  43. // Query is the main function for the MysqlExecutor
  44. func (e *MysqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
  45. return e.sqlEngine.Query(ctx, dsInfo, tsdbQuery, e.transformToTimeSeries, e.transformToTable)
  46. }
  47. func (e MysqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
  48. columnNames, err := rows.Columns()
  49. columnCount := len(columnNames)
  50. if err != nil {
  51. return err
  52. }
  53. table := &tsdb.Table{
  54. Columns: make([]tsdb.TableColumn, columnCount),
  55. Rows: make([]tsdb.RowValues, 0),
  56. }
  57. for i, name := range columnNames {
  58. table.Columns[i].Text = name
  59. }
  60. columnTypes, err := rows.ColumnTypes()
  61. if err != nil {
  62. return err
  63. }
  64. rowLimit := 1000000
  65. rowCount := 0
  66. for ; rows.Next(); rowCount++ {
  67. if rowCount > rowLimit {
  68. return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
  69. }
  70. values, err := e.getTypedRowData(columnTypes, rows)
  71. if err != nil {
  72. return err
  73. }
  74. table.Rows = append(table.Rows, values)
  75. }
  76. result.Tables = append(result.Tables, table)
  77. result.Meta.Set("rowCount", rowCount)
  78. return nil
  79. }
  80. func (e MysqlQueryEndpoint) getTypedRowData(types []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
  81. values := make([]interface{}, len(types))
  82. for i, stype := range types {
  83. e.log.Debug("type", "type", stype)
  84. switch stype.DatabaseTypeName() {
  85. case mysql.FieldTypeNameTiny:
  86. values[i] = new(int8)
  87. case mysql.FieldTypeNameInt24:
  88. values[i] = new(int32)
  89. case mysql.FieldTypeNameShort:
  90. values[i] = new(int16)
  91. case mysql.FieldTypeNameVarString:
  92. values[i] = new(string)
  93. case mysql.FieldTypeNameVarChar:
  94. values[i] = new(string)
  95. case mysql.FieldTypeNameLong:
  96. values[i] = new(int)
  97. case mysql.FieldTypeNameLongLong:
  98. values[i] = new(int64)
  99. case mysql.FieldTypeNameDouble:
  100. values[i] = new(float64)
  101. case mysql.FieldTypeNameDecimal:
  102. values[i] = new(float32)
  103. case mysql.FieldTypeNameNewDecimal:
  104. values[i] = new(float64)
  105. case mysql.FieldTypeNameFloat:
  106. values[i] = new(float64)
  107. case mysql.FieldTypeNameTimestamp:
  108. values[i] = new(time.Time)
  109. case mysql.FieldTypeNameDateTime:
  110. values[i] = new(time.Time)
  111. case mysql.FieldTypeNameTime:
  112. values[i] = new(string)
  113. case mysql.FieldTypeNameYear:
  114. values[i] = new(int16)
  115. case mysql.FieldTypeNameNULL:
  116. values[i] = nil
  117. case mysql.FieldTypeNameBit:
  118. values[i] = new([]byte)
  119. case mysql.FieldTypeNameBLOB:
  120. values[i] = new(string)
  121. case mysql.FieldTypeNameTinyBLOB:
  122. values[i] = new(string)
  123. case mysql.FieldTypeNameMediumBLOB:
  124. values[i] = new(string)
  125. case mysql.FieldTypeNameLongBLOB:
  126. values[i] = new(string)
  127. case mysql.FieldTypeNameString:
  128. values[i] = new(string)
  129. case mysql.FieldTypeNameDate:
  130. values[i] = new(string)
  131. default:
  132. return nil, fmt.Errorf("Database type %s not supported", stype.DatabaseTypeName())
  133. }
  134. }
  135. if err := rows.Scan(values...); err != nil {
  136. return nil, err
  137. }
  138. return values, nil
  139. }
  140. func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.Rows, result *tsdb.QueryResult) error {
  141. pointsBySeries := make(map[string]*tsdb.TimeSeries)
  142. seriesByQueryOrder := list.New()
  143. columnNames, err := rows.Columns()
  144. if err != nil {
  145. return err
  146. }
  147. rowData := NewStringStringScan(columnNames)
  148. rowLimit := 1000000
  149. rowCount := 0
  150. for ; rows.Next(); rowCount++ {
  151. if rowCount > rowLimit {
  152. return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit)
  153. }
  154. err := rowData.Update(rows.Rows)
  155. if err != nil {
  156. e.log.Error("MySQL response parsing", "error", err)
  157. return fmt.Errorf("MySQL response parsing error %v", err)
  158. }
  159. if rowData.metric == "" {
  160. rowData.metric = "Unknown"
  161. }
  162. if !rowData.time.Valid {
  163. return fmt.Errorf("Found row with no time value")
  164. }
  165. if series, exist := pointsBySeries[rowData.metric]; exist {
  166. series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
  167. } else {
  168. series := &tsdb.TimeSeries{Name: rowData.metric}
  169. series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time})
  170. pointsBySeries[rowData.metric] = series
  171. seriesByQueryOrder.PushBack(rowData.metric)
  172. }
  173. }
  174. for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
  175. key := elem.Value.(string)
  176. result.Series = append(result.Series, pointsBySeries[key])
  177. }
  178. result.Meta.Set("rowCount", rowCount)
  179. return nil
  180. }
  181. type stringStringScan struct {
  182. rowPtrs []interface{}
  183. rowValues []string
  184. columnNames []string
  185. columnCount int
  186. time null.Float
  187. value null.Float
  188. metric string
  189. }
  190. func NewStringStringScan(columnNames []string) *stringStringScan {
  191. s := &stringStringScan{
  192. columnCount: len(columnNames),
  193. columnNames: columnNames,
  194. rowPtrs: make([]interface{}, len(columnNames)),
  195. rowValues: make([]string, len(columnNames)),
  196. }
  197. for i := 0; i < s.columnCount; i++ {
  198. s.rowPtrs[i] = new(sql.RawBytes)
  199. }
  200. return s
  201. }
  202. func (s *stringStringScan) Update(rows *sql.Rows) error {
  203. if err := rows.Scan(s.rowPtrs...); err != nil {
  204. return err
  205. }
  206. s.time = null.FloatFromPtr(nil)
  207. s.value = null.FloatFromPtr(nil)
  208. for i := 0; i < s.columnCount; i++ {
  209. if rb, ok := s.rowPtrs[i].(*sql.RawBytes); ok {
  210. s.rowValues[i] = string(*rb)
  211. switch s.columnNames[i] {
  212. case "time_sec":
  213. if sec, err := strconv.ParseInt(s.rowValues[i], 10, 64); err == nil {
  214. s.time = null.FloatFrom(float64(sec * 1000))
  215. }
  216. case "value":
  217. if value, err := strconv.ParseFloat(s.rowValues[i], 64); err == nil {
  218. s.value = null.FloatFrom(value)
  219. }
  220. case "metric":
  221. s.metric = s.rowValues[i]
  222. }
  223. *rb = nil // reset pointer to discard current value to avoid a bug
  224. } else {
  225. return fmt.Errorf("Cannot convert index %d column %s to type *sql.RawBytes", i, s.columnNames[i])
  226. }
  227. }
  228. return nil
  229. }