diff options
Diffstat (limited to 'worker/maildir/worker.go')
-rw-r--r-- | worker/maildir/worker.go | 92 |
1 files changed, 54 insertions, 38 deletions
diff --git a/worker/maildir/worker.go b/worker/maildir/worker.go index e1d825f3..5f0c70c7 100644 --- a/worker/maildir/worker.go +++ b/worker/maildir/worker.go @@ -129,6 +129,10 @@ func (w *Worker) handleAction(action types.WorkerMessage) { w.worker.PostMessage(&types.Unsupported{ Message: types.RespondTo(msg), }, nil) + case errors.Is(err, context.Canceled): + w.worker.PostMessage(&types.Cancelled{ + Message: types.RespondTo(msg), + }, nil) case err != nil: w.worker.PostMessage(&types.Error{ Message: types.RespondTo(msg), @@ -444,7 +448,7 @@ func (w *Worker) handleFetchDirectoryContents( if err != nil { return err } - uids, err = w.search(filter) + uids, err = w.search(msg.Context, filter) if err != nil { return err } @@ -455,7 +459,7 @@ func (w *Worker) handleFetchDirectoryContents( return err } } - sortedUids, err := w.sort(uids, msg.SortCriteria) + sortedUids, err := w.sort(msg.Context, uids, msg.SortCriteria) if err != nil { w.worker.Errorf("failed sorting directory: %v", err) return err @@ -468,7 +472,7 @@ func (w *Worker) handleFetchDirectoryContents( return nil } -func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) { +func (w *Worker) sort(ctx context.Context, uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) { if len(criteria) == 0 { // At least sort by uid, parallel searching can create random // order @@ -484,22 +488,27 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, uid := range uids { - limit <- struct{}{} - wg.Add(1) - go func(uid uint32) { - defer log.PanicHandler() - defer wg.Done() - info, err := w.msgHeadersFromUid(uid) - if err != nil { - w.worker.Errorf("could not get message info: %v", err) + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + limit <- struct{}{} + wg.Add(1) + go func(uid uint32) { + defer log.PanicHandler() + defer wg.Done() + info, err := w.msgHeadersFromUid(uid) + if err != nil { + w.worker.Errorf("could not get message info: %v", err) + <-limit + return + } + mu.Lock() + msgInfos = append(msgInfos, info) + mu.Unlock() <-limit - return - } - mu.Lock() - msgInfos = append(msgInfos, info) - mu.Unlock() - <-limit - }(uid) + }(uid) + } } wg.Wait() @@ -523,7 +532,7 @@ func (w *Worker) handleFetchDirectoryThreaded( if err != nil { return err } - uids, err = w.search(filter) + uids, err = w.search(msg.Context, filter) if err != nil { return err } @@ -534,7 +543,7 @@ func (w *Worker) handleFetchDirectoryThreaded( return err } } - threads, err := w.threads(uids, msg.SortCriteria) + threads, err := w.threads(msg.Context, uids, msg.SortCriteria) if err != nil { w.worker.Errorf("failed sorting directory: %v", err) return err @@ -547,7 +556,9 @@ func (w *Worker) handleFetchDirectoryThreaded( return nil } -func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*types.Thread, error) { +func (w *Worker) threads(ctx context.Context, uids []uint32, + criteria []*types.SortCriterion, +) ([]*types.Thread, error) { builder := aercLib.NewThreadBuilder(iterator.NewFactory(false)) msgInfos := make([]*models.MessageInfo, 0, len(uids)) mu := sync.Mutex{} @@ -555,23 +566,28 @@ func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*typ max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, uid := range uids { - limit <- struct{}{} - wg.Add(1) - go func(uid uint32) { - defer log.PanicHandler() - defer wg.Done() - info, err := w.msgHeadersFromUid(uid) - if err != nil { - w.worker.Errorf("could not get message info: %v", err) + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + limit <- struct{}{} + wg.Add(1) + go func(uid uint32) { + defer log.PanicHandler() + defer wg.Done() + info, err := w.msgHeadersFromUid(uid) + if err != nil { + w.worker.Errorf("could not get message info: %v", err) + <-limit + return + } + mu.Lock() + builder.Update(info) + msgInfos = append(msgInfos, info) + mu.Unlock() <-limit - return - } - mu.Lock() - builder.Update(info) - msgInfos = append(msgInfos, info) - mu.Unlock() - <-limit - }(uid) + }(uid) + } } wg.Wait() var err error @@ -833,7 +849,7 @@ func (w *Worker) handleSearchDirectory(msg *types.SearchDirectory) error { return err } w.worker.Tracef("Searching with parsed criteria: %#v", criteria) - uids, err := w.search(criteria) + uids, err := w.search(msg.Context, criteria) if err != nil { return err } |