request.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package tsdb
  2. func HandleRequest(req *Request) (*Response, error) {
  3. context := NewQueryContext(req.Queries, req.TimeRange)
  4. batches, err := getBatches(req)
  5. if err != nil {
  6. return nil, err
  7. }
  8. currentlyExecuting := 0
  9. for _, batch := range batches {
  10. if len(batch.Depends) == 0 {
  11. currentlyExecuting += 1
  12. batch.Started = true
  13. go batch.process(context)
  14. }
  15. }
  16. response := &Response{}
  17. for currentlyExecuting != 0 {
  18. select {
  19. case batchResult := <-context.ResultsChan:
  20. currentlyExecuting -= 1
  21. response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
  22. if batchResult.Error != nil {
  23. return nil, batchResult.Error
  24. }
  25. for refId, result := range batchResult.QueryResults {
  26. context.Results[refId] = result
  27. }
  28. for _, batch := range batches {
  29. // not interested in started batches
  30. if batch.Started {
  31. continue
  32. }
  33. if batch.allDependenciesAreIn(context) {
  34. currentlyExecuting += 1
  35. batch.Started = true
  36. go batch.process(context)
  37. }
  38. }
  39. }
  40. }
  41. response.Results = context.Results
  42. return response, nil
  43. }