mysql.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package mysql
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "crypto/tls"
  10. "crypto/x509"
  11. "github.com/go-sql-driver/mysql"
  12. "github.com/go-xorm/core"
  13. "github.com/grafana/grafana/pkg/log"
  14. "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/tsdb"
  16. )
  17. func init() {
  18. tsdb.RegisterTsdbQueryEndpoint("mysql", newMysqlQueryEndpoint)
  19. }
  20. func newMysqlQueryEndpoint(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  21. logger := log.New("tsdb.mysql")
  22. protocol := "tcp"
  23. if strings.HasPrefix(datasource.Url, "/") {
  24. protocol = "unix"
  25. }
  26. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC&allowNativePasswords=true",
  27. datasource.User,
  28. datasource.Password,
  29. protocol,
  30. datasource.Url,
  31. datasource.Database,
  32. )
  33. var tlsSkipVerify, tlsAuth, tlsAuthWithCACert bool
  34. if datasource.JsonData != nil {
  35. tlsAuth = datasource.JsonData.Get("tlsAuth").MustBool(false)
  36. tlsAuthWithCACert = datasource.JsonData.Get("tlsAuthWithCACert").MustBool(false)
  37. tlsSkipVerify = datasource.JsonData.Get("tlsSkipVerify").MustBool(false)
  38. }
  39. if tlsAuth || tlsAuthWithCACert {
  40. secureJsonData := datasource.SecureJsonData.Decrypt()
  41. tlsConfig := tls.Config{
  42. InsecureSkipVerify: tlsSkipVerify,
  43. }
  44. if tlsAuthWithCACert && len(secureJsonData["tlsCACert"]) > 0 {
  45. caPool := x509.NewCertPool()
  46. if ok := caPool.AppendCertsFromPEM([]byte(secureJsonData["tlsCACert"])); !ok {
  47. return nil, errors.New("Failed to parse TLS CA PEM certificate")
  48. }
  49. tlsConfig.RootCAs = caPool
  50. }
  51. if tlsAuth {
  52. certs, err := tls.X509KeyPair([]byte(secureJsonData["tlsClientCert"]), []byte(secureJsonData["tlsClientKey"]))
  53. if err != nil {
  54. return nil, err
  55. }
  56. clientCert := make([]tls.Certificate, 0, 1)
  57. clientCert = append(clientCert, certs)
  58. tlsConfig.Certificates = clientCert
  59. }
  60. mysql.RegisterTLSConfig(datasource.Name, &tlsConfig)
  61. cnnstr += "&tls=" + datasource.Name
  62. }
  63. logger.Debug("getEngine", "connection", cnnstr)
  64. config := tsdb.SqlQueryEndpointConfiguration{
  65. DriverName: "mysql",
  66. ConnectionString: cnnstr,
  67. Datasource: datasource,
  68. TimeColumnNames: []string{"time", "time_sec"},
  69. MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
  70. }
  71. rowTransformer := mysqlRowTransformer{
  72. log: logger,
  73. }
  74. return tsdb.NewSqlQueryEndpoint(&config, &rowTransformer, newMysqlMacroEngine(), logger)
  75. }
  76. type mysqlRowTransformer struct {
  77. log log.Logger
  78. }
  79. func (t *mysqlRowTransformer) Transform(columnTypes []*sql.ColumnType, rows *core.Rows) (tsdb.RowValues, error) {
  80. values := make([]interface{}, len(columnTypes))
  81. for i := range values {
  82. scanType := columnTypes[i].ScanType()
  83. values[i] = reflect.New(scanType).Interface()
  84. if columnTypes[i].DatabaseTypeName() == "BIT" {
  85. values[i] = new([]byte)
  86. }
  87. }
  88. if err := rows.Scan(values...); err != nil {
  89. return nil, err
  90. }
  91. for i := 0; i < len(columnTypes); i++ {
  92. typeName := reflect.ValueOf(values[i]).Type().String()
  93. switch typeName {
  94. case "*sql.RawBytes":
  95. values[i] = string(*values[i].(*sql.RawBytes))
  96. case "*mysql.NullTime":
  97. sqlTime := (*values[i].(*mysql.NullTime))
  98. if sqlTime.Valid {
  99. values[i] = sqlTime.Time
  100. } else {
  101. values[i] = nil
  102. }
  103. case "*sql.NullInt64":
  104. nullInt64 := (*values[i].(*sql.NullInt64))
  105. if nullInt64.Valid {
  106. values[i] = nullInt64.Int64
  107. } else {
  108. values[i] = nil
  109. }
  110. case "*sql.NullFloat64":
  111. nullFloat64 := (*values[i].(*sql.NullFloat64))
  112. if nullFloat64.Valid {
  113. values[i] = nullFloat64.Float64
  114. } else {
  115. values[i] = nil
  116. }
  117. }
  118. if columnTypes[i].DatabaseTypeName() == "DECIMAL" {
  119. f, err := strconv.ParseFloat(values[i].(string), 64)
  120. if err == nil {
  121. values[i] = f
  122. } else {
  123. values[i] = nil
  124. }
  125. }
  126. }
  127. return values, nil
  128. }