mysql.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package mysql
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "reflect"
  6. "strconv"
  7. "strings"
  8. "github.com/go-sql-driver/mysql"
  9. "github.com/go-xorm/core"
  10. "github.com/grafana/grafana/pkg/log"
  11. "github.com/grafana/grafana/pkg/models"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. )
  14. func init() {
  15. tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint)
  16. }
  17. func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  18. logger := log.New("tsdb.mysql")
  19. protocol := "tcp"
  20. if strings.HasPrefix(datasource.Url, "/") {
  21. protocol = "unix"
  22. }
  23. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  24. datasource.User,
  25. datasource.Password,
  26. protocol,
  27. datasource.Url,
  28. datasource.Database,
  29. )
  30. tlsConfig, err := datasource.GetTLSConfig()
  31. if err != nil {
  32. return nil, err
  33. }
  34. if tlsConfig.RootCAs != nil || len(tlsConfig.Certificates) > 0 {
  35. tlsConfigString := fmt.Sprintf("ds%d", datasource.Id)
  36. mysql.RegisterTLSConfig(tlsConfigString, tlsConfig)
  37. cnnstr += "&tls=" + tlsConfigString
  38. }
  39. logger.Debug("getEngine", "connection", cnnstr)
  40. config := tsdb.SqlQueryEndpointConfiguration{
  41. DriverName: "mysql",
  42. ConnectionString: cnnstr,
  43. Datasource: datasource,
  44. TimeColumnNames: []string{"time", "time_sec"},
  45. MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
  46. }
  47. rowTransformer := mysqlRowTransformer{
  48. log: logger,
  49. }
  50. return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger)
  51. }
  52. type mysqlRowTransformer struct {
  53. log log.Logger
  54. }
  55. func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
  56. values := make([]interface{}, len(columnTypes))
  57. for i := range values {
  58. scanType := columnTypes[i].ScanType()
  59. values[i] = reflect.New(scanType).Interface()
  60. if columnTypes[i].DatabaseTypeName() == "BIT" {
  61. values[i] = new([]byte)
  62. }
  63. }
  64. if err := rows.Scan(values...); err != nil {
  65. return nil, err
  66. }
  67. for i := 0; i < len(columnTypes); i++ {
  68. typeName := reflect.ValueOf(values[i]).Type().String()
  69. switch typeName {
  70. case "*sql.RawBytes":
  71. values[i] = string(*values[i].(*sql.RawBytes))
  72. case "*mysql.NullTime":
  73. sqlTime := (*values[i].(*mysql.NullTime))
  74. if sqlTime.Valid {
  75. values[i] = sqlTime.Time
  76. } else {
  77. values[i] = nil
  78. }
  79. case "*sql.NullInt64":
  80. nullInt64 := (*values[i].(*sql.NullInt64))
  81. if nullInt64.Valid {
  82. values[i] = nullInt64.Int64
  83. } else {
  84. values[i] = nil
  85. }
  86. case "*sql.NullFloat64":
  87. nullFloat64 := (*values[i].(*sql.NullFloat64))
  88. if nullFloat64.Valid {
  89. values[i] = nullFloat64.Float64
  90. } else {
  91. values[i] = nil
  92. }
  93. }
  94. if columnTypes[i].DatabaseTypeName() == "DECIMAL" {
  95. f, err := strconv.ParseFloat(values[i].(string), 64)
  96. if err == nil {
  97. values[i] = f
  98. } else {
  99. values[i] = nil
  100. }
  101. }
  102. }
  103. return values, nil
  104. }