aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKoni Marti <koni.marti@gmail.com>2022-04-30 01:08:56 +0200
committerRobin Jarry <robin@jarry.cc>2022-05-04 14:07:15 +0200
commite5b339702a56fa02dedec770a79b64313fb30108 (patch)
treee9c345f5043c5cf748bc6e25c6be6c1bb173d33f
parent397a6f267f41c501f28d3adb9d641a9283af474f (diff)
downloadaerc-e5b339702a56fa02dedec770a79b64313fb30108.tar.gz
imap: monitor the logout channel with an observer
Untangle the observer functionality from the message handling routine. Observe the imap client's logout channel and trigger a connection error when necessary to start the reconnect cycle. Signed-off-by: Koni Marti <koni.marti@gmail.com> Acked-by: Robin Jarry <robin@jarry.cc>
-rw-r--r--worker/imap/configure.go12
-rw-r--r--worker/imap/observer.go152
-rw-r--r--worker/imap/worker.go108
3 files changed, 186 insertions, 86 deletions
diff --git a/worker/imap/configure.go b/worker/imap/configure.go
index 0bccbaea..c25600df 100644
--- a/worker/imap/configure.go
+++ b/worker/imap/configure.go
@@ -50,6 +50,9 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
w.config.keepalive_period = 0 * time.Second
w.config.keepalive_probes = 3
w.config.keepalive_interval = 3
+
+ w.config.reconnect_maxwait = 30 * time.Second
+
for key, value := range msg.Config.Params {
switch key {
case "idle-timeout":
@@ -60,6 +63,14 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
value, err)
}
w.config.idle_timeout = val
+ case "reconnect-maxwait":
+ val, err := time.ParseDuration(value)
+ if err != nil || val < 0 {
+ return fmt.Errorf(
+ "invalid reconnect-maxwait value %v: %v",
+ value, err)
+ }
+ w.config.reconnect_maxwait = val
case "connection-timeout":
val, err := time.ParseDuration(value)
if err != nil || val < 0 {
@@ -96,6 +107,7 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
}
w.idler = newIdler(w.config, w.worker)
+ w.observer = newObserver(w.config, w.worker)
return nil
}
diff --git a/worker/imap/observer.go b/worker/imap/observer.go
new file mode 100644
index 00000000..e49744cc
--- /dev/null
+++ b/worker/imap/observer.go
@@ -0,0 +1,152 @@
+package imap
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+
+ "git.sr.ht/~rjarry/aerc/worker/types"
+ "github.com/emersion/go-imap"
+)
+
+// observer monitors the loggedOut channel of the imap client. If the logout
+// signal is received, the observer will emit a connection error to the ui in
+// order to start the reconnect cycle.
+type observer struct {
+ sync.Mutex
+ config imapConfig
+ client *imapClient
+ worker *types.Worker
+ done chan struct{}
+ autoReconnect bool
+ retries int
+ running bool
+}
+
+func newObserver(cfg imapConfig, w *types.Worker) *observer {
+ return &observer{config: cfg, worker: w, done: make(chan struct{})}
+}
+
+func (o *observer) SetClient(c *imapClient) {
+ o.Stop()
+ o.Lock()
+ o.client = c
+ o.Unlock()
+ o.Start()
+ o.retries = 0
+}
+
+func (o *observer) SetAutoReconnect(auto bool) {
+ o.autoReconnect = auto
+}
+
+func (o *observer) AutoReconnect() bool {
+ return o.autoReconnect
+}
+
+func (o *observer) isClientConnected() bool {
+ o.Lock()
+ defer o.Unlock()
+ return o.client != nil && o.client.State() == imap.SelectedState
+}
+
+func (o *observer) EmitIfNotConnected() bool {
+ if !o.isClientConnected() {
+ o.emit("imap client not connected: attempt reconnect")
+ return true
+ }
+ return false
+}
+
+func (o *observer) IsRunning() bool {
+ return o.running
+}
+
+func (o *observer) Start() {
+ if o.running {
+ o.log("runs already")
+ return
+ }
+ if o.client == nil {
+ return
+ }
+ if o.EmitIfNotConnected() {
+ return
+ }
+ go func() {
+ select {
+ case <-o.client.LoggedOut():
+ o.log("<-logout")
+ if o.autoReconnect {
+ o.emit("logged out")
+ } else {
+ o.log("ignore logout (auto-reconnect off)")
+ }
+ case <-o.done:
+ o.log("<-done")
+ }
+ o.running = false
+ o.log("stopped")
+ }()
+ o.running = true
+ o.log("started")
+}
+
+func (o *observer) Stop() {
+ if o.client == nil {
+ return
+ }
+ if o.done != nil {
+ close(o.done)
+ }
+ o.done = make(chan struct{})
+ o.running = false
+}
+
+func (o *observer) DelayedReconnect() error {
+ if o.client == nil {
+ return nil
+ }
+ var wait time.Duration
+ var reterr error
+
+ if o.retries > 0 {
+ backoff := int(math.Pow(1.8, float64(o.retries)))
+ var err error
+ wait, err = time.ParseDuration(fmt.Sprintf("%ds", backoff))
+ if err != nil {
+ return err
+ }
+ if wait > o.config.reconnect_maxwait {
+ wait = o.config.reconnect_maxwait
+ }
+
+ reterr = fmt.Errorf("reconnect in %v", wait)
+ } else {
+ reterr = fmt.Errorf("reconnect")
+ }
+
+ go func() {
+ <-time.After(wait)
+ o.emit(reterr.Error())
+ }()
+
+ o.retries++
+ return reterr
+}
+
+func (o *observer) emit(errMsg string) {
+ o.log("disconnect done->")
+ o.worker.PostMessage(&types.Done{
+ Message: types.RespondTo(&types.Disconnect{})}, nil)
+ o.log("connection error->")
+ o.worker.PostMessage(&types.ConnError{
+ Error: fmt.Errorf(errMsg),
+ }, nil)
+}
+
+func (o *observer) log(args ...interface{}) {
+ header := fmt.Sprintf("observer (%p) [running:%t]", o, o.running)
+ o.worker.Logger.Println(append([]interface{}{header}, args...)...)
+}
diff --git a/worker/imap/worker.go b/worker/imap/worker.go
index d0f8482f..6e475305 100644
--- a/worker/imap/worker.go
+++ b/worker/imap/worker.go
@@ -3,7 +3,6 @@ package imap
import (
"crypto/tls"
"fmt"
- "math"
"net"
"net/url"
"time"
@@ -37,13 +36,14 @@ type imapClient struct {
}
type imapConfig struct {
- scheme string
- insecure bool
- addr string
- user *url.Userinfo
- folders []string
- oauthBearer lib.OAuthBearer
- idle_timeout time.Duration
+ scheme string
+ insecure bool
+ addr string
+ user *url.Userinfo
+ folders []string
+ oauthBearer lib.OAuthBearer
+ idle_timeout time.Duration
+ reconnect_maxwait time.Duration
// tcp connection parameters
connection_timeout time.Duration
keepalive_period time.Duration
@@ -61,11 +61,8 @@ type IMAPWorker struct {
// Map of sequence numbers to UIDs, index 0 is seq number 1
seqMap []uint32
- done chan struct{}
- autoReconnect bool
- retries int
-
- idler *idler
+ idler *idler
+ observer *observer
}
func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
@@ -74,6 +71,7 @@ func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
worker: worker,
selected: &imap.MailboxStatus{},
idler: newIdler(imapConfig{}, worker),
+ observer: newObserver(imapConfig{}, worker),
}, nil
}
@@ -81,6 +79,7 @@ func (w *IMAPWorker) newClient(c *client.Client) {
c.Updates = w.updates
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
w.idler.SetClient(w.client)
+ w.observer.SetClient(w.client)
}
func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
@@ -93,12 +92,6 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
var reterr error // will be returned at the end, needed to support idle
- checkConn := func(wait time.Duration) {
- time.Sleep(wait)
- w.stopConnectionObserver()
- w.startConnectionObserver()
- }
-
// set connection timeout for calls to imap server
if w.client != nil {
w.client.Timeout = w.config.connection_timeout
@@ -111,53 +104,43 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
reterr = w.handleConfigure(msg)
case *types.Connect:
if w.client != nil && w.client.State() == imap.SelectedState {
- if !w.autoReconnect {
- w.autoReconnect = true
- checkConn(0)
+ if !w.observer.AutoReconnect() {
+ w.observer.SetAutoReconnect(true)
+ w.observer.EmitIfNotConnected()
}
reterr = errAlreadyConnected
break
}
- w.autoReconnect = true
+ w.observer.SetAutoReconnect(true)
c, err := w.connect()
if err != nil {
- checkConn(0)
+ w.observer.EmitIfNotConnected()
reterr = err
break
}
- w.stopConnectionObserver()
-
w.newClient(c)
- w.startConnectionObserver()
-
w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
case *types.Reconnect:
- if !w.autoReconnect {
+ if !w.observer.AutoReconnect() {
reterr = fmt.Errorf("auto-reconnect is disabled; run connect to enable it")
break
}
c, err := w.connect()
if err != nil {
- wait, msg := w.exponentialBackoff()
- go checkConn(wait)
- w.retries++
- reterr = errors.Wrap(err, msg)
+ errReconnect := w.observer.DelayedReconnect()
+ reterr = errors.Wrap(errReconnect, err.Error())
break
}
- w.stopConnectionObserver()
-
w.newClient(c)
- w.startConnectionObserver()
-
w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
case *types.Disconnect:
- w.autoReconnect = false
- w.stopConnectionObserver()
+ w.observer.SetAutoReconnect(false)
+ w.observer.Stop()
if w.client == nil || w.client.State() != imap.SelectedState {
reterr = errNotConnected
break
@@ -267,51 +250,6 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) {
}
}
-func (w *IMAPWorker) exponentialBackoff() (time.Duration, string) {
- maxWait := 16
- if w.retries > 0 {
- backoff := int(math.Pow(2.0, float64(w.retries)))
- if backoff > maxWait {
- backoff = maxWait
- }
- waitStr := fmt.Sprintf("%ds", backoff)
- wait, err := time.ParseDuration(waitStr)
- if err == nil {
- return wait, fmt.Sprintf("wait %s before reconnect", waitStr)
- }
- }
- return 0 * time.Second, ""
-}
-
-func (w *IMAPWorker) startConnectionObserver() {
- emitConnErr := func(errMsg string) {
- w.worker.PostMessage(&types.ConnError{
- Error: fmt.Errorf(errMsg),
- }, nil)
- }
- if w.client == nil {
- emitConnErr("imap client not connected")
- return
- }
- go func() {
- select {
- case <-w.client.LoggedOut():
- if w.autoReconnect {
- emitConnErr("imap: logged out")
- }
- case <-w.done:
- return
- }
- }()
-}
-
-func (w *IMAPWorker) stopConnectionObserver() {
- if w.done != nil {
- close(w.done)
- }
- w.done = make(chan struct{})
-}
-
func (w *IMAPWorker) connect() (*client.Client, error) {
var (
conn *net.TCPConn
@@ -391,8 +329,6 @@ func (w *IMAPWorker) connect() (*client.Client, error) {
return nil, err
}
- w.retries = 0
-
return c, nil
}