From c76c70a16ce6c1d472059ea3e03206abb5ed884d Mon Sep 17 00:00:00 2001 From: zeripath Date: Thu, 16 Jan 2020 17:55:36 +0000 Subject: [PATCH] Move mailer to use a queue (#9789) * Move mailer to use a queue * Make sectionMap map[string]bool * Ensure that Message is json encodable --- modules/setting/queue.go | 24 +++++--- services/mailer/mail.go | 2 +- services/mailer/mail_test.go | 29 ++++----- services/mailer/mailer.go | 110 ++++++++++++++++++++++------------- 4 files changed, 102 insertions(+), 63 deletions(-) diff --git a/modules/setting/queue.go b/modules/setting/queue.go index c91ff55acd..8c07685855 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -103,11 +103,11 @@ func NewQueueService() { // Now handle the old issue_indexer configuration section := Cfg.Section("queue.issue_indexer") - issueIndexerSectionMap := map[string]string{} + sectionMap := map[string]bool{} for _, key := range section.Keys() { - issueIndexerSectionMap[key.Name()] = key.Value() + sectionMap[key.Name()] = true } - if _, ok := issueIndexerSectionMap["TYPE"]; !ok { + if _, ok := sectionMap["TYPE"]; !ok { switch Indexer.IssueQueueType { case LevelQueueType: section.Key("TYPE").SetValue("level") @@ -120,18 +120,28 @@ func NewQueueService() { Indexer.IssueQueueType) } } - if _, ok := issueIndexerSectionMap["LENGTH"]; !ok { + if _, ok := sectionMap["LENGTH"]; !ok { section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength)) } - if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok { + if _, ok := sectionMap["BATCH_LENGTH"]; !ok { section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) } - if _, ok := issueIndexerSectionMap["DATADIR"]; !ok { + if _, ok := sectionMap["DATADIR"]; !ok { section.Key("DATADIR").SetValue(Indexer.IssueQueueDir) } - if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok { + if _, ok := sectionMap["CONN_STR"]; !ok { section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr) } + + // Handle the old mailer configuration + section = Cfg.Section("queue.mailer") + sectionMap = map[string]bool{} + for _, key := range section.Keys() { + sectionMap[key.Name()] = true + } + if _, ok := sectionMap["LENGTH"]; !ok { + section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) + } } // ParseQueueConnStr parses a queue connection string diff --git a/services/mailer/mail.go b/services/mailer/mail.go index 4b8e46715f..3241ae728d 100644 --- a/services/mailer/mail.go +++ b/services/mailer/mail.go @@ -51,7 +51,7 @@ func InitMailRender(subjectTpl *texttmpl.Template, bodyTpl *template.Template) { // SendTestMail sends a test mail func SendTestMail(email string) error { - return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").Message) + return gomail.Send(Sender, NewMessage([]string{email}, "Gitea Test Email!", "Gitea Test Email!").ToMessage()) } // SendUserMail sends a mail to the user diff --git a/services/mailer/mail_test.go b/services/mailer/mail_test.go index 43e99c635e..d7d02d9dee 100644 --- a/services/mailer/mail_test.go +++ b/services/mailer/mail_test.go @@ -61,11 +61,11 @@ func TestComposeIssueCommentMessage(t *testing.T) { msgs := composeIssueCommentMessages(&mailCommentContext{Issue: issue, Doer: doer, ActionType: models.ActionCommentIssue, Content: "test body", Comment: comment}, tos, false, "issue comment") assert.Len(t, msgs, 2) - - mailto := msgs[0].GetHeader("To") - subject := msgs[0].GetHeader("Subject") - inreplyTo := msgs[0].GetHeader("In-Reply-To") - references := msgs[0].GetHeader("References") + gomailMsg := msgs[0].ToMessage() + mailto := gomailMsg.GetHeader("To") + subject := gomailMsg.GetHeader("Subject") + inreplyTo := gomailMsg.GetHeader("In-Reply-To") + references := gomailMsg.GetHeader("References") assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field") assert.Equal(t, "Re: ", subject[0][:4], "Comment reply subject should contain Re:") @@ -96,14 +96,15 @@ func TestComposeIssueMessage(t *testing.T) { Content: "test body"}, tos, false, "issue create") assert.Len(t, msgs, 2) - mailto := msgs[0].GetHeader("To") - subject := msgs[0].GetHeader("Subject") - messageID := msgs[0].GetHeader("Message-ID") + gomailMsg := msgs[0].ToMessage() + mailto := gomailMsg.GetHeader("To") + subject := gomailMsg.GetHeader("Subject") + messageID := gomailMsg.GetHeader("Message-ID") assert.Len(t, mailto, 1, "exactly one recipient is expected in the To field") assert.Equal(t, "[user2/repo1] @user2 #1 - issue1", subject[0]) - assert.Nil(t, msgs[0].GetHeader("In-Reply-To")) - assert.Nil(t, msgs[0].GetHeader("References")) + assert.Nil(t, gomailMsg.GetHeader("In-Reply-To")) + assert.Nil(t, gomailMsg.GetHeader("References")) assert.Equal(t, messageID[0], "", "Message-ID header doesn't match") } @@ -134,9 +135,9 @@ func TestTemplateSelection(t *testing.T) { InitMailRender(stpl, btpl) expect := func(t *testing.T, msg *Message, expSubject, expBody string) { - subject := msg.GetHeader("Subject") + subject := msg.ToMessage().GetHeader("Subject") msgbuf := new(bytes.Buffer) - _, _ = msg.WriteTo(msgbuf) + _, _ = msg.ToMessage().WriteTo(msgbuf) wholemsg := msgbuf.String() assert.Equal(t, []string{expSubject}, subject) assert.Contains(t, wholemsg, expBody) @@ -188,9 +189,9 @@ func TestTemplateServices(t *testing.T) { msg := testComposeIssueCommentMessage(t, &mailCommentContext{Issue: issue, Doer: doer, ActionType: actionType, Content: "test body", Comment: comment}, tos, fromMention, "TestTemplateServices") - subject := msg.GetHeader("Subject") + subject := msg.ToMessage().GetHeader("Subject") msgbuf := new(bytes.Buffer) - _, _ = msg.WriteTo(msgbuf) + _, _ = msg.ToMessage().WriteTo(msgbuf) wholemsg := msgbuf.String() assert.Equal(t, []string{expSubject}, subject) diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go index 2e4aa8d71b..afcc7a7278 100644 --- a/services/mailer/mailer.go +++ b/services/mailer/mailer.go @@ -18,7 +18,9 @@ import ( "time" "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "github.com/jaytaylor/html2text" @@ -27,38 +29,63 @@ import ( // Message mail body and log info type Message struct { - Info string // Message information for log purpose. - *gomail.Message + Info string // Message information for log purpose. + FromAddress string + FromDisplayName string + To []string + Subject string + Date time.Time + Body string + Headers map[string][]string +} + +// ToMessage converts a Message to gomail.Message +func (m *Message) ToMessage() *gomail.Message { + msg := gomail.NewMessage() + msg.SetAddressHeader("From", m.FromAddress, m.FromDisplayName) + msg.SetHeader("To", m.To...) + for header := range m.Headers { + msg.SetHeader(header, m.Headers[header]...) + } + + if len(setting.MailService.SubjectPrefix) > 0 { + msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+m.Subject) + } else { + msg.SetHeader("Subject", m.Subject) + } + msg.SetDateHeader("Date", m.Date) + msg.SetHeader("X-Auto-Response-Suppress", "All") + + plainBody, err := html2text.FromString(m.Body) + if err != nil || setting.MailService.SendAsPlainText { + if strings.Contains(base.TruncateString(m.Body, 100), "") { + log.Warn("Mail contains HTML but configured to send as plain text.") + } + msg.SetBody("text/plain", plainBody) + } else { + msg.SetBody("text/plain", plainBody) + msg.AddAlternative("text/html", m.Body) + } + return msg +} + +// SetHeader adds additional headers to a message +func (m *Message) SetHeader(field string, value ...string) { + m.Headers[field] = value } // NewMessageFrom creates new mail message object with custom From header. func NewMessageFrom(to []string, fromDisplayName, fromAddress, subject, body string) *Message { log.Trace("NewMessageFrom (body):\n%s", body) - msg := gomail.NewMessage() - msg.SetAddressHeader("From", fromAddress, fromDisplayName) - msg.SetHeader("To", to...) - if len(setting.MailService.SubjectPrefix) > 0 { - msg.SetHeader("Subject", setting.MailService.SubjectPrefix+" "+subject) - } else { - msg.SetHeader("Subject", subject) - } - msg.SetDateHeader("Date", time.Now()) - msg.SetHeader("X-Auto-Response-Suppress", "All") - - plainBody, err := html2text.FromString(body) - if err != nil || setting.MailService.SendAsPlainText { - if strings.Contains(base.TruncateString(body, 100), "") { - log.Warn("Mail contains HTML but configured to send as plain text.") - } - msg.SetBody("text/plain", plainBody) - } else { - msg.SetBody("text/plain", plainBody) - msg.AddAlternative("text/html", body) - } - return &Message{ - Message: msg, + FromAddress: fromAddress, + FromDisplayName: fromDisplayName, + To: to, + Subject: subject, + Date: time.Now(), + Body: body, + Headers: map[string][]string{}, } } @@ -257,18 +284,7 @@ func (s *dummySender) Send(from string, to []string, msg io.WriterTo) error { return nil } -func processMailQueue() { - for msg := range mailQueue { - log.Trace("New e-mail sending request %s: %s", msg.GetHeader("To"), msg.Info) - if err := gomail.Send(Sender, msg.Message); err != nil { - log.Error("Failed to send emails %s: %s - %v", msg.GetHeader("To"), msg.Info, err) - } else { - log.Trace("E-mails sent %s: %s", msg.GetHeader("To"), msg.Info) - } - } -} - -var mailQueue chan *Message +var mailQueue queue.Queue // Sender sender for sending mail synchronously var Sender gomail.Sender @@ -291,14 +307,26 @@ func NewContext() { Sender = &dummySender{} } - mailQueue = make(chan *Message, setting.MailService.QueueLength) - go processMailQueue() + mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) { + for _, datum := range data { + msg := datum.(*Message) + gomailMsg := msg.ToMessage() + log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info) + if err := gomail.Send(Sender, gomailMsg); err != nil { + log.Error("Failed to send emails %s: %s - %v", gomailMsg.GetHeader("To"), msg.Info, err) + } else { + log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info) + } + } + }, &Message{}) + + go graceful.GetManager().RunWithShutdownFns(mailQueue.Run) } // SendAsync send mail asynchronously func SendAsync(msg *Message) { go func() { - mailQueue <- msg + _ = mailQueue.Push(msg) }() } @@ -306,7 +334,7 @@ func SendAsync(msg *Message) { func SendAsyncs(msgs []*Message) { go func() { for _, msg := range msgs { - mailQueue <- msg + _ = mailQueue.Push(msg) } }() }