request.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package tsdb
  2. import "context"
  3. type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error)
  4. func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
  5. context := NewQueryContext(req.Queries, req.TimeRange)
  6. batches, err := getBatches(req)
  7. if err != nil {
  8. return nil, err
  9. }
  10. currentlyExecuting := 0
  11. for _, batch := range batches {
  12. if len(batch.Depends) == 0 {
  13. currentlyExecuting += 1
  14. batch.Started = true
  15. go batch.process(ctx, context)
  16. }
  17. }
  18. response := &Response{}
  19. for currentlyExecuting != 0 {
  20. select {
  21. case batchResult := <-context.ResultsChan:
  22. currentlyExecuting -= 1
  23. response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
  24. if batchResult.Error != nil {
  25. return nil, batchResult.Error
  26. }
  27. for refId, result := range batchResult.QueryResults {
  28. context.Results[refId] = result
  29. }
  30. for _, batch := range batches {
  31. // not interested in started batches
  32. if batch.Started {
  33. continue
  34. }
  35. if batch.allDependenciesAreIn(context) {
  36. currentlyExecuting += 1
  37. batch.Started = true
  38. go batch.process(ctx, context)
  39. }
  40. }
  41. }
  42. }
  43. response.Results = context.Results
  44. return response, nil
  45. }