diff options
Diffstat (limited to 'worker')
-rw-r--r-- | worker/types/worker.go | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/worker/types/worker.go b/worker/types/worker.go index ee2f9a32..b5f51496 100644 --- a/worker/types/worker.go +++ b/worker/types/worker.go @@ -1,6 +1,7 @@ package types import ( + "container/list" "sync" "sync/atomic" @@ -21,16 +22,19 @@ type Worker struct { actionCallbacks map[int64]func(msg WorkerMessage) messageCallbacks map[int64]func(msg WorkerMessage) + actionQueue *list.List + status int32 sync.Mutex } func NewWorker() *Worker { return &Worker{ - Actions: make(chan WorkerMessage, 50), + Actions: make(chan WorkerMessage), Messages: make(chan WorkerMessage, 50), actionCallbacks: make(map[int64]func(msg WorkerMessage)), messageCallbacks: make(map[int64]func(msg WorkerMessage)), + actionQueue: list.New(), } } @@ -39,6 +43,40 @@ func (worker *Worker) setId(msg WorkerMessage) { msg.setId(id) } +const ( + idle int32 = iota + busy +) + +// Add a new task to the action queue without blocking. Start processing the +// queue in the background if needed. +func (worker *Worker) queue(msg WorkerMessage) { + worker.Lock() + defer worker.Unlock() + worker.actionQueue.PushBack(msg) + if atomic.LoadInt32(&worker.status) == idle { + atomic.StoreInt32(&worker.status, busy) + go worker.processQueue() + } +} + +// Start processing the action queue and write all messages to the Actions +// channel, one by one. Stop when the action queue is empty. +func (worker *Worker) processQueue() { + for { + worker.Lock() + e := worker.actionQueue.Front() + if e == nil { + atomic.StoreInt32(&worker.status, idle) + worker.Unlock() + return + } + msg := worker.actionQueue.Remove(e).(WorkerMessage) + worker.Unlock() + worker.Actions <- msg + } +} + // PostAction posts an action to the worker. This method should not be called // from the same goroutine that the worker runs in or deadlocks may occur func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) { @@ -49,7 +87,8 @@ func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) } else { logging.Debugf("PostAction %T", msg) } - worker.Actions <- msg + // write to Actions channel without blocking + worker.queue(msg) if cb != nil { worker.Lock() |