request.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package tsdb
  // HandleRequestFunc is the signature of a function that handles a Request,
  // returning a Response or an error.
  2. type HandleRequestFunc func(req *Request) (*Response, error)
  3. func HandleRequest(req *Request) (*Response, error) {
  4. context := NewQueryContext(req.Queries, req.TimeRange)
  5. batches, err := getBatches(req)
  6. if err != nil {
  7. return nil, err
  8. }
  9. currentlyExecuting := 0
  10. for _, batch := range batches {
  11. if len(batch.Depends) == 0 {
  12. currentlyExecuting += 1
  13. batch.Started = true
  14. go batch.process(context)
  15. }
  16. }
  17. response := &Response{}
  18. for currentlyExecuting != 0 {
  19. select {
  20. case batchResult := <-context.ResultsChan:
  21. currentlyExecuting -= 1
  22. response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
  23. if batchResult.Error != nil {
  24. return nil, batchResult.Error
  25. }
  26. for refId, result := range batchResult.QueryResults {
  27. context.Results[refId] = result
  28. }
  29. for _, batch := range batches {
  30. // not interested in started batches
  31. if batch.Started {
  32. continue
  33. }
  34. if batch.allDependenciesAreIn(context) {
  35. currentlyExecuting += 1
  36. batch.Started = true
  37. go batch.process(context)
  38. }
  39. }
  40. }
  41. }
  42. response.Results = context.Results
  43. return response, nil
  44. }