aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTim Culverhouse <tim@timculverhouse.com>2022-09-27 22:43:53 -0500
committerRobin Jarry <robin@jarry.cc>2022-10-04 09:18:30 +0200
commit4c371170c57d39399a924bd4d27d67226a9a7f39 (patch)
tree8b08b9cfc1b8406656d005d2761ae56df961a130
parent1c2dd4c9f15ddc9cba0849c4077525bd6d64cd6a (diff)
downloadaerc-4c371170c57d39399a924bd4d27d67226a9a7f39.tar.gz
worker: use container/list as job queue
The worker uses a buffered channel to queue tasks. Buffered channels are effective at FIFO, but are prone to blocking. The design of aerc is such that the UI must always accept a response from the backends, and the backends must always accept a request from the UI. By using buffered channels for both of these communication channels, a deadlock will occur. Break the chain by using a doubly linked list (container/list from the standard library) to queue tasks for the worker. Essentially, this is an infinitely buffered channel - but more memory efficient as it can change size dynamically. Signed-off-by: Tim Culverhouse <tim@timculverhouse.com> Signed-off-by: Robin Jarry <robin@jarry.cc>
-rw-r--r--worker/types/worker.go43
1 files changed, 41 insertions, 2 deletions
diff --git a/worker/types/worker.go b/worker/types/worker.go
index ee2f9a32..b5f51496 100644
--- a/worker/types/worker.go
+++ b/worker/types/worker.go
@@ -1,6 +1,7 @@
package types
import (
+ "container/list"
"sync"
"sync/atomic"
@@ -21,16 +22,19 @@ type Worker struct {
actionCallbacks map[int64]func(msg WorkerMessage)
messageCallbacks map[int64]func(msg WorkerMessage)
+ actionQueue *list.List
+ status int32
sync.Mutex
}
func NewWorker() *Worker {
return &Worker{
- Actions: make(chan WorkerMessage, 50),
+ Actions: make(chan WorkerMessage),
Messages: make(chan WorkerMessage, 50),
actionCallbacks: make(map[int64]func(msg WorkerMessage)),
messageCallbacks: make(map[int64]func(msg WorkerMessage)),
+ actionQueue: list.New(),
}
}
@@ -39,6 +43,40 @@ func (worker *Worker) setId(msg WorkerMessage) {
msg.setId(id)
}
+const (
+ idle int32 = iota
+ busy
+)
+
+// Add a new task to the action queue without blocking. Start processing the
+// queue in the background if needed.
+func (worker *Worker) queue(msg WorkerMessage) {
+ worker.Lock()
+ defer worker.Unlock()
+ worker.actionQueue.PushBack(msg)
+ if atomic.LoadInt32(&worker.status) == idle {
+ atomic.StoreInt32(&worker.status, busy)
+ go worker.processQueue()
+ }
+}
+
+// Start processing the action queue and write all messages to the Actions
+// channel, one by one. Stop when the action queue is empty.
+func (worker *Worker) processQueue() {
+ for {
+ worker.Lock()
+ e := worker.actionQueue.Front()
+ if e == nil {
+ atomic.StoreInt32(&worker.status, idle)
+ worker.Unlock()
+ return
+ }
+ msg := worker.actionQueue.Remove(e).(WorkerMessage)
+ worker.Unlock()
+ worker.Actions <- msg
+ }
+}
+
// PostAction posts an action to the worker. This method should not be called
// from the same goroutine that the worker runs in or deadlocks may occur
func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) {
@@ -49,7 +87,8 @@ func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage))
} else {
logging.Debugf("PostAction %T", msg)
}
- worker.Actions <- msg
+ // write to Actions channel without blocking
+ worker.queue(msg)
if cb != nil {
worker.Lock()