webhook.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package notifications
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net"
  8. "net/http"
  9. "time"
  10. "golang.org/x/net/context/ctxhttp"
  11. "github.com/grafana/grafana/pkg/log"
  12. "github.com/grafana/grafana/pkg/util"
  13. )
  14. type Webhook struct {
  15. Url string
  16. User string
  17. Password string
  18. Body string
  19. HttpMethod string
  20. HttpHeader map[string]string
  21. }
  22. var netTransport = &http.Transport{
  23. Dial: (&net.Dialer{
  24. Timeout: 30 * time.Second,
  25. }).Dial,
  26. TLSHandshakeTimeout: 5 * time.Second,
  27. }
  28. var netClient = &http.Client{
  29. Timeout: time.Second * 30,
  30. Transport: netTransport,
  31. }
  32. var (
  33. webhookQueue chan *Webhook
  34. webhookLog log.Logger
  35. )
  36. func initWebhookQueue() {
  37. webhookLog = log.New("notifications.webhook")
  38. webhookQueue = make(chan *Webhook, 10)
  39. go processWebhookQueue()
  40. }
  41. func processWebhookQueue() {
  42. for {
  43. select {
  44. case webhook := <-webhookQueue:
  45. err := sendWebRequestSync(context.Background(), webhook)
  46. if err != nil {
  47. webhookLog.Error("Failed to send webrequest ", "error", err)
  48. }
  49. }
  50. }
  51. }
  52. func sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
  53. webhookLog.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod)
  54. if webhook.HttpMethod == "" {
  55. webhook.HttpMethod = http.MethodPost
  56. }
  57. request, err := http.NewRequest(webhook.HttpMethod, webhook.Url, bytes.NewReader([]byte(webhook.Body)))
  58. if err != nil {
  59. return err
  60. }
  61. request.Header.Add("Content-Type", "application/json")
  62. request.Header.Add("User-Agent", "Grafana")
  63. if webhook.User != "" && webhook.Password != "" {
  64. request.Header.Add("Authorization", util.GetBasicAuthHeader(webhook.User, webhook.Password))
  65. }
  66. for k, v := range webhook.HttpHeader {
  67. request.Header.Set(k, v)
  68. }
  69. resp, err := ctxhttp.Do(ctx, netClient, request)
  70. if err != nil {
  71. return err
  72. }
  73. if resp.StatusCode/100 == 2 {
  74. return nil
  75. }
  76. defer resp.Body.Close()
  77. body, err := ioutil.ReadAll(resp.Body)
  78. if err != nil {
  79. return err
  80. }
  81. webhookLog.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body))
  82. return fmt.Errorf("Webhook response status %v", resp.Status)
  83. }
  84. var addToWebhookQueue = func(msg *Webhook) {
  85. webhookQueue <- msg
  86. }