request.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package tsdb
  2. import (
  3. "context"
  4. )
  5. type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error)
  6. func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
  7. context := NewQueryContext(req.Queries, req.TimeRange)
  8. batches, err := getBatches(req)
  9. if err != nil {
  10. return nil, err
  11. }
  12. currentlyExecuting := 0
  13. for _, batch := range batches {
  14. if len(batch.Depends) == 0 {
  15. currentlyExecuting += 1
  16. batch.Started = true
  17. go batch.process(ctx, context)
  18. }
  19. }
  20. response := &Response{}
  21. for currentlyExecuting != 0 {
  22. select {
  23. case batchResult := <-context.ResultsChan:
  24. currentlyExecuting -= 1
  25. response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
  26. if batchResult.Error != nil {
  27. return nil, batchResult.Error
  28. }
  29. for refId, result := range batchResult.QueryResults {
  30. context.Results[refId] = result
  31. }
  32. for _, batch := range batches {
  33. // not interested in started batches
  34. if batch.Started {
  35. continue
  36. }
  37. if batch.allDependenciesAreIn(context) {
  38. currentlyExecuting += 1
  39. batch.Started = true
  40. go batch.process(ctx, context)
  41. }
  42. }
  43. case <-ctx.Done():
  44. return nil, ctx.Err()
  45. }
  46. }
  47. response.Results = context.Results
  48. return response, nil
  49. }