request.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package tsdb
  2. import (
  3. "context"
  4. )
  // HandleRequestFunc is the signature of a function that handles a TsdbQuery:
  // it receives a context and the query and returns a Response or an error.
  // HandleRequest in this file has this signature.
  5. type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, error)
  6. func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) {
  7. tsdbQuery := &TsdbQuery{
  8. Queries: req.Queries,
  9. TimeRange: req.TimeRange,
  10. }
  11. batches, err := getBatches(req)
  12. if err != nil {
  13. return nil, err
  14. }
  15. currentlyExecuting := 0
  16. resultsChan := make(chan *BatchResult)
  17. for _, batch := range batches {
  18. if len(batch.Depends) == 0 {
  19. currentlyExecuting += 1
  20. batch.Started = true
  21. go batch.process(ctx, resultsChan, tsdbQuery)
  22. }
  23. }
  24. response := &Response{
  25. Results: make(map[string]*QueryResult),
  26. }
  27. for currentlyExecuting != 0 {
  28. select {
  29. case batchResult := <-resultsChan:
  30. currentlyExecuting -= 1
  31. response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
  32. if batchResult.Error != nil {
  33. return nil, batchResult.Error
  34. }
  35. for refId, result := range batchResult.QueryResults {
  36. response.Results[refId] = result
  37. }
  38. for _, batch := range batches {
  39. // not interested in started batches
  40. if batch.Started {
  41. continue
  42. }
  43. if batch.allDependenciesAreIn(response) {
  44. currentlyExecuting += 1
  45. batch.Started = true
  46. go batch.process(ctx, resultsChan, tsdbQuery)
  47. }
  48. }
  49. case <-ctx.Done():
  50. return nil, ctx.Err()
  51. }
  52. }
  53. //response.Results = tsdbQuery.Results
  54. return response, nil
  55. }