浏览代码

Add the datasource of RDBMS (PostgreSQL and MySQL)

Nozomi Anzai 9 年之前
父节点
当前提交
6586cc4029
共有 3 个文件被更改,包括 172 次插入0 次删除
  1. 6 0
      pkg/api/dataproxy.go
  2. 164 0
      pkg/api/sqldb/sqldb.go
  3. 2 0
      pkg/models/datasource.go

+ 6 - 0
pkg/api/dataproxy.go

@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/grafana/grafana/pkg/api/cloudwatch"
+	"github.com/grafana/grafana/pkg/api/sqldb"
 	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/metrics"
 	"github.com/grafana/grafana/pkg/middleware"
@@ -43,6 +44,7 @@ func NewReverseProxy(ds *m.DataSource, proxyPath string, targetUrl *url.URL) *ht
 		} else if ds.Type == m.DS_INFLUXDB {
 			req.URL.Path = util.JoinUrlFragments(targetUrl.Path, proxyPath)
 			req.URL.RawQuery = reqQueryVals.Encode()
+			reqQueryVals.Add("db", ds.Database)
 			if !ds.BasicAuth {
 				req.Header.Del("Authorization")
 				req.Header.Add("Authorization", util.GetBasicAuthHeader(ds.User, ds.Password))
@@ -100,6 +102,10 @@ func ProxyDataSourceRequest(c *middleware.Context) {
 
 	if ds.Type == m.DS_CLOUDWATCH {
 		cloudwatch.HandleRequest(c, ds)
+
+	} else if ds.Type == m.DS_SQLDB {
+		sqldb.HandleRequest(c, ds)
+
 	} else {
 		proxyPath := c.Params("*")
 		proxy := NewReverseProxy(ds, proxyPath, targetUrl)

+ 164 - 0
pkg/api/sqldb/sqldb.go

@@ -0,0 +1,164 @@
+package sqldb
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"strings"
+
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/middleware"
+	m "github.com/grafana/grafana/pkg/models"
+
+	_ "github.com/go-sql-driver/mysql"
+	"github.com/go-xorm/core"
+	"github.com/go-xorm/xorm"
+	_ "github.com/lib/pq"
+	_ "github.com/mattn/go-sqlite3"
+)
+
+type sqlDataRequest struct {
+	Query string `json:"query"`
+	Body  []byte `json:"-"`
+}
+
+type seriesStruct struct {
+	Columns []string        `json:"columns"`
+	Name    string          `json:"name"`
+	Values  [][]interface{} `json:"values"`
+}
+
+type resultsStruct struct {
+	Series []seriesStruct `json:"series"`
+}
+
+type dataStruct struct {
+	Results []resultsStruct `json:"results"`
+}
+
+func getEngine(ds *m.DataSource) (*xorm.Engine, error) {
+	dbms, err := ds.JsonData.Get("dbms").String()
+	if err != nil {
+		return nil, errors.New("Invalid DBMS")
+	}
+
+	host, err := ds.JsonData.Get("host").String()
+	if err != nil {
+		return nil, errors.New("Invalid host")
+	}
+
+	port, err := ds.JsonData.Get("port").String()
+	if err != nil {
+		return nil, errors.New("Invalid port")
+	}
+
+	constr := ""
+
+	switch dbms {
+	case "mysql":
+		constr = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8",
+			ds.User, ds.Password, host, port, ds.Database)
+
+	case "postgres":
+		sslEnabled, _ := ds.JsonData.Get("ssl").Bool()
+		sslMode := "disable"
+		if sslEnabled {
+			sslMode = "require"
+		}
+
+		constr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s",
+			ds.User, ds.Password, host, port, ds.Database, sslMode)
+
+	default:
+		return nil, fmt.Errorf("Unknown DBMS: %s", dbms)
+	}
+
+	return xorm.NewEngine(dbms, constr)
+}
+
+func getData(db *core.DB, req *sqlDataRequest) (interface{}, error) {
+	queries := strings.Split(req.Query, ";")
+
+	data := dataStruct{}
+	data.Results = make([]resultsStruct, 1)
+	data.Results[0].Series = make([]seriesStruct, 0)
+
+	for i := range queries {
+		if queries[i] == "" {
+			continue
+		}
+
+		rows, err := db.Query(queries[i])
+		if err != nil {
+			return nil, err
+		}
+		defer rows.Close()
+
+		name := fmt.Sprintf("table_%d", i+1)
+		series, err := arrangeResult(rows, name)
+		if err != nil {
+			return nil, err
+		}
+		data.Results[0].Series = append(data.Results[0].Series, series.(seriesStruct))
+	}
+
+	return data, nil
+}
+
+func arrangeResult(rows *core.Rows, name string) (interface{}, error) {
+	columnNames, err := rows.Columns()
+
+	series := seriesStruct{}
+	series.Columns = columnNames
+	series.Name = name
+
+	for rows.Next() {
+		columnValues := make([]interface{}, len(columnNames))
+
+		err = rows.ScanSlice(&columnValues)
+		if err != nil {
+			return nil, err
+		}
+
+		// bytes -> string
+		for i := range columnValues {
+			switch columnValues[i].(type) {
+			case []byte:
+				columnValues[i] = fmt.Sprintf("%s", columnValues[i])
+			}
+		}
+
+		series.Values = append(series.Values, columnValues)
+	}
+
+	return series, err
+}
+
+func HandleRequest(c *middleware.Context, ds *m.DataSource) {
+	var req sqlDataRequest
+	req.Body, _ = ioutil.ReadAll(c.Req.Request.Body)
+	json.Unmarshal(req.Body, &req)
+
+	log.Debug("SQL request: query='%v'", req.Query)
+
+	engine, err := getEngine(ds)
+	if err != nil {
+		c.JsonApiErr(500, "Unable to open SQL connection", err)
+		return
+	}
+	defer engine.Close()
+
+	session := engine.NewSession()
+	defer session.Close()
+
+	db := session.DB()
+
+	result, err := getData(db, &req)
+	if err != nil {
+		c.JsonApiErr(500, fmt.Sprintf("Data error: %v, Query: %s", err.Error(), req.Query), err)
+		return
+	}
+
+	c.JSON(200, result)
+}

+ 2 - 0
pkg/models/datasource.go

@@ -16,6 +16,7 @@ const (
 	DS_CLOUDWATCH    = "cloudwatch"
 	DS_KAIROSDB      = "kairosdb"
 	DS_PROMETHEUS    = "prometheus"
+	DS_SQLDB         = "sqldb"
 	DS_ACCESS_DIRECT = "direct"
 	DS_ACCESS_PROXY  = "proxy"
 )
@@ -59,6 +60,7 @@ var knownDatasourcePlugins map[string]bool = map[string]bool{
 	DS_CLOUDWATCH:  true,
 	DS_PROMETHEUS:  true,
 	DS_OPENTSDB:    true,
+	DS_SQLDB:       true,
 	"opennms":      true,
 	"druid":        true,
 	"dalmatinerdb": true,