request.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package tsdb
  2. import (
  3. "context"
  4. )
  5. type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error)
  6. func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
  7. tsdbQuery := NewQueryContext(req.Queries, req.TimeRange)
  8. batches, err := getBatches(req)
  9. if err != nil {
  10. return nil, err
  11. }
  12. currentlyExecuting := 0
  13. resultsChan := make(chan *BatchResult)
  14. for _, batch := range batches {
  15. if len(batch.Depends) == 0 {
  16. currentlyExecuting += 1
  17. batch.Started = true
  18. go batch.process(ctx, resultsChan, tsdbQuery)
  19. }
  20. }
  21. response := &Response{
  22. Results: make(map[string]*QueryResult),
  23. }
  24. for currentlyExecuting != 0 {
  25. select {
  26. case batchResult := <-resultsChan:
  27. currentlyExecuting -= 1
  28. response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
  29. if batchResult.Error != nil {
  30. return nil, batchResult.Error
  31. }
  32. for refId, result := range batchResult.QueryResults {
  33. response.Results[refId] = result
  34. }
  35. for _, batch := range batches {
  36. // not interested in started batches
  37. if batch.Started {
  38. continue
  39. }
  40. if batch.allDependenciesAreIn(response) {
  41. currentlyExecuting += 1
  42. batch.Started = true
  43. go batch.process(ctx, resultsChan, tsdbQuery)
  44. }
  45. }
  46. case <-ctx.Done():
  47. return nil, ctx.Err()
  48. }
  49. }
  50. //response.Results = tsdbQuery.Results
  51. return response, nil
  52. }