|
|
@@ -86,9 +86,10 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc
|
|
|
}
|
|
|
|
|
|
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
|
- result := &tsdb.Response{
|
|
|
+ results := &tsdb.Response{
|
|
|
Results: make(map[string]*tsdb.QueryResult),
|
|
|
}
|
|
|
+ resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))
|
|
|
|
|
|
eg, ectx := errgroup.WithContext(ctx)
|
|
|
|
|
|
@@ -102,10 +103,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
|
|
|
RefId := queryContext.Queries[i].RefId
|
|
|
query, err := parseQuery(queryContext.Queries[i].Model)
|
|
|
if err != nil {
|
|
|
- result.Results[RefId] = &tsdb.QueryResult{
|
|
|
+ results.Results[RefId] = &tsdb.QueryResult{
|
|
|
Error: err,
|
|
|
}
|
|
|
- return result, nil
|
|
|
+ return results, nil
|
|
|
}
|
|
|
query.RefId = RefId
|
|
|
|
|
|
@@ -118,10 +119,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
|
|
|
}
|
|
|
|
|
|
if query.Id == "" && query.Expression != "" {
|
|
|
- result.Results[query.RefId] = &tsdb.QueryResult{
|
|
|
+ results.Results[query.RefId] = &tsdb.QueryResult{
|
|
|
Error: fmt.Errorf("Invalid query: id should be set if using expression"),
|
|
|
}
|
|
|
- return result, nil
|
|
|
+ return results, nil
|
|
|
}
|
|
|
|
|
|
eg.Go(func() error {
|
|
|
@@ -130,12 +131,13 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
|
|
|
return err
|
|
|
}
|
|
|
if err != nil {
|
|
|
- result.Results[query.RefId] = &tsdb.QueryResult{
|
|
|
+ resultChan <- &tsdb.QueryResult{
|
|
|
+ RefId: query.RefId,
|
|
|
Error: err,
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
- result.Results[queryRes.RefId] = queryRes
|
|
|
+ resultChan <- queryRes
|
|
|
return nil
|
|
|
})
|
|
|
}
|
|
|
@@ -149,10 +151,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
|
|
|
return err
|
|
|
}
|
|
|
for _, queryRes := range queryResponses {
|
|
|
- result.Results[queryRes.RefId] = queryRes
|
|
|
if err != nil {
|
|
|
- result.Results[queryRes.RefId].Error = err
|
|
|
+ queryRes.Error = err
|
|
|
}
|
|
|
+ resultChan <- queryRes
|
|
|
}
|
|
|
return nil
|
|
|
})
|
|
|
@@ -162,8 +164,12 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
|
|
|
if err := eg.Wait(); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+ close(resultChan)
|
|
|
+ for result := range resultChan {
|
|
|
+ results.Results[result.RefId] = result
|
|
|
+ }
|
|
|
|
|
|
- return result, nil
|
|
|
+ return results, nil
|
|
|
}
|
|
|
|
|
|
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
|