mysql.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package mysql
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "github.com/grafana/grafana/pkg/setting"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "github.com/go-sql-driver/mysql"
  10. "github.com/go-xorm/core"
  11. "github.com/grafana/grafana/pkg/infra/log"
  12. "github.com/grafana/grafana/pkg/models"
  13. "github.com/grafana/grafana/pkg/tsdb"
  14. )
  15. func init() {
  16. tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint)
  17. }
  18. func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  19. logger := log.New("tsdb.mysql")
  20. protocol := "tcp"
  21. if strings.HasPrefix(datasource.Url, "/") {
  22. protocol = "unix"
  23. }
  24. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  25. datasource.User,
  26. datasource.DecryptedPassword(),
  27. protocol,
  28. datasource.Url,
  29. datasource.Database,
  30. )
  31. tlsConfig, err := datasource.GetTLSConfig()
  32. if err != nil {
  33. return nil, err
  34. }
  35. if tlsConfig.RootCAs != nil || len(tlsConfig.Certificates) > 0 {
  36. tlsConfigString := fmt.Sprintf("ds%d", datasource.Id)
  37. mysql.RegisterTLSConfig(tlsConfigString, tlsConfig)
  38. cnnstr += "&tls=" + tlsConfigString
  39. }
  40. if setting.Env == setting.DEV {
  41. logger.Debug("getEngine", "connection", cnnstr)
  42. }
  43. config := tsdb.SqlQueryEndpointConfiguration{
  44. DriverName: "mysql",
  45. ConnectionString: cnnstr,
  46. Datasource: datasource,
  47. TimeColumnNames: []string{"time", "time_sec"},
  48. MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
  49. }
  50. rowTransformer := mysqlRowTransformer{
  51. log: logger,
  52. }
  53. return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger)
  54. }
  55. type mysqlRowTransformer struct {
  56. log log.Logger
  57. }
  58. func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
  59. values := make([]interface{}, len(columnTypes))
  60. for i := range values {
  61. scanType := columnTypes[i].ScanType()
  62. values[i] = reflect.New(scanType).Interface()
  63. if columnTypes[i].DatabaseTypeName() == "BIT" {
  64. values[i] = new([]byte)
  65. }
  66. }
  67. if err := rows.Scan(values...); err != nil {
  68. return nil, err
  69. }
  70. for i := 0; i < len(columnTypes); i++ {
  71. typeName := reflect.ValueOf(values[i]).Type().String()
  72. switch typeName {
  73. case "*sql.RawBytes":
  74. values[i] = string(*values[i].(*sql.RawBytes))
  75. case "*mysql.NullTime":
  76. sqlTime := (*values[i].(*mysql.NullTime))
  77. if sqlTime.Valid {
  78. values[i] = sqlTime.Time
  79. } else {
  80. values[i] = nil
  81. }
  82. case "*sql.NullInt64":
  83. nullInt64 := (*values[i].(*sql.NullInt64))
  84. if nullInt64.Valid {
  85. values[i] = nullInt64.Int64
  86. } else {
  87. values[i] = nil
  88. }
  89. case "*sql.NullFloat64":
  90. nullFloat64 := (*values[i].(*sql.NullFloat64))
  91. if nullFloat64.Valid {
  92. values[i] = nullFloat64.Float64
  93. } else {
  94. values[i] = nil
  95. }
  96. }
  97. if columnTypes[i].DatabaseTypeName() == "DECIMAL" {
  98. f, err := strconv.ParseFloat(values[i].(string), 64)
  99. if err == nil {
  100. values[i] = f
  101. } else {
  102. values[i] = nil
  103. }
  104. }
  105. }
  106. return values, nil
  107. }