From 089740758c0f408a62de331990fe694d35bc8d1c Mon Sep 17 00:00:00 2001 From: Simon Ser Date: Sun, 28 Apr 2019 13:01:56 +0000 Subject: worker/imap: use the IMAP connection from a single goroutine Unfortunately, the IMAP protocol hasn't been designed to be used from multiple goroutines at the same time. For instance, if you fetch twice the same message from two different goroutines, it's not possible to tell whether the response is for one receiver or the other. For this reason, go-imap clients aren't safe to use from multiple goroutines. This commit changes the IMAP workers to be synchronous again (a command is executed only after the previous one has completed). To use IMAP from different threads, popular clients (e.g. Thunderbird) typically open multiple connections. --- worker/imap/fetch.go | 78 +++++++++++++++++++++++++--------------------------- worker/imap/list.go | 24 ++++++++-------- worker/imap/open.go | 61 +++++++++++++++++++--------------------- 3 files changed, 77 insertions(+), 86 deletions(-) (limited to 'worker') diff --git a/worker/imap/fetch.go b/worker/imap/fetch.go index af9d3b1d..a799f2a5 100644 --- a/worker/imap/fetch.go +++ b/worker/imap/fetch.go @@ -46,50 +46,46 @@ func (imapw *IMAPWorker) handleFetchMessages( msg types.WorkerMessage, uids *imap.SeqSet, items []imap.FetchItem, section *imap.BodySectionName) { + messages := make(chan *imap.Message) + go func() { - messages := make(chan *imap.Message) - done := make(chan error, 1) - go func() { - done <- imapw.client.UidFetch(uids, items, messages) - }() - go func() { - for _msg := range messages { - imapw.seqMap[_msg.SeqNum-1] = _msg.Uid - switch msg.(type) { - case *types.FetchMessageHeaders: - imapw.worker.PostMessage(&types.MessageInfo{ - Message: types.RespondTo(msg), - BodyStructure: _msg.BodyStructure, - Envelope: _msg.Envelope, - Flags: _msg.Flags, - InternalDate: _msg.InternalDate, - Uid: _msg.Uid, - }, nil) - case *types.FetchFullMessages: - reader := _msg.GetBody(section) - imapw.worker.PostMessage(&types.FullMessage{ - Message: types.RespondTo(msg), - Reader: reader, - Uid: _msg.Uid, - }, nil) - case *types.FetchMessageBodyPart: - reader := _msg.GetBody(section) - imapw.worker.PostMessage(&types.MessageBodyPart{ - Message: types.RespondTo(msg), - Reader: reader, - Uid: _msg.Uid, - }, nil) - } - } - if err := <-done; err != nil { - imapw.worker.PostMessage(&types.Error{ + for _msg := range messages { + imapw.seqMap[_msg.SeqNum-1] = _msg.Uid + switch msg.(type) { + case *types.FetchMessageHeaders: + imapw.worker.PostMessage(&types.MessageInfo{ + Message: types.RespondTo(msg), + BodyStructure: _msg.BodyStructure, + Envelope: _msg.Envelope, + Flags: _msg.Flags, + InternalDate: _msg.InternalDate, + Uid: _msg.Uid, + }, nil) + case *types.FetchFullMessages: + reader := _msg.GetBody(section) + imapw.worker.PostMessage(&types.FullMessage{ Message: types.RespondTo(msg), - Error: err, + Reader: reader, + Uid: _msg.Uid, + }, nil) + case *types.FetchMessageBodyPart: + reader := _msg.GetBody(section) + imapw.worker.PostMessage(&types.MessageBodyPart{ + Message: types.RespondTo(msg), + Reader: reader, + Uid: _msg.Uid, }, nil) - } else { - imapw.worker.PostMessage( - &types.Done{types.RespondTo(msg)}, nil) } - }() + } }() + + if err := imapw.client.UidFetch(uids, items, messages); err != nil { + imapw.worker.PostMessage(&types.Error{ + Message: types.RespondTo(msg), + Error: err, + }, nil) + } else { + imapw.worker.PostMessage( + &types.Done{types.RespondTo(msg)}, nil) + } } diff --git a/worker/imap/list.go b/worker/imap/list.go index eff3d490..22addc3f 100644 --- a/worker/imap/list.go +++ b/worker/imap/list.go @@ -8,11 +8,8 @@ import ( func (imapw *IMAPWorker) handleListDirectories(msg *types.ListDirectories) { mailboxes := make(chan *imap.MailboxInfo) - done := make(chan error, 1) imapw.worker.Logger.Println("Listing mailboxes") - go func() { - done <- imapw.client.List("", "*", mailboxes) - }() + go func() { for mbox := range mailboxes { imapw.worker.PostMessage(&types.Directory{ @@ -21,14 +18,15 @@ func (imapw *IMAPWorker) handleListDirectories(msg *types.ListDirectories) { Attributes: mbox.Attributes, }, nil) } - if err := <-done; err != nil { - imapw.worker.PostMessage(&types.Error{ - Message: types.RespondTo(msg), - Error: err, - }, nil) - } else { - imapw.worker.PostMessage( - &types.Done{types.RespondTo(msg)}, nil) - } }() + + if err := imapw.client.List("", "*", mailboxes); err != nil { + imapw.worker.PostMessage(&types.Error{ + Message: types.RespondTo(msg), + Error: err, + }, nil) + } else { + imapw.worker.PostMessage( + &types.Done{types.RespondTo(msg)}, nil) + } } diff --git a/worker/imap/open.go b/worker/imap/open.go index 3705bc0d..dc5d6d17 100644 --- a/worker/imap/open.go +++ b/worker/imap/open.go @@ -8,17 +8,16 @@ import ( func (imapw *IMAPWorker) handleOpenDirectory(msg *types.OpenDirectory) { imapw.worker.Logger.Printf("Opening %s", msg.Directory) - go func() { - _, err := imapw.client.Select(msg.Directory, false) - if err != nil { - imapw.worker.PostMessage(&types.Error{ - Message: types.RespondTo(msg), - Error: err, - }, nil) - } else { - imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil) - } - }() + + _, err := imapw.client.Select(msg.Directory, false) + if err != nil { + imapw.worker.PostMessage(&types.Error{ + Message: types.RespondTo(msg), + Error: err, + }, nil) + } else { + imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil) + } } func (imapw *IMAPWorker) handleFetchDirectoryContents( @@ -26,25 +25,23 @@ func (imapw *IMAPWorker) handleFetchDirectoryContents( imapw.worker.Logger.Printf("Fetching UID list") - go func() { - seqSet := &imap.SeqSet{} - seqSet.AddRange(1, imapw.selected.Messages) - uids, err := imapw.client.UidSearch(&imap.SearchCriteria{ - SeqNum: seqSet, - }) - if err != nil { - imapw.worker.PostMessage(&types.Error{ - Message: types.RespondTo(msg), - Error: err, - }, nil) - } else { - imapw.worker.Logger.Printf("Found %d UIDs", len(uids)) - imapw.seqMap = make([]uint32, len(uids)) - imapw.worker.PostMessage(&types.DirectoryContents{ - Message: types.RespondTo(msg), - Uids: uids, - }, nil) - imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil) - } - }() + seqSet := &imap.SeqSet{} + seqSet.AddRange(1, imapw.selected.Messages) + uids, err := imapw.client.UidSearch(&imap.SearchCriteria{ + SeqNum: seqSet, + }) + if err != nil { + imapw.worker.PostMessage(&types.Error{ + Message: types.RespondTo(msg), + Error: err, + }, nil) + } else { + imapw.worker.Logger.Printf("Found %d UIDs", len(uids)) + imapw.seqMap = make([]uint32, len(uids)) + imapw.worker.PostMessage(&types.DirectoryContents{ + Message: types.RespondTo(msg), + Uids: uids, + }, nil) + imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil) + } } -- cgit