diff options
-rw-r--r-- | worker/imap/checkmail.go | 2 | ||||
-rw-r--r-- | worker/imap/configure.go | 1 | ||||
-rw-r--r-- | worker/imap/connect.go | 2 | ||||
-rw-r--r-- | worker/imap/worker.go | 3 | ||||
-rw-r--r-- | worker/jmap/worker.go | 2 | ||||
-rw-r--r-- | worker/maildir/worker.go | 2 | ||||
-rw-r--r-- | worker/mbox/worker.go | 2 | ||||
-rw-r--r-- | worker/notmuch/worker.go | 10 | ||||
-rw-r--r-- | worker/types/worker.go | 52 |
9 files changed, 35 insertions, 41 deletions
diff --git a/worker/imap/checkmail.go b/worker/imap/checkmail.go index b457dc4c..ae20a5b8 100644 --- a/worker/imap/checkmail.go +++ b/worker/imap/checkmail.go @@ -31,7 +31,7 @@ func (w *IMAPWorker) handleCheckMailMessage(msg *types.CheckMail) { } default: for _, dir := range msg.Directories { - if len(w.worker.Actions) > 0 { + if len(w.worker.Actions()) > 0 { remaining = append(remaining, dir) continue } diff --git a/worker/imap/configure.go b/worker/imap/configure.go index 1581794f..783485a7 100644 --- a/worker/imap/configure.go +++ b/worker/imap/configure.go @@ -12,6 +12,7 @@ import ( ) func (w *IMAPWorker) handleConfigure(msg *types.Configure) error { + w.config.name = msg.Config.Name u, err := url.Parse(msg.Config.Source) if err != nil { return err diff --git a/worker/imap/connect.go b/worker/imap/connect.go index 84d69fe5..818952ac 100644 --- a/worker/imap/connect.go +++ b/worker/imap/connect.go @@ -82,7 +82,7 @@ func (w *IMAPWorker) connect() (*client.Client, error) { } } else if w.config.xoauth2.Enabled { if err := w.config.xoauth2.Authenticate( - username, password, w.worker.Name, c); err != nil { + username, password, w.config.name, c); err != nil { return nil, err } } else if err := c.Login(username, password); err != nil { diff --git a/worker/imap/worker.go b/worker/imap/worker.go index dc1891ca..d30140a1 100644 --- a/worker/imap/worker.go +++ b/worker/imap/worker.go @@ -38,6 +38,7 @@ type imapClient struct { } type imapConfig struct { + name string scheme string insecure bool addr string @@ -290,7 +291,7 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) { func (w *IMAPWorker) Run() { for { select { - case msg := <-w.worker.Actions: + case msg := <-w.worker.Actions(): msg = w.worker.ProcessAction(msg) if err := w.handleMessage(msg); errors.Is(err, errUnsupported) { diff --git a/worker/jmap/worker.go b/worker/jmap/worker.go index a538f60c..efd6b041 100644 --- a/worker/jmap/worker.go +++ b/worker/jmap/worker.go @@ -172,7 +172,7 @@ func (w *JMAPWorker) Run() { if err != nil { w.w.Errorf("refresh: %s", err) } - case msg := <-w.w.Actions: + case msg := <-w.w.Actions(): msg = w.w.ProcessAction(msg) err := w.handleMessage(msg) switch { diff --git a/worker/maildir/worker.go b/worker/maildir/worker.go index 5f0c70c7..46cfd472 100644 --- a/worker/maildir/worker.go +++ b/worker/maildir/worker.go @@ -89,7 +89,7 @@ func NewMaildirppWorker(worker *types.Worker) (types.Backend, error) { func (w *Worker) Run() { for { select { - case action := <-w.worker.Actions: + case action := <-w.worker.Actions(): w.handleAction(action) case <-w.watcher.Events(): if w.watcherDebounce != nil { diff --git a/worker/mbox/worker.go b/worker/mbox/worker.go index 0e6c64d1..5034f66d 100644 --- a/worker/mbox/worker.go +++ b/worker/mbox/worker.go @@ -374,7 +374,7 @@ func (w *mboxWorker) handleMessage(msg types.WorkerMessage) error { } func (w *mboxWorker) Run() { - for msg := range w.worker.Actions { + for msg := range w.worker.Actions() { msg = w.worker.ProcessAction(msg) if err := w.handleMessage(msg); errors.Is(err, errUnsupported) { w.worker.PostMessage(&types.Unsupported{ diff --git a/worker/notmuch/worker.go b/worker/notmuch/worker.go index b3f4013e..f47f3889 100644 --- a/worker/notmuch/worker.go +++ b/worker/notmuch/worker.go @@ -58,7 +58,7 @@ func NewWorker(w *types.Worker) (types.Backend, error) { events := make(chan eventType, 20) watcher, err := handlers.NewWatcher() if err != nil { - return nil, fmt.Errorf("(%s) could not create file system watcher: %w", w.Name, err) + return nil, fmt.Errorf("could not create file system watcher: %w", err) } return &worker{ w: w, @@ -75,7 +75,7 @@ func NewWorker(w *types.Worker) (types.Backend, error) { func (w *worker) Run() { for { select { - case action := <-w.w.Actions: + case action := <-w.w.Actions(): msg := w.w.ProcessAction(action) err := w.handleMessage(msg) switch { @@ -759,7 +759,7 @@ func (w *worker) sort(uids []uint32, func (w *worker) handleCheckMail(msg *types.CheckMail) { defer log.PanicHandler() if msg.Command == "" { - w.err(msg, fmt.Errorf("(%s) checkmail: no command specified", w.w.Name)) + w.err(msg, fmt.Errorf("(%s) checkmail: no command specified", msg.Account())) return } ctx, cancel := context.WithTimeout(context.Background(), msg.Timeout) @@ -768,9 +768,9 @@ func (w *worker) handleCheckMail(msg *types.CheckMail) { err := cmd.Run() switch { case ctx.Err() != nil: - w.err(msg, fmt.Errorf("(%s) checkmail: timed out", w.w.Name)) + w.err(msg, fmt.Errorf("(%s) checkmail: timed out", msg.Account())) case err != nil: - w.err(msg, fmt.Errorf("(%s) checkmail: error running command: %w", w.w.Name, err)) + w.err(msg, fmt.Errorf("(%s) checkmail: error running command: %w", msg.Account(), err)) default: w.done(msg) } diff --git a/worker/types/worker.go b/worker/types/worker.go index e263679a..663ac067 100644 --- a/worker/types/worker.go +++ b/worker/types/worker.go @@ -9,6 +9,14 @@ import ( "git.sr.ht/~rjarry/aerc/models" ) +type WorkerInteractor interface { + log.Logger + Actions() chan WorkerMessage + ProcessAction(WorkerMessage) WorkerMessage + PostAction(WorkerMessage, func(msg WorkerMessage)) + PostMessage(WorkerMessage, func(msg WorkerMessage)) +} + var lastId int64 = 1 // access via atomic type Backend interface { @@ -19,29 +27,33 @@ type Backend interface { type Worker struct { Backend Backend - Actions chan WorkerMessage - Name string - logger log.Logger + actions chan WorkerMessage actionCallbacks map[int64]func(msg WorkerMessage) messageCallbacks map[int64]func(msg WorkerMessage) actionQueue *list.List status int32 + name string sync.Mutex + log.Logger } func NewWorker(name string) *Worker { return &Worker{ - Actions: make(chan WorkerMessage), - Name: name, + Logger: log.NewLogger(name, 2), + actions: make(chan WorkerMessage), actionCallbacks: make(map[int64]func(msg WorkerMessage)), messageCallbacks: make(map[int64]func(msg WorkerMessage)), actionQueue: list.New(), - logger: log.NewLogger(name, 3), + name: name, } } +func (worker *Worker) Actions() chan WorkerMessage { + return worker.actions +} + func (worker *Worker) setId(msg WorkerMessage) { id := atomic.AddInt64(&lastId, 1) msg.setId(id) @@ -64,7 +76,7 @@ func (worker *Worker) queue(msg WorkerMessage) { } } -// Start processing the action queue and write all messages to the Actions +// Start processing the action queue and write all messages to the actions // channel, one by one. Stop when the action queue is empty. func (worker *Worker) processQueue() { defer log.PanicHandler() @@ -78,7 +90,7 @@ func (worker *Worker) processQueue() { } msg := worker.actionQueue.Remove(e).(WorkerMessage) worker.Unlock() - worker.Actions <- msg + worker.actions <- msg } } @@ -86,7 +98,7 @@ func (worker *Worker) processQueue() { // from the same goroutine that the worker runs in or deadlocks may occur func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) { worker.setId(msg) - // write to Actions channel without blocking + // write to actions channel without blocking worker.queue(msg) if cb != nil { @@ -104,7 +116,7 @@ func (worker *Worker) PostMessage(msg WorkerMessage, cb func(msg WorkerMessage), ) { worker.setId(msg) - msg.setAccount(worker.Name) + msg.setAccount(worker.name) WorkerMessages <- msg @@ -167,23 +179,3 @@ func (worker *Worker) PostMessageInfoError(msg WorkerMessage, uid uint32, err er func (worker *Worker) PathSeparator() string { return worker.Backend.PathSeparator() } - -func (worker *Worker) Tracef(message string, args ...interface{}) { - worker.logger.Tracef(message, args...) -} - -func (worker *Worker) Debugf(message string, args ...interface{}) { - worker.logger.Debugf(message, args...) -} - -func (worker *Worker) Infof(message string, args ...interface{}) { - worker.logger.Infof(message, args...) -} - -func (worker *Worker) Warnf(message string, args ...interface{}) { - worker.logger.Warnf(message, args...) -} - -func (worker *Worker) Errorf(message string, args ...interface{}) { - worker.logger.Errorf(message, args...) -} |