aboutsummaryrefslogtreecommitdiffstats
path: root/worker
diff options
context:
space:
mode:
Diffstat (limited to 'worker')
-rw-r--r--worker/types/worker.go43
1 files changed, 41 insertions, 2 deletions
diff --git a/worker/types/worker.go b/worker/types/worker.go
index ee2f9a32..b5f51496 100644
--- a/worker/types/worker.go
+++ b/worker/types/worker.go
@@ -1,6 +1,7 @@
package types
import (
+ "container/list"
"sync"
"sync/atomic"
@@ -21,16 +22,19 @@ type Worker struct {
actionCallbacks map[int64]func(msg WorkerMessage)
messageCallbacks map[int64]func(msg WorkerMessage)
+ actionQueue *list.List
+ status int32
sync.Mutex
}
func NewWorker() *Worker {
return &Worker{
- Actions: make(chan WorkerMessage, 50),
+ Actions: make(chan WorkerMessage),
Messages: make(chan WorkerMessage, 50),
actionCallbacks: make(map[int64]func(msg WorkerMessage)),
messageCallbacks: make(map[int64]func(msg WorkerMessage)),
+ actionQueue: list.New(),
}
}
@@ -39,6 +43,40 @@ func (worker *Worker) setId(msg WorkerMessage) {
msg.setId(id)
}
+const (
+ idle int32 = iota
+ busy
+)
+
+// Add a new task to the action queue without blocking. Start processing the
+// queue in the background if needed.
+func (worker *Worker) queue(msg WorkerMessage) {
+ worker.Lock()
+ defer worker.Unlock()
+ worker.actionQueue.PushBack(msg)
+ if atomic.LoadInt32(&worker.status) == idle {
+ atomic.StoreInt32(&worker.status, busy)
+ go worker.processQueue()
+ }
+}
+
+// Start processing the action queue and write all messages to the Actions
+// channel, one by one. Stop when the action queue is empty.
+func (worker *Worker) processQueue() {
+ for {
+ worker.Lock()
+ e := worker.actionQueue.Front()
+ if e == nil {
+ atomic.StoreInt32(&worker.status, idle)
+ worker.Unlock()
+ return
+ }
+ msg := worker.actionQueue.Remove(e).(WorkerMessage)
+ worker.Unlock()
+ worker.Actions <- msg
+ }
+}
+
// PostAction posts an action to the worker. This method should not be called
// from the same goroutine that the worker runs in or deadlocks may occur
func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) {
@@ -49,7 +87,8 @@ func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage))
} else {
logging.Debugf("PostAction %T", msg)
}
- worker.Actions <- msg
+ // write to Actions channel without blocking
+ worker.queue(msg)
if cb != nil {
worker.Lock()