|
@@ -98,8 +98,12 @@ var NewSqlQueryEndpoint = func(config *SqlQueryEndpointConfiguration, rowTransfo
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- engine.SetMaxOpenConns(10)
|
|
|
|
|
- engine.SetMaxIdleConns(10)
|
|
|
|
|
|
|
+ maxOpenConns := config.Datasource.JsonData.Get("maxOpenConns").MustInt(0)
|
|
|
|
|
+ engine.SetMaxOpenConns(maxOpenConns)
|
|
|
|
|
+ maxIdleConns := config.Datasource.JsonData.Get("maxIdleConns").MustInt(2)
|
|
|
|
|
+ engine.SetMaxIdleConns(maxIdleConns)
|
|
|
|
|
+ connMaxLifetime := config.Datasource.JsonData.Get("connMaxLifetime").MustInt(14400)
|
|
|
|
|
+ engine.SetConnMaxLifetime(time.Duration(connMaxLifetime) * time.Second)
|
|
|
|
|
|
|
|
engineCache.versions[config.Datasource.Id] = config.Datasource.Version
|
|
engineCache.versions[config.Datasource.Id] = config.Datasource.Version
|
|
|
engineCache.cache[config.Datasource.Id] = engine
|
|
engineCache.cache[config.Datasource.Id] = engine
|
|
@@ -116,9 +120,7 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
|
|
|
Results: make(map[string]*QueryResult),
|
|
Results: make(map[string]*QueryResult),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- session := e.engine.NewSession()
|
|
|
|
|
- defer session.Close()
|
|
|
|
|
- db := session.DB()
|
|
|
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
|
|
|
|
for _, query := range tsdbQuery.Queries {
|
|
for _, query := range tsdbQuery.Queries {
|
|
|
rawSQL := query.Model.Get("rawSql").MustString()
|
|
rawSQL := query.Model.Get("rawSql").MustString()
|
|
@@ -145,31 +147,41 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
|
|
|
|
|
|
|
|
queryResult.Meta.Set("sql", rawSQL)
|
|
queryResult.Meta.Set("sql", rawSQL)
|
|
|
|
|
|
|
|
- rows, err := db.Query(rawSQL)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- queryResult.Error = err
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ wg.Add(1)
|
|
|
|
|
|
|
|
- defer rows.Close()
|
|
|
|
|
|
|
+ go func(rawSQL string, query *Query, queryResult *QueryResult) {
|
|
|
|
|
+ defer wg.Done()
|
|
|
|
|
+ session := e.engine.NewSession()
|
|
|
|
|
+ defer session.Close()
|
|
|
|
|
+ db := session.DB()
|
|
|
|
|
|
|
|
- format := query.Model.Get("format").MustString("time_series")
|
|
|
|
|
-
|
|
|
|
|
- switch format {
|
|
|
|
|
- case "time_series":
|
|
|
|
|
- err := e.transformToTimeSeries(query, rows, queryResult, tsdbQuery)
|
|
|
|
|
|
|
+ rows, err := db.Query(rawSQL)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
queryResult.Error = err
|
|
queryResult.Error = err
|
|
|
- continue
|
|
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
- case "table":
|
|
|
|
|
- err := e.transformToTable(query, rows, queryResult, tsdbQuery)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- queryResult.Error = err
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+
|
|
|
|
|
+ defer rows.Close()
|
|
|
|
|
+
|
|
|
|
|
+ format := query.Model.Get("format").MustString("time_series")
|
|
|
|
|
+
|
|
|
|
|
+ switch format {
|
|
|
|
|
+ case "time_series":
|
|
|
|
|
+ err := e.transformToTimeSeries(query, rows, queryResult, tsdbQuery)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ queryResult.Error = err
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ case "table":
|
|
|
|
|
+ err := e.transformToTable(query, rows, queryResult, tsdbQuery)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ queryResult.Error = err
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
+ }(rawSQL, query, queryResult)
|
|
|
}
|
|
}
|
|
|
|
|
+ wg.Wait()
|
|
|
|
|
|
|
|
return result, nil
|
|
return result, nil
|
|
|
}
|
|
}
|