mysql.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package mysql
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "github.com/go-xorm/core"
  8. "github.com/go-xorm/xorm"
  9. "github.com/grafana/grafana/pkg/log"
  10. "github.com/grafana/grafana/pkg/models"
  11. "github.com/grafana/grafana/pkg/tsdb"
  12. )
  13. type MysqlExecutor struct {
  14. *models.DataSource
  15. engine *xorm.Engine
  16. log log.Logger
  17. }
  18. type engineCacheType struct {
  19. cache map[int64]*xorm.Engine
  20. versions map[int64]int
  21. sync.Mutex
  22. }
  23. var engineCache = engineCacheType{
  24. cache: make(map[int64]*xorm.Engine),
  25. versions: make(map[int64]int),
  26. }
  27. func NewMysqlExecutor(datasource *models.DataSource) (tsdb.Executor, error) {
  28. engine, err := getEngineFor(datasource)
  29. if err != nil {
  30. return nil, err
  31. }
  32. return &MysqlExecutor{
  33. log: log.New("tsdb.mysql"),
  34. engine: engine,
  35. }, nil
  36. }
  37. func getEngineFor(ds *models.DataSource) (*xorm.Engine, error) {
  38. engineCache.Lock()
  39. defer engineCache.Unlock()
  40. if engine, present := engineCache.cache[ds.Id]; present {
  41. if version, _ := engineCache.versions[ds.Id]; version == ds.Version {
  42. return engine, nil
  43. }
  44. }
  45. cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?charset=utf8mb4", ds.User, ds.Password, "tcp", ds.Url, ds.Database)
  46. engine, err := xorm.NewEngine("mysql", cnnstr)
  47. engine.SetMaxConns(10)
  48. engine.SetMaxIdleConns(10)
  49. if err != nil {
  50. return nil, err
  51. }
  52. engineCache.cache[ds.Id] = engine
  53. return engine, nil
  54. }
  55. func init() {
  56. tsdb.RegisterExecutor("graphite", NewMysqlExecutor)
  57. }
  58. func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
  59. result := &tsdb.BatchResult{}
  60. session := engine.NewSession()
  61. defer session.Close()
  62. db := session.DB()
  63. result, err := getData(db, &req)
  64. if err != nil {
  65. return
  66. }
  67. }
  68. func getData(db *core.DB, req *sqlDataRequest) (interface{}, error) {
  69. queries := strings.Split(req.Query, ";")
  70. data := dataStruct{}
  71. data.Results = make([]resultsStruct, 1)
  72. data.Results[0].Series = make([]seriesStruct, 0)
  73. for i := range queries {
  74. if queries[i] == "" {
  75. continue
  76. }
  77. rows, err := db.Query(queries[i])
  78. if err != nil {
  79. return nil, err
  80. }
  81. defer rows.Close()
  82. name := fmt.Sprintf("table_%d", i+1)
  83. series, err := arrangeResult(rows, name)
  84. if err != nil {
  85. return nil, err
  86. }
  87. data.Results[0].Series = append(data.Results[0].Series, series.(seriesStruct))
  88. }
  89. return data, nil
  90. }
  91. func arrangeResult(rows *core.Rows, name string) (interface{}, error) {
  92. columnNames, err := rows.Columns()
  93. series := seriesStruct{}
  94. series.Columns = columnNames
  95. series.Name = name
  96. for rows.Next() {
  97. columnValues := make([]interface{}, len(columnNames))
  98. err = rows.ScanSlice(&columnValues)
  99. if err != nil {
  100. return nil, err
  101. }
  102. // bytes -> string
  103. for i := range columnValues {
  104. switch columnValues[i].(type) {
  105. case []byte:
  106. columnValues[i] = fmt.Sprintf("%s", columnValues[i])
  107. }
  108. }
  109. series.Values = append(series.Values, columnValues)
  110. }
  111. return series, err
  112. }
  113. type sqlDataRequest struct {
  114. Query string `json:"query"`
  115. Body []byte `json:"-"`
  116. }
  117. type seriesStruct struct {
  118. Columns []string `json:"columns"`
  119. Name string `json:"name"`
  120. Values [][]interface{} `json:"values"`
  121. }
  122. type resultsStruct struct {
  123. Series []seriesStruct `json:"series"`
  124. }
  125. type dataStruct struct {
  126. Results []resultsStruct `json:"results"`
  127. }