diff options
author | Koni Marti <koni.marti@gmail.com> | 2024-01-20 11:31:03 +0100 |
---|---|---|
committer | Robin Jarry <robin@jarry.cc> | 2024-01-25 21:55:17 +0100 |
commit | 1980744f7bf9e147abf649d37a2fa7dddd4e7254 (patch) | |
tree | dba9967e81d4c65c287dd5c9c1e26c5f269891b7 /worker | |
parent | 3452c9233f623c4049098b66911ae82fc14e119c (diff) | |
download | aerc-1980744f7bf9e147abf649d37a2fa7dddd4e7254.tar.gz |
idler: improve the imap idler
Rewrite the imap idler to make it more fault tolerant and prevent hangs
(and possibly short writes).
Fixes: https://todo.sr.ht/~rjarry/aerc/208
Signed-off-by: Koni Marti <koni.marti@gmail.com>
Tested-by: Karel Balej <balejk@matfyz.cz>
Acked-by: Robin Jarry <robin@jarry.cc>
Diffstat (limited to 'worker')
-rw-r--r-- | worker/imap/configure.go | 2 | ||||
-rw-r--r-- | worker/imap/extensions/xgmext/client.go | 8 | ||||
-rw-r--r-- | worker/imap/idler.go | 203 | ||||
-rw-r--r-- | worker/imap/observer.go | 3 | ||||
-rw-r--r-- | worker/imap/worker.go | 98 | ||||
-rw-r--r-- | worker/middleware/gmailworker.go | 24 |
6 files changed, 182 insertions, 156 deletions
diff --git a/worker/imap/configure.go b/worker/imap/configure.go index 49464689..9d4a02de 100644 --- a/worker/imap/configure.go +++ b/worker/imap/configure.go @@ -166,7 +166,7 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error { if w.config.cacheEnabled { w.initCacheDb(msg.Config.Name) } - w.idler = newIdler(w.config, w.worker) + w.idler = newIdler(w.config, w.worker, w.executeIdle) w.observer = newObserver(w.config, w.worker) if name, ok := msg.Config.Params["folder-map"]; ok { diff --git a/worker/imap/extensions/xgmext/client.go b/worker/imap/extensions/xgmext/client.go index 3107e642..62d081b2 100644 --- a/worker/imap/extensions/xgmext/client.go +++ b/worker/imap/extensions/xgmext/client.go @@ -46,7 +46,13 @@ func (h handler) fetchThreadIds(uids []uint32) ([]string, error) { go func() { defer log.PanicHandler() for msg := range messages { - m[msg.Items[thriditem].(string)] = struct{}{} + if msg == nil { + continue + } + item, ok := msg.Items[thriditem].(string) + if ok { + m[item] = struct{}{} + } } done <- nil }() diff --git a/worker/imap/idler.go b/worker/imap/idler.go index aa61776d..369ebd2e 100644 --- a/worker/imap/idler.go +++ b/worker/imap/idler.go @@ -2,156 +2,131 @@ package imap import ( "fmt" - "sync" "time" "git.sr.ht/~rjarry/aerc/log" "git.sr.ht/~rjarry/aerc/worker/types" "github.com/emersion/go-imap" - "github.com/emersion/go-imap/client" ) -var ( - errIdleTimeout = fmt.Errorf("idle timeout") - errIdleModeHangs = fmt.Errorf("idle mode hangs; waiting to reconnect") -) +var errIdleTimeout = fmt.Errorf("idle timeout") // idler manages the idle mode of the imap server. Enter idle mode if there's // no other task and leave idle mode when a new task arrives. Idle mode is only // used when the client is ready and connected. After a connection loss, make // sure that idling returns gracefully and the worker remains responsive. type idler struct { - sync.Mutex - config imapConfig - client *imapClient - worker types.WorkerInteractor - stop chan struct{} - done chan error - waiting bool - idleing bool + client *imapClient + debouncer *time.Timer + debounce time.Duration + timeout time.Duration + worker types.WorkerInteractor + stop chan struct{} + start chan struct{} + done chan error } -func newIdler(cfg imapConfig, w types.WorkerInteractor) *idler { - return &idler{config: cfg, worker: w, done: make(chan error)} +func newIdler(cfg imapConfig, w types.WorkerInteractor, startIdler chan struct{}) *idler { + return &idler{ + debouncer: nil, + debounce: cfg.idle_debounce, + timeout: cfg.idle_timeout, + worker: w, + stop: make(chan struct{}), + start: startIdler, + done: make(chan error), + } } func (i *idler) SetClient(c *imapClient) { - i.Lock() i.client = c - i.Unlock() } -func (i *idler) setWaiting(wait bool) { - i.Lock() - i.waiting = wait - i.Unlock() -} - -func (i *idler) isWaiting() bool { - i.Lock() - defer i.Unlock() - return i.waiting -} - -func (i *idler) isReady() bool { - i.Lock() - defer i.Unlock() - return (!i.waiting && i.client != nil && - i.client.State() == imap.SelectedState) -} - -func (i *idler) setIdleing(v bool) { - i.Lock() - defer i.Unlock() - i.idleing = v -} - -func (i *idler) isIdleing() bool { - i.Lock() - defer i.Unlock() - return i.idleing +func (i *idler) ready() bool { + return (i.client != nil && i.client.State() == imap.SelectedState) } func (i *idler) Start() { - switch { - case i.isReady(): - i.stop = make(chan struct{}) - - go func() { - defer log.PanicHandler() - select { - case <-i.stop: - // debounce idle - i.done <- nil - case <-time.After(i.config.idle_debounce): - // enter idle mode - i.setIdleing(true) - now := time.Now() - err := i.client.Idle(i.stop, - &client.IdleOptions{ - LogoutTimeout: 0, - PollInterval: 0, - }) - i.setIdleing(false) - i.done <- err - i.log("elapsed idle time: %v", time.Since(now)) - } - }() - - case i.isWaiting(): - i.log("not started: wait for idle to exit") - default: - i.log("not started: client not ready") + if !i.ready() { + return } -} -func (i *idler) Stop() error { - var reterr error - switch { - case i.isReady(): + select { + case <-i.stop: + // stop channel is nil (probably after a debounce), we don't + // want to close it + default: close(i.stop) + } + + // create new stop channel + i.stop = make(chan struct{}) + + // clear done channel + clearing := true + for clearing { select { - case err := <-i.done: - if err != nil { - i.log("<=(idle) with err: %v", err) - } - reterr = nil - case <-time.After(i.config.idle_timeout): - i.worker.PostMessage(&types.Done{ - Message: types.RespondTo(&types.Disconnect{}), - }, nil) - - i.waitOnIdle() - - reterr = errIdleTimeout + case <-i.done: + continue + default: + clearing = false } - case i.isWaiting(): - reterr = errIdleModeHangs - default: - reterr = nil } - return reterr + + i.worker.Tracef("idler (start): start idle after debounce") + i.debouncer = time.AfterFunc(i.debounce, func() { + i.start <- struct{}{} + i.worker.Tracef("idler (start): started") + }) } -func (i *idler) waitOnIdle() { - i.setWaiting(true) +func (i *idler) Execute() { + if !i.ready() { + return + } + + // we need to call client.Idle in a goroutine since it is blocking call + // and we still want to receive messages go func() { defer log.PanicHandler() - err := <-i.done - if err == nil { - i.worker.PostMessage(&types.Done{ - Message: types.RespondTo(&types.Connect{}), - }, nil) - } else { - i.log("<=(idle) waited; with err: %v", err) + + start := time.Now() + err := i.client.Idle(i.stop, nil) + if err != nil { + i.worker.Errorf("idle returned error: %v", err) } - i.setWaiting(false) - i.stop = make(chan struct{}) - i.Start() + i.worker.Tracef("idler (execute): idleing for %s", time.Since(start)) + + i.done <- err }() } -func (i *idler) log(format string, v ...interface{}) { - msg := fmt.Sprintf(format, v...) - i.worker.Tracef("idler (%p) [idle:%t,wait:%t] %s", i, i.isIdleing(), i.isWaiting(), msg) +func (i *idler) Stop() error { + if !i.ready() { + return nil + } + + select { + case <-i.stop: + i.worker.Debugf("idler (stop): idler already stopped?") + return nil + default: + close(i.stop) + } + + if i.debouncer != nil { + if i.debouncer.Stop() { + i.worker.Tracef("idler (stop): debounced") + return nil + } + } + + select { + case err := <-i.done: + i.worker.Tracef("idler (stop): idle stopped: %v", err) + return err + case <-time.After(i.timeout): + i.worker.Errorf("idler (stop): cannot stop idle (timeout)") + return errIdleTimeout + } } diff --git a/worker/imap/observer.go b/worker/imap/observer.go index 7367ff58..7a604a1a 100644 --- a/worker/imap/observer.go +++ b/worker/imap/observer.go @@ -105,9 +105,6 @@ func (o *observer) Stop() { } func (o *observer) DelayedReconnect() error { - if o.client == nil { - return nil - } var wait time.Duration var reterr error diff --git a/worker/imap/worker.go b/worker/imap/worker.go index 7ef759d4..391f365a 100644 --- a/worker/imap/worker.go +++ b/worker/imap/worker.go @@ -80,29 +80,37 @@ type IMAPWorker struct { threadAlgorithm sortthread.ThreadAlgorithm liststatus bool + + executeIdle chan struct{} } func NewIMAPWorker(worker *types.Worker) (types.Backend, error) { return &IMAPWorker{ - updates: make(chan client.Update, 50), - worker: worker, - selected: &imap.MailboxStatus{}, - idler: newIdler(imapConfig{}, worker), - observer: newObserver(imapConfig{}, worker), - caps: &models.Capabilities{}, + updates: make(chan client.Update, 50), + worker: worker, + selected: &imap.MailboxStatus{}, + idler: nil, // will be set in configure() + observer: nil, // will be set in configure() + caps: &models.Capabilities{}, + executeIdle: make(chan struct{}), }, nil } func (w *IMAPWorker) newClient(c *client.Client) { - c.Updates = w.updates + c.Updates = nil w.client = &imapClient{ c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c), extensions.NewListStatusClient(c), } - w.idler.SetClient(w.client) - w.observer.SetClient(w.client) + if w.idler != nil { + w.idler.SetClient(w.client) + c.Updates = w.updates + } + if w.observer != nil { + w.observer.SetClient(w.client) + } sort, err := w.client.sort.SupportSort() if err == nil && sort { w.caps.Sort = true @@ -125,7 +133,7 @@ func (w *IMAPWorker) newClient(c *client.Client) { xgmext, err := w.client.Support("X-GM-EXT-1") if err == nil && xgmext && w.config.useXGMEXT { w.worker.Debugf("Server Capability found: X-GM-EXT-1") - w.worker = middleware.NewGmailWorker(w.worker, w.client.Client, w.idler) + w.worker = middleware.NewGmailWorker(w.worker, w.client.Client) } if err == nil && !xgmext && w.config.useXGMEXT { w.worker.Infof("X-GM-EXT-1 requested, but it is not supported") @@ -133,13 +141,6 @@ func (w *IMAPWorker) newClient(c *client.Client) { } func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { - defer func() { - w.idler.Start() - }() - if err := w.idler.Stop(); err != nil { - return err - } - var reterr error // will be returned at the end, needed to support idle // when client is nil allow only certain messages to be handled @@ -200,12 +201,14 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { case *types.Disconnect: w.observer.SetAutoReconnect(false) w.observer.Stop() - if w.client == nil || w.client.State() != imap.SelectedState { + + if w.client == nil || (w.client != nil && w.client.State() != imap.SelectedState) { reterr = errNotConnected break } if err := w.client.Logout(); err != nil { + w.terminate() reterr = err break } @@ -298,10 +301,64 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) { } } +func (w *IMAPWorker) terminate() { + if w.observer != nil { + w.observer.Stop() + w.observer.SetClient(nil) + } + + if w.client != nil { + w.client.Updates = nil + if err := w.client.Terminate(); err != nil { + w.worker.Errorf("could not terminate connection: %v", err) + } + } + + w.client = nil + w.selected = &imap.MailboxStatus{} + + if w.idler != nil { + w.idler.SetClient(nil) + } +} + +func (w *IMAPWorker) stopIdler() error { + if w.idler == nil { + return nil + } + + if err := w.idler.Stop(); err != nil { + w.terminate() + w.observer.EmitIfNotConnected() + w.worker.Errorf("idler stopped with error:%v", err) + return err + } + + return nil +} + +func (w *IMAPWorker) startIdler() { + if w.idler == nil { + return + } + + w.idler.Start() +} + func (w *IMAPWorker) Run() { for { select { case msg := <-w.worker.Actions(): + + if err := w.stopIdler(); err != nil { + w.worker.PostMessage(&types.Error{ + Message: types.RespondTo(msg), + Error: err, + }, nil) + break + } + w.worker.Tracef("ready to handle %T", msg) + msg = w.worker.ProcessAction(msg) if err := w.handleMessage(msg); errors.Is(err, errUnsupported) { @@ -315,8 +372,13 @@ func (w *IMAPWorker) Run() { }, nil) } + w.startIdler() + case update := <-w.updates: w.handleImapUpdate(update) + + case <-w.executeIdle: + w.idler.Execute() } } } diff --git a/worker/middleware/gmailworker.go b/worker/middleware/gmailworker.go index 807f7bff..f9924732 100644 --- a/worker/middleware/gmailworker.go +++ b/worker/middleware/gmailworker.go @@ -8,20 +8,14 @@ import ( "github.com/emersion/go-imap/client" ) -type idler interface { - Start() - Stop() error -} - type gmailWorker struct { types.WorkerInteractor mu sync.Mutex client *client.Client - idler idler } // NewGmailWorker returns an IMAP middleware for the X-GM-EXT-1 extension -func NewGmailWorker(base types.WorkerInteractor, c *client.Client, i idler, +func NewGmailWorker(base types.WorkerInteractor, c *client.Client, ) types.WorkerInteractor { base.Infof("loading worker middleware: X-GM-EXT-1") @@ -29,37 +23,30 @@ func NewGmailWorker(base types.WorkerInteractor, c *client.Client, i idler, for iter := base; iter != nil; iter = iter.Unwrap() { if g, ok := iter.(*gmailWorker); ok { base.Infof("already loaded; resetting") - err := g.reset(c, i) + err := g.reset(c) if err != nil { base.Errorf("reset failed: %v", err) } return base } } - return &gmailWorker{WorkerInteractor: base, client: c, idler: i} + return &gmailWorker{WorkerInteractor: base, client: c} } func (g *gmailWorker) Unwrap() types.WorkerInteractor { return g.WorkerInteractor } -func (g *gmailWorker) reset(c *client.Client, i idler) error { +func (g *gmailWorker) reset(c *client.Client) error { g.mu.Lock() defer g.mu.Unlock() g.client = c - g.idler = i return nil } func (g *gmailWorker) ProcessAction(msg types.WorkerMessage) types.WorkerMessage { - switch msg := msg.(type) { - case *types.FetchMessageHeaders: + if msg, ok := msg.(*types.FetchMessageHeaders); ok && len(msg.Uids) > 0 { g.mu.Lock() - err := g.idler.Stop() - if err != nil { - g.Errorf("idler reported an error: %v", err) - break - } handler := xgmext.NewHandler(g.client) uids, err := handler.FetchEntireThreads(msg.Uids) @@ -71,7 +58,6 @@ func (g *gmailWorker) ProcessAction(msg types.WorkerMessage) types.WorkerMessage msg.Uids = uids } - g.idler.Start() g.mu.Unlock() } return g.WorkerInteractor.ProcessAction(msg) |