mysql.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package mysql
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "reflect"
  6. "strconv"
  7. "strings"
  8. "github.com/go-sql-driver/mysql"
  9. "github.com/go-xorm/core"
  10. "github.com/grafana/grafana/pkg/log"
  11. "github.com/grafana/grafana/pkg/models"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. )
  14. func init() {
  15. tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint)
  16. }
  17. func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  18. logger := log.New("tsdb.mysql")
  19. protocol := "tcp"
  20. if strings.HasPrefix(datasource.Url, "/") {
  21. protocol = "unix"
  22. }
  23. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  24. datasource.User,
  25. datasource.Password,
  26. protocol,
  27. datasource.Url,
  28. datasource.Database,
  29. )
  30. logger.Debug("getEngine", "connection", cnnstr)
  31. config := tsdb.SqlQueryEndpointConfiguration{
  32. DriverName: "mysql",
  33. ConnectionString: cnnstr,
  34. Datasource: datasource,
  35. TimeColumnNames: []string{"time", "time_sec"},
  36. MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
  37. }
  38. rowTransformer := mysqlRowTransformer{
  39. log: logger,
  40. }
  41. return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger)
  42. }
  43. type mysqlRowTransformer struct {
  44. log log.Logger
  45. }
  46. func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
  47. values := make([]interface{}, len(columnTypes))
  48. for i := range values {
  49. scanType := columnTypes[i].ScanType()
  50. values[i] = reflect.New(scanType).Interface()
  51. if columnTypes[i].DatabaseTypeName() == "BIT" {
  52. values[i] = new([]byte)
  53. }
  54. }
  55. if err := rows.Scan(values...); err != nil {
  56. return nil, err
  57. }
  58. for i := 0; i < len(columnTypes); i++ {
  59. typeName := reflect.ValueOf(values[i]).Type().String()
  60. switch typeName {
  61. case "*sql.RawBytes":
  62. values[i] = string(*values[i].(*sql.RawBytes))
  63. case "*mysql.NullTime":
  64. sqlTime := (*values[i].(*mysql.NullTime))
  65. if sqlTime.Valid {
  66. values[i] = sqlTime.Time
  67. } else {
  68. values[i] = nil
  69. }
  70. case "*sql.NullInt64":
  71. nullInt64 := (*values[i].(*sql.NullInt64))
  72. if nullInt64.Valid {
  73. values[i] = nullInt64.Int64
  74. } else {
  75. values[i] = nil
  76. }
  77. case "*sql.NullFloat64":
  78. nullFloat64 := (*values[i].(*sql.NullFloat64))
  79. if nullFloat64.Valid {
  80. values[i] = nullFloat64.Float64
  81. } else {
  82. values[i] = nil
  83. }
  84. }
  85. if columnTypes[i].DatabaseTypeName() == "DECIMAL" {
  86. f, err := strconv.ParseFloat(values[i].(string), 64)
  87. if err == nil {
  88. values[i] = f
  89. } else {
  90. values[i] = nil
  91. }
  92. }
  93. }
  94. return values, nil
  95. }