mysql.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package mysql
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "reflect"
  6. "strconv"
  7. "strings"
  8. "github.com/grafana/grafana/pkg/setting"
  9. "github.com/go-sql-driver/mysql"
  10. "github.com/go-xorm/core"
  11. "github.com/grafana/grafana/pkg/infra/log"
  12. "github.com/grafana/grafana/pkg/models"
  13. "github.com/grafana/grafana/pkg/tsdb"
  14. "github.com/grafana/grafana/pkg/tsdb/sqleng"
  15. )
  16. func init() {
  17. tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint)
  18. }
  19. func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  20. logger := log.New("tsdb.mysql")
  21. protocol := "tcp"
  22. if strings.HasPrefix(datasource.Url, "/") {
  23. protocol = "unix"
  24. }
  25. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  26. datasource.User,
  27. datasource.DecryptedPassword(),
  28. protocol,
  29. datasource.Url,
  30. datasource.Database,
  31. )
  32. tlsConfig, err := datasource.GetTLSConfig()
  33. if err != nil {
  34. return nil, err
  35. }
  36. if tlsConfig.RootCAs != nil || len(tlsConfig.Certificates) > 0 {
  37. tlsConfigString := fmt.Sprintf("ds%d", datasource.Id)
  38. mysql.RegisterTLSConfig(tlsConfigString, tlsConfig)
  39. cnnstr += "&tls=" + tlsConfigString
  40. }
  41. if setting.Env == setting.DEV {
  42. logger.Debug("getEngine", "connection", cnnstr)
  43. }
  44. config := sqleng.SqlQueryEndpointConfiguration{
  45. DriverName: "mysql",
  46. ConnectionString: cnnstr,
  47. Datasource: datasource,
  48. TimeColumnNames: []string{"time", "time_sec"},
  49. MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
  50. }
  51. rowTransformer := mysqlRowTransformer{
  52. log: logger,
  53. }
  54. return sqleng.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger)
  55. }
  56. type mysqlRowTransformer struct {
  57. log log.Logger
  58. }
  59. func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
  60. values := make([]interface{}, len(columnTypes))
  61. for i := range values {
  62. scanType := columnTypes[i].ScanType()
  63. values[i] = reflect.New(scanType).Interface()
  64. if columnTypes[i].DatabaseTypeName() == "BIT" {
  65. values[i] = new([]byte)
  66. }
  67. }
  68. if err := rows.Scan(values...); err != nil {
  69. return nil, err
  70. }
  71. for i := 0; i < len(columnTypes); i++ {
  72. typeName := reflect.ValueOf(values[i]).Type().String()
  73. switch typeName {
  74. case "*sql.RawBytes":
  75. values[i] = string(*values[i].(*sql.RawBytes))
  76. case "*mysql.NullTime":
  77. sqlTime := (*values[i].(*mysql.NullTime))
  78. if sqlTime.Valid {
  79. values[i] = sqlTime.Time
  80. } else {
  81. values[i] = nil
  82. }
  83. case "*sql.NullInt64":
  84. nullInt64 := (*values[i].(*sql.NullInt64))
  85. if nullInt64.Valid {
  86. values[i] = nullInt64.Int64
  87. } else {
  88. values[i] = nil
  89. }
  90. case "*sql.NullFloat64":
  91. nullFloat64 := (*values[i].(*sql.NullFloat64))
  92. if nullFloat64.Valid {
  93. values[i] = nullFloat64.Float64
  94. } else {
  95. values[i] = nil
  96. }
  97. }
  98. if columnTypes[i].DatabaseTypeName() == "DECIMAL" {
  99. f, err := strconv.ParseFloat(values[i].(string), 64)
  100. if err == nil {
  101. values[i] = f
  102. } else {
  103. values[i] = nil
  104. }
  105. }
  106. }
  107. return values, nil
  108. }