Make webhook http connections resuable (#6976)
* make http connections resuable * add error handler * fix lint
This commit is contained in:
parent
02542a2c15
commit
1f84970de0
1 changed files with 72 additions and 26 deletions
|
@ -13,11 +13,12 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/httplib"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
api "code.gitea.io/gitea/modules/structs"
|
||||
|
@ -753,47 +754,66 @@ func prepareWebhooks(e Engine, repo *Repository, event HookEventType, p api.Payl
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *HookTask) deliver() {
|
||||
func (t *HookTask) deliver() error {
|
||||
t.IsDelivered = true
|
||||
|
||||
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
|
||||
var req *http.Request
|
||||
var err error
|
||||
|
||||
var req *httplib.Request
|
||||
switch t.HTTPMethod {
|
||||
case "":
|
||||
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
|
||||
fallthrough
|
||||
case http.MethodPost:
|
||||
req = httplib.Post(t.URL)
|
||||
switch t.ContentType {
|
||||
case ContentTypeJSON:
|
||||
req = req.Header("Content-Type", "application/json").Body(t.PayloadContent)
|
||||
req, err = http.NewRequest("POST", t.URL, strings.NewReader(t.PayloadContent))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
case ContentTypeForm:
|
||||
req.Param("payload", t.PayloadContent)
|
||||
var forms = url.Values{
|
||||
"payload": []string{t.PayloadContent},
|
||||
}
|
||||
|
||||
req, err = http.NewRequest("POST", t.URL, strings.NewReader(forms.Encode()))
|
||||
if err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
case http.MethodGet:
|
||||
req = httplib.Get(t.URL).Param("payload", t.PayloadContent)
|
||||
u, err := url.Parse(t.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vals := u.Query()
|
||||
vals["payload"] = []string{t.PayloadContent}
|
||||
u.RawQuery = vals.Encode()
|
||||
req, err = http.NewRequest("GET", u.String(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Error("Invalid http method for webhook: [%d] %v", t.ID, t.HTTPMethod)
|
||||
return
|
||||
return fmt.Errorf("Invalid http method for webhook: [%d] %v", t.ID, t.HTTPMethod)
|
||||
}
|
||||
|
||||
req = req.SetTimeout(timeout, timeout).
|
||||
Header("X-Gitea-Delivery", t.UUID).
|
||||
Header("X-Gitea-Event", string(t.EventType)).
|
||||
Header("X-Gitea-Signature", t.Signature).
|
||||
Header("X-Gogs-Delivery", t.UUID).
|
||||
Header("X-Gogs-Event", string(t.EventType)).
|
||||
Header("X-Gogs-Signature", t.Signature).
|
||||
HeaderWithSensitiveCase("X-GitHub-Delivery", t.UUID).
|
||||
HeaderWithSensitiveCase("X-GitHub-Event", string(t.EventType)).
|
||||
SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify})
|
||||
req.Header.Add("X-Gitea-Delivery", t.UUID)
|
||||
req.Header.Add("X-Gitea-Event", string(t.EventType))
|
||||
req.Header.Add("X-Gitea-Signature", t.Signature)
|
||||
req.Header.Add("X-Gogs-Delivery", t.UUID)
|
||||
req.Header.Add("X-Gogs-Event", string(t.EventType))
|
||||
req.Header.Add("X-Gogs-Signature", t.Signature)
|
||||
req.Header["X-GitHub-Delivery"] = []string{t.UUID}
|
||||
req.Header["X-GitHub-Event"] = []string{string(t.EventType)}
|
||||
|
||||
// Record delivery information.
|
||||
t.RequestInfo = &HookRequest{
|
||||
Headers: map[string]string{},
|
||||
}
|
||||
for k, vals := range req.Headers() {
|
||||
for k, vals := range req.Header {
|
||||
t.RequestInfo.Headers[k] = strings.Join(vals, ",")
|
||||
}
|
||||
|
||||
|
@ -830,10 +850,10 @@ func (t *HookTask) deliver() {
|
|||
}
|
||||
}()
|
||||
|
||||
resp, err := req.Response()
|
||||
resp, err := webhookHTTPClient.Do(req)
|
||||
if err != nil {
|
||||
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
|
@ -847,9 +867,10 @@ func (t *HookTask) deliver() {
|
|||
p, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
t.ResponseInfo.Body = string(p)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeliverHooks checks and delivers undelivered hooks.
|
||||
|
@ -864,7 +885,10 @@ func DeliverHooks() {
|
|||
|
||||
// Update hook task status.
|
||||
for _, t := range tasks {
|
||||
t.deliver()
|
||||
if err = t.deliver(); err != nil {
|
||||
log.Error("deliver: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Start listening on new hook requests.
|
||||
|
@ -884,12 +908,34 @@ func DeliverHooks() {
|
|||
continue
|
||||
}
|
||||
for _, t := range tasks {
|
||||
t.deliver()
|
||||
if err = t.deliver(); err != nil {
|
||||
log.Error("deliver: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var webhookHTTPClient *http.Client
|
||||
|
||||
// InitDeliverHooks starts the hooks delivery thread
|
||||
func InitDeliverHooks() {
|
||||
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
|
||||
|
||||
webhookHTTPClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify},
|
||||
Dial: func(netw, addr string) (net.Conn, error) {
|
||||
conn, err := net.DialTimeout(netw, addr, timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn.SetDeadline(time.Now().Add(timeout))
|
||||
return conn, nil
|
||||
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
go DeliverHooks()
|
||||
}
|
||||
|
|
Reference in a new issue