diff options
-rw-r--r-- | worker/types/worker.go | 56 | ||||
-rw-r--r-- | worker/worker.go | 7 |
2 files changed, 42 insertions, 21 deletions
diff --git a/worker/types/worker.go b/worker/types/worker.go index 1ff16a4c..0ed216a2 100644 --- a/worker/types/worker.go +++ b/worker/types/worker.go @@ -2,6 +2,7 @@ package types import ( "log" + "sync" ) type Backend interface { @@ -9,11 +10,42 @@ type Backend interface { } type Worker struct { - Actions chan WorkerMessage - Backend Backend - Callbacks map[WorkerMessage]func(msg WorkerMessage) - Messages chan WorkerMessage - Logger *log.Logger + Backend Backend + Actions chan WorkerMessage + Messages chan WorkerMessage + Logger *log.Logger + + callbacks map[WorkerMessage]func(msg WorkerMessage) // protected by mutex + mutex sync.Mutex +} + +func NewWorker(logger *log.Logger) *Worker { + return &Worker{ + Actions: make(chan WorkerMessage, 50), + Messages: make(chan WorkerMessage, 50), + Logger: logger, + callbacks: make(map[WorkerMessage]func(msg WorkerMessage)), + } +} + +func (worker *Worker) setCallback(msg WorkerMessage, + cb func(msg WorkerMessage)) { + + if cb != nil { + worker.mutex.Lock() + worker.callbacks[msg] = cb + worker.mutex.Unlock() + } +} + +func (worker *Worker) getCallback(msg WorkerMessage) (func(msg WorkerMessage), + bool) { + + worker.mutex.Lock() + cb, ok := worker.callbacks[msg] + worker.mutex.Unlock() + + return cb, ok } func (worker *Worker) PostAction(msg WorkerMessage, @@ -26,9 +58,7 @@ func (worker *Worker) PostAction(msg WorkerMessage, } worker.Actions <- msg - if cb != nil { - worker.Callbacks[msg] = cb - } + worker.setCallback(msg, cb) } func (worker *Worker) PostMessage(msg WorkerMessage, @@ -41,9 +71,7 @@ func (worker *Worker) PostMessage(msg WorkerMessage, } worker.Messages <- msg - if cb != nil { - worker.Callbacks[msg] = cb - } + worker.setCallback(msg, cb) } func (worker *Worker) ProcessMessage(msg WorkerMessage) WorkerMessage { @@ -52,9 +80,8 @@ func (worker *Worker) ProcessMessage(msg WorkerMessage) WorkerMessage { } else { worker.Logger.Printf("(ui)<= %T\n", msg) } - if cb, ok := worker.Callbacks[msg.InResponseTo()]; ok { + if cb, ok := worker.getCallback(msg.InResponseTo()); ok { cb(msg) - delete(worker.Callbacks, msg) } return msg } @@ -65,9 +92,8 @@ func (worker *Worker) ProcessAction(msg WorkerMessage) WorkerMessage { } else { worker.Logger.Printf("<-(ui) %T\n", msg) } - if cb, ok := worker.Callbacks[msg.InResponseTo()]; ok { + if cb, ok := worker.getCallback(msg.InResponseTo()); ok { cb(msg) - delete(worker.Callbacks, msg) } return msg } diff --git a/worker/worker.go b/worker/worker.go index 959c7df1..29bb5603 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -15,12 +15,7 @@ func NewWorker(source string, logger *log.Logger) (*types.Worker, error) { if err != nil { return nil, err } - worker := &types.Worker{ - Actions: make(chan types.WorkerMessage, 50), - Callbacks: make(map[types.WorkerMessage]func(msg types.WorkerMessage)), - Messages: make(chan types.WorkerMessage, 50), - Logger: logger, - } + worker := types.NewWorker(logger) switch u.Scheme { case "imap": fallthrough |