aboutsummaryrefslogtreecommitdiffstats
path: root/worker
diff options
context:
space:
mode:
authorKoni Marti <koni.marti@gmail.com>2023-06-21 22:13:01 +0200
committerRobin Jarry <robin@jarry.cc>2023-06-22 10:55:25 +0200
commit697b56b6d3071aab78a3b81114ba39618b59848f (patch)
treeb355c44c3d1f5bb295b2f31c2655a3fdd6854f0c /worker
parent0fc5ffb260f764f2e7313506aa0f73ad98cbea40 (diff)
downloadaerc-697b56b6d3071aab78a3b81114ba39618b59848f.tar.gz
worker: add WorkerInteractor interface
Add a WorkerInteractor interface. Avoid exposing any public fields in the types.Worker. This will set the stage to implement a middleware pattern for the workers, i.e. to map folder names between the ui and the backend. Signed-off-by: Koni Marti <koni.marti@gmail.com> Tested-by: Bence Ferdinandy <bence@ferdinandy.com> Signed-off-by: Robin Jarry <robin@jarry.cc>
Diffstat (limited to 'worker')
-rw-r--r--worker/imap/checkmail.go2
-rw-r--r--worker/imap/configure.go1
-rw-r--r--worker/imap/connect.go2
-rw-r--r--worker/imap/worker.go3
-rw-r--r--worker/jmap/worker.go2
-rw-r--r--worker/maildir/worker.go2
-rw-r--r--worker/mbox/worker.go2
-rw-r--r--worker/notmuch/worker.go10
-rw-r--r--worker/types/worker.go52
9 files changed, 35 insertions, 41 deletions
diff --git a/worker/imap/checkmail.go b/worker/imap/checkmail.go
index b457dc4c..ae20a5b8 100644
--- a/worker/imap/checkmail.go
+++ b/worker/imap/checkmail.go
@@ -31,7 +31,7 @@ func (w *IMAPWorker) handleCheckMailMessage(msg *types.CheckMail) {
}
default:
for _, dir := range msg.Directories {
- if len(w.worker.Actions) > 0 {
+ if len(w.worker.Actions()) > 0 {
remaining = append(remaining, dir)
continue
}
diff --git a/worker/imap/configure.go b/worker/imap/configure.go
index 1581794f..783485a7 100644
--- a/worker/imap/configure.go
+++ b/worker/imap/configure.go
@@ -12,6 +12,7 @@ import (
)
func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
+ w.config.name = msg.Config.Name
u, err := url.Parse(msg.Config.Source)
if err != nil {
return err
diff --git a/worker/imap/connect.go b/worker/imap/connect.go
index 84d69fe5..818952ac 100644
--- a/worker/imap/connect.go
+++ b/worker/imap/connect.go
@@ -82,7 +82,7 @@ func (w *IMAPWorker) connect() (*client.Client, error) {
}
} else if w.config.xoauth2.Enabled {
if err := w.config.xoauth2.Authenticate(
- username, password, w.worker.Name, c); err != nil {
+ username, password, w.config.name, c); err != nil {
return nil, err
}
} else if err := c.Login(username, password); err != nil {
diff --git a/worker/imap/worker.go b/worker/imap/worker.go
index dc1891ca..d30140a1 100644
--- a/worker/imap/worker.go
+++ b/worker/imap/worker.go
@@ -38,6 +38,7 @@ type imapClient struct {
}
type imapConfig struct {
+ name string
scheme string
insecure bool
addr string
@@ -290,7 +291,7 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) {
func (w *IMAPWorker) Run() {
for {
select {
- case msg := <-w.worker.Actions:
+ case msg := <-w.worker.Actions():
msg = w.worker.ProcessAction(msg)
if err := w.handleMessage(msg); errors.Is(err, errUnsupported) {
diff --git a/worker/jmap/worker.go b/worker/jmap/worker.go
index a538f60c..efd6b041 100644
--- a/worker/jmap/worker.go
+++ b/worker/jmap/worker.go
@@ -172,7 +172,7 @@ func (w *JMAPWorker) Run() {
if err != nil {
w.w.Errorf("refresh: %s", err)
}
- case msg := <-w.w.Actions:
+ case msg := <-w.w.Actions():
msg = w.w.ProcessAction(msg)
err := w.handleMessage(msg)
switch {
diff --git a/worker/maildir/worker.go b/worker/maildir/worker.go
index 5f0c70c7..46cfd472 100644
--- a/worker/maildir/worker.go
+++ b/worker/maildir/worker.go
@@ -89,7 +89,7 @@ func NewMaildirppWorker(worker *types.Worker) (types.Backend, error) {
func (w *Worker) Run() {
for {
select {
- case action := <-w.worker.Actions:
+ case action := <-w.worker.Actions():
w.handleAction(action)
case <-w.watcher.Events():
if w.watcherDebounce != nil {
diff --git a/worker/mbox/worker.go b/worker/mbox/worker.go
index 0e6c64d1..5034f66d 100644
--- a/worker/mbox/worker.go
+++ b/worker/mbox/worker.go
@@ -374,7 +374,7 @@ func (w *mboxWorker) handleMessage(msg types.WorkerMessage) error {
}
func (w *mboxWorker) Run() {
- for msg := range w.worker.Actions {
+ for msg := range w.worker.Actions() {
msg = w.worker.ProcessAction(msg)
if err := w.handleMessage(msg); errors.Is(err, errUnsupported) {
w.worker.PostMessage(&types.Unsupported{
diff --git a/worker/notmuch/worker.go b/worker/notmuch/worker.go
index b3f4013e..f47f3889 100644
--- a/worker/notmuch/worker.go
+++ b/worker/notmuch/worker.go
@@ -58,7 +58,7 @@ func NewWorker(w *types.Worker) (types.Backend, error) {
events := make(chan eventType, 20)
watcher, err := handlers.NewWatcher()
if err != nil {
- return nil, fmt.Errorf("(%s) could not create file system watcher: %w", w.Name, err)
+ return nil, fmt.Errorf("could not create file system watcher: %w", err)
}
return &worker{
w: w,
@@ -75,7 +75,7 @@ func NewWorker(w *types.Worker) (types.Backend, error) {
func (w *worker) Run() {
for {
select {
- case action := <-w.w.Actions:
+ case action := <-w.w.Actions():
msg := w.w.ProcessAction(action)
err := w.handleMessage(msg)
switch {
@@ -759,7 +759,7 @@ func (w *worker) sort(uids []uint32,
func (w *worker) handleCheckMail(msg *types.CheckMail) {
defer log.PanicHandler()
if msg.Command == "" {
- w.err(msg, fmt.Errorf("(%s) checkmail: no command specified", w.w.Name))
+ w.err(msg, fmt.Errorf("(%s) checkmail: no command specified", msg.Account()))
return
}
ctx, cancel := context.WithTimeout(context.Background(), msg.Timeout)
@@ -768,9 +768,9 @@ func (w *worker) handleCheckMail(msg *types.CheckMail) {
err := cmd.Run()
switch {
case ctx.Err() != nil:
- w.err(msg, fmt.Errorf("(%s) checkmail: timed out", w.w.Name))
+ w.err(msg, fmt.Errorf("(%s) checkmail: timed out", msg.Account()))
case err != nil:
- w.err(msg, fmt.Errorf("(%s) checkmail: error running command: %w", w.w.Name, err))
+ w.err(msg, fmt.Errorf("(%s) checkmail: error running command: %w", msg.Account(), err))
default:
w.done(msg)
}
diff --git a/worker/types/worker.go b/worker/types/worker.go
index e263679a..663ac067 100644
--- a/worker/types/worker.go
+++ b/worker/types/worker.go
@@ -9,6 +9,14 @@ import (
"git.sr.ht/~rjarry/aerc/models"
)
+type WorkerInteractor interface {
+ log.Logger
+ Actions() chan WorkerMessage
+ ProcessAction(WorkerMessage) WorkerMessage
+ PostAction(WorkerMessage, func(msg WorkerMessage))
+ PostMessage(WorkerMessage, func(msg WorkerMessage))
+}
+
var lastId int64 = 1 // access via atomic
type Backend interface {
@@ -19,29 +27,33 @@ type Backend interface {
type Worker struct {
Backend Backend
- Actions chan WorkerMessage
- Name string
- logger log.Logger
+ actions chan WorkerMessage
actionCallbacks map[int64]func(msg WorkerMessage)
messageCallbacks map[int64]func(msg WorkerMessage)
actionQueue *list.List
status int32
+ name string
sync.Mutex
+ log.Logger
}
func NewWorker(name string) *Worker {
return &Worker{
- Actions: make(chan WorkerMessage),
- Name: name,
+ Logger: log.NewLogger(name, 2),
+ actions: make(chan WorkerMessage),
actionCallbacks: make(map[int64]func(msg WorkerMessage)),
messageCallbacks: make(map[int64]func(msg WorkerMessage)),
actionQueue: list.New(),
- logger: log.NewLogger(name, 3),
+ name: name,
}
}
+func (worker *Worker) Actions() chan WorkerMessage {
+ return worker.actions
+}
+
func (worker *Worker) setId(msg WorkerMessage) {
id := atomic.AddInt64(&lastId, 1)
msg.setId(id)
@@ -64,7 +76,7 @@ func (worker *Worker) queue(msg WorkerMessage) {
}
}
-// Start processing the action queue and write all messages to the Actions
+// Start processing the action queue and write all messages to the actions
// channel, one by one. Stop when the action queue is empty.
func (worker *Worker) processQueue() {
defer log.PanicHandler()
@@ -78,7 +90,7 @@ func (worker *Worker) processQueue() {
}
msg := worker.actionQueue.Remove(e).(WorkerMessage)
worker.Unlock()
- worker.Actions <- msg
+ worker.actions <- msg
}
}
@@ -86,7 +98,7 @@ func (worker *Worker) processQueue() {
// from the same goroutine that the worker runs in or deadlocks may occur
func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) {
worker.setId(msg)
- // write to Actions channel without blocking
+ // write to actions channel without blocking
worker.queue(msg)
if cb != nil {
@@ -104,7 +116,7 @@ func (worker *Worker) PostMessage(msg WorkerMessage,
cb func(msg WorkerMessage),
) {
worker.setId(msg)
- msg.setAccount(worker.Name)
+ msg.setAccount(worker.name)
WorkerMessages <- msg
@@ -167,23 +179,3 @@ func (worker *Worker) PostMessageInfoError(msg WorkerMessage, uid uint32, err er
func (worker *Worker) PathSeparator() string {
return worker.Backend.PathSeparator()
}
-
-func (worker *Worker) Tracef(message string, args ...interface{}) {
- worker.logger.Tracef(message, args...)
-}
-
-func (worker *Worker) Debugf(message string, args ...interface{}) {
- worker.logger.Debugf(message, args...)
-}
-
-func (worker *Worker) Infof(message string, args ...interface{}) {
- worker.logger.Infof(message, args...)
-}
-
-func (worker *Worker) Warnf(message string, args ...interface{}) {
- worker.logger.Warnf(message, args...)
-}
-
-func (worker *Worker) Errorf(message string, args ...interface{}) {
- worker.logger.Errorf(message, args...)
-}