diff options
author | Tim Culverhouse <tim@timculverhouse.com> | 2023-06-20 11:07:09 -0500 |
---|---|---|
committer | Robin Jarry <robin@jarry.cc> | 2023-06-20 23:16:25 +0200 |
commit | 01d139a7d6dffed87a0e1e5c3ed31f8a5bee6725 (patch) | |
tree | e1dfcdf72eb0ad457eb6369945e33b2353c55ed2 /worker | |
parent | 0662bca73efab7919031d4a89acb1f525c3d8466 (diff) | |
download | aerc-01d139a7d6dffed87a0e1e5c3ed31f8a5bee6725.tar.gz |
maildir: implement cancellation of requests
Implement cancellation of cancellable requests. These include listing of
directory contents, searching, and sorting.
Signed-off-by: Tim Culverhouse <tim@timculverhouse.com>
Tested-by: Bence Ferdinandy <bence@ferdinandy.com>
Acked-by: Robin Jarry <robin@jarry.cc>
Diffstat (limited to 'worker')
-rw-r--r-- | worker/maildir/search.go | 41 | ||||
-rw-r--r-- | worker/maildir/worker.go | 92 |
2 files changed, 78 insertions, 55 deletions
diff --git a/worker/maildir/search.go b/worker/maildir/search.go index 911e6a3d..8286de67 100644 --- a/worker/maildir/search.go +++ b/worker/maildir/search.go @@ -1,6 +1,7 @@ package maildir import ( + "context" "io" "net/textproto" "runtime" @@ -107,7 +108,7 @@ func getParsedFlag(name string) maildir.Flag { return f } -func (w *Worker) search(criteria *searchCriteria) ([]uint32, error) { +func (w *Worker) search(ctx context.Context, criteria *searchCriteria) ([]uint32, error) { requiredParts := getRequiredParts(criteria) w.worker.Debugf("Required parts bitmask for search: %b", requiredParts) @@ -123,22 +124,28 @@ func (w *Worker) search(criteria *searchCriteria) ([]uint32, error) { max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, key := range keys { - limit <- struct{}{} - wg.Add(1) - go func(key uint32) { - defer log.PanicHandler() - defer wg.Done() - success, err := w.searchKey(key, criteria, requiredParts) - if err != nil { - // don't return early so that we can still get some results - w.worker.Errorf("Failed to search key %d: %v", key, err) - } else if success { - mu.Lock() - matchedUids = append(matchedUids, key) - mu.Unlock() - } - <-limit - }(key) + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + limit <- struct{}{} + wg.Add(1) + go func(key uint32) { + defer log.PanicHandler() + defer wg.Done() + success, err := w.searchKey(key, criteria, requiredParts) + if err != nil { + // don't return early so that we can still get some results + w.worker.Errorf("Failed to search key %d: %v", key, err) + } else if success { + mu.Lock() + matchedUids = append(matchedUids, key) + mu.Unlock() + } + <-limit + }(key) + + } } wg.Wait() return matchedUids, nil diff --git a/worker/maildir/worker.go b/worker/maildir/worker.go index e1d825f3..5f0c70c7 100644 --- a/worker/maildir/worker.go +++ b/worker/maildir/worker.go @@ -129,6 +129,10 @@ func (w *Worker) handleAction(action types.WorkerMessage) { w.worker.PostMessage(&types.Unsupported{ Message: types.RespondTo(msg), }, nil) + case errors.Is(err, context.Canceled): + w.worker.PostMessage(&types.Cancelled{ + Message: types.RespondTo(msg), + }, nil) case err != nil: w.worker.PostMessage(&types.Error{ Message: types.RespondTo(msg), @@ -444,7 +448,7 @@ func (w *Worker) handleFetchDirectoryContents( if err != nil { return err } - uids, err = w.search(filter) + uids, err = w.search(msg.Context, filter) if err != nil { return err } @@ -455,7 +459,7 @@ func (w *Worker) handleFetchDirectoryContents( return err } } - sortedUids, err := w.sort(uids, msg.SortCriteria) + sortedUids, err := w.sort(msg.Context, uids, msg.SortCriteria) if err != nil { w.worker.Errorf("failed sorting directory: %v", err) return err @@ -468,7 +472,7 @@ func (w *Worker) handleFetchDirectoryContents( return nil } -func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) { +func (w *Worker) sort(ctx context.Context, uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) { if len(criteria) == 0 { // At least sort by uid, parallel searching can create random // order @@ -484,22 +488,27 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, uid := range uids { - limit <- struct{}{} - wg.Add(1) - go func(uid uint32) { - defer log.PanicHandler() - defer wg.Done() - info, err := w.msgHeadersFromUid(uid) - if err != nil { - w.worker.Errorf("could not get message info: %v", err) + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + limit <- struct{}{} + wg.Add(1) + go func(uid uint32) { + defer log.PanicHandler() + defer wg.Done() + info, err := w.msgHeadersFromUid(uid) + if err != nil { + w.worker.Errorf("could not get message info: %v", err) + <-limit + return + } + mu.Lock() + msgInfos = append(msgInfos, info) + mu.Unlock() <-limit - return - } - mu.Lock() - msgInfos = append(msgInfos, info) - mu.Unlock() - <-limit - }(uid) + }(uid) + } } wg.Wait() @@ -523,7 +532,7 @@ func (w *Worker) handleFetchDirectoryThreaded( if err != nil { return err } - uids, err = w.search(filter) + uids, err = w.search(msg.Context, filter) if err != nil { return err } @@ -534,7 +543,7 @@ func (w *Worker) handleFetchDirectoryThreaded( return err } } - threads, err := w.threads(uids, msg.SortCriteria) + threads, err := w.threads(msg.Context, uids, msg.SortCriteria) if err != nil { w.worker.Errorf("failed sorting directory: %v", err) return err @@ -547,7 +556,9 @@ func (w *Worker) handleFetchDirectoryThreaded( return nil } -func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*types.Thread, error) { +func (w *Worker) threads(ctx context.Context, uids []uint32, + criteria []*types.SortCriterion, +) ([]*types.Thread, error) { builder := aercLib.NewThreadBuilder(iterator.NewFactory(false)) msgInfos := make([]*models.MessageInfo, 0, len(uids)) mu := sync.Mutex{} @@ -555,23 +566,28 @@ func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*typ max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, uid := range uids { - limit <- struct{}{} - wg.Add(1) - go func(uid uint32) { - defer log.PanicHandler() - defer wg.Done() - info, err := w.msgHeadersFromUid(uid) - if err != nil { - w.worker.Errorf("could not get message info: %v", err) + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + limit <- struct{}{} + wg.Add(1) + go func(uid uint32) { + defer log.PanicHandler() + defer wg.Done() + info, err := w.msgHeadersFromUid(uid) + if err != nil { + w.worker.Errorf("could not get message info: %v", err) + <-limit + return + } + mu.Lock() + builder.Update(info) + msgInfos = append(msgInfos, info) + mu.Unlock() <-limit - return - } - mu.Lock() - builder.Update(info) - msgInfos = append(msgInfos, info) - mu.Unlock() - <-limit - }(uid) + }(uid) + } } wg.Wait() var err error @@ -833,7 +849,7 @@ func (w *Worker) handleSearchDirectory(msg *types.SearchDirectory) error { return err } w.worker.Tracef("Searching with parsed criteria: %#v", criteria) - uids, err := w.search(criteria) + uids, err := w.search(msg.Context, criteria) if err != nil { return err } |