diff options
author | Tim Culverhouse <tim@timculverhouse.com> | 2022-09-27 22:43:53 -0500 |
---|---|---|
committer | Robin Jarry <robin@jarry.cc> | 2022-10-04 09:18:30 +0200 |
commit | 4c371170c57d39399a924bd4d27d67226a9a7f39 (patch) | |
tree | 8b08b9cfc1b8406656d005d2761ae56df961a130 | |
parent | 1c2dd4c9f15ddc9cba0849c4077525bd6d64cd6a (diff) | |
download | aerc-4c371170c57d39399a924bd4d27d67226a9a7f39.tar.gz |
worker: use container/list as job queue
The worker uses a buffered channel to queue tasks. Buffered channels
are effective at FIFO, but are prone to blocking. The design of aerc is
such that the UI must always accept a response from the backends, and
the backends must always accept a request from the UI. By using buffered
channels for both of these communication channels, a deadlock will
occur.
Break the chain by using a doubly linked list (container/list from the
standard library) to queue tasks for the worker. Essentially, this is an
infinitely buffered channel - but more memory efficient as it can change
size dynamically.
Signed-off-by: Tim Culverhouse <tim@timculverhouse.com>
Signed-off-by: Robin Jarry <robin@jarry.cc>
-rw-r--r-- | worker/types/worker.go | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/worker/types/worker.go b/worker/types/worker.go index ee2f9a32..b5f51496 100644 --- a/worker/types/worker.go +++ b/worker/types/worker.go @@ -1,6 +1,7 @@ package types import ( + "container/list" "sync" "sync/atomic" @@ -21,16 +22,19 @@ type Worker struct { actionCallbacks map[int64]func(msg WorkerMessage) messageCallbacks map[int64]func(msg WorkerMessage) + actionQueue *list.List + status int32 sync.Mutex } func NewWorker() *Worker { return &Worker{ - Actions: make(chan WorkerMessage, 50), + Actions: make(chan WorkerMessage), Messages: make(chan WorkerMessage, 50), actionCallbacks: make(map[int64]func(msg WorkerMessage)), messageCallbacks: make(map[int64]func(msg WorkerMessage)), + actionQueue: list.New(), } } @@ -39,6 +43,40 @@ func (worker *Worker) setId(msg WorkerMessage) { msg.setId(id) } +const ( + idle int32 = iota + busy +) + +// Add a new task to the action queue without blocking. Start processing the +// queue in the background if needed. +func (worker *Worker) queue(msg WorkerMessage) { + worker.Lock() + defer worker.Unlock() + worker.actionQueue.PushBack(msg) + if atomic.LoadInt32(&worker.status) == idle { + atomic.StoreInt32(&worker.status, busy) + go worker.processQueue() + } +} + +// Start processing the action queue and write all messages to the Actions +// channel, one by one. Stop when the action queue is empty. +func (worker *Worker) processQueue() { + for { + worker.Lock() + e := worker.actionQueue.Front() + if e == nil { + atomic.StoreInt32(&worker.status, idle) + worker.Unlock() + return + } + msg := worker.actionQueue.Remove(e).(WorkerMessage) + worker.Unlock() + worker.Actions <- msg + } +} + // PostAction posts an action to the worker. This method should not be called // from the same goroutine that the worker runs in or deadlocks may occur func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) { @@ -49,7 +87,8 @@ func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) } else { logging.Debugf("PostAction %T", msg) } - worker.Actions <- msg + // write to Actions channel without blocking + worker.queue(msg) if cb != nil { worker.Lock() |