aboutsummaryrefslogtreecommitdiffstats
path: root/worker
diff options
context:
space:
mode:
authorTim Culverhouse <tim@timculverhouse.com>2023-06-20 11:07:09 -0500
committerRobin Jarry <robin@jarry.cc>2023-06-20 23:16:25 +0200
commit01d139a7d6dffed87a0e1e5c3ed31f8a5bee6725 (patch)
treee1dfcdf72eb0ad457eb6369945e33b2353c55ed2 /worker
parent0662bca73efab7919031d4a89acb1f525c3d8466 (diff)
downloadaerc-01d139a7d6dffed87a0e1e5c3ed31f8a5bee6725.tar.gz
maildir: implement cancellation of requests
Implement cancellation of cancellable requests. These include listing of directory contents, searching, and sorting. Signed-off-by: Tim Culverhouse <tim@timculverhouse.com> Tested-by: Bence Ferdinandy <bence@ferdinandy.com> Acked-by: Robin Jarry <robin@jarry.cc>
Diffstat (limited to 'worker')
-rw-r--r--worker/maildir/search.go41
-rw-r--r--worker/maildir/worker.go92
2 files changed, 78 insertions, 55 deletions
diff --git a/worker/maildir/search.go b/worker/maildir/search.go
index 911e6a3d..8286de67 100644
--- a/worker/maildir/search.go
+++ b/worker/maildir/search.go
@@ -1,6 +1,7 @@
package maildir
import (
+ "context"
"io"
"net/textproto"
"runtime"
@@ -107,7 +108,7 @@ func getParsedFlag(name string) maildir.Flag {
return f
}
-func (w *Worker) search(criteria *searchCriteria) ([]uint32, error) {
+func (w *Worker) search(ctx context.Context, criteria *searchCriteria) ([]uint32, error) {
requiredParts := getRequiredParts(criteria)
w.worker.Debugf("Required parts bitmask for search: %b", requiredParts)
@@ -123,22 +124,28 @@ func (w *Worker) search(criteria *searchCriteria) ([]uint32, error) {
max := runtime.NumCPU() * 2
limit := make(chan struct{}, max)
for _, key := range keys {
- limit <- struct{}{}
- wg.Add(1)
- go func(key uint32) {
- defer log.PanicHandler()
- defer wg.Done()
- success, err := w.searchKey(key, criteria, requiredParts)
- if err != nil {
- // don't return early so that we can still get some results
- w.worker.Errorf("Failed to search key %d: %v", key, err)
- } else if success {
- mu.Lock()
- matchedUids = append(matchedUids, key)
- mu.Unlock()
- }
- <-limit
- }(key)
+ select {
+ case <-ctx.Done():
+ return nil, context.Canceled
+ default:
+ limit <- struct{}{}
+ wg.Add(1)
+ go func(key uint32) {
+ defer log.PanicHandler()
+ defer wg.Done()
+ success, err := w.searchKey(key, criteria, requiredParts)
+ if err != nil {
+ // don't return early so that we can still get some results
+ w.worker.Errorf("Failed to search key %d: %v", key, err)
+ } else if success {
+ mu.Lock()
+ matchedUids = append(matchedUids, key)
+ mu.Unlock()
+ }
+ <-limit
+ }(key)
+
+ }
}
wg.Wait()
return matchedUids, nil
diff --git a/worker/maildir/worker.go b/worker/maildir/worker.go
index e1d825f3..5f0c70c7 100644
--- a/worker/maildir/worker.go
+++ b/worker/maildir/worker.go
@@ -129,6 +129,10 @@ func (w *Worker) handleAction(action types.WorkerMessage) {
w.worker.PostMessage(&types.Unsupported{
Message: types.RespondTo(msg),
}, nil)
+ case errors.Is(err, context.Canceled):
+ w.worker.PostMessage(&types.Cancelled{
+ Message: types.RespondTo(msg),
+ }, nil)
case err != nil:
w.worker.PostMessage(&types.Error{
Message: types.RespondTo(msg),
@@ -444,7 +448,7 @@ func (w *Worker) handleFetchDirectoryContents(
if err != nil {
return err
}
- uids, err = w.search(filter)
+ uids, err = w.search(msg.Context, filter)
if err != nil {
return err
}
@@ -455,7 +459,7 @@ func (w *Worker) handleFetchDirectoryContents(
return err
}
}
- sortedUids, err := w.sort(uids, msg.SortCriteria)
+ sortedUids, err := w.sort(msg.Context, uids, msg.SortCriteria)
if err != nil {
w.worker.Errorf("failed sorting directory: %v", err)
return err
@@ -468,7 +472,7 @@ func (w *Worker) handleFetchDirectoryContents(
return nil
}
-func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) {
+func (w *Worker) sort(ctx context.Context, uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) {
if len(criteria) == 0 {
// At least sort by uid, parallel searching can create random
// order
@@ -484,22 +488,27 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32,
max := runtime.NumCPU() * 2
limit := make(chan struct{}, max)
for _, uid := range uids {
- limit <- struct{}{}
- wg.Add(1)
- go func(uid uint32) {
- defer log.PanicHandler()
- defer wg.Done()
- info, err := w.msgHeadersFromUid(uid)
- if err != nil {
- w.worker.Errorf("could not get message info: %v", err)
+ select {
+ case <-ctx.Done():
+ return nil, context.Canceled
+ default:
+ limit <- struct{}{}
+ wg.Add(1)
+ go func(uid uint32) {
+ defer log.PanicHandler()
+ defer wg.Done()
+ info, err := w.msgHeadersFromUid(uid)
+ if err != nil {
+ w.worker.Errorf("could not get message info: %v", err)
+ <-limit
+ return
+ }
+ mu.Lock()
+ msgInfos = append(msgInfos, info)
+ mu.Unlock()
<-limit
- return
- }
- mu.Lock()
- msgInfos = append(msgInfos, info)
- mu.Unlock()
- <-limit
- }(uid)
+ }(uid)
+ }
}
wg.Wait()
@@ -523,7 +532,7 @@ func (w *Worker) handleFetchDirectoryThreaded(
if err != nil {
return err
}
- uids, err = w.search(filter)
+ uids, err = w.search(msg.Context, filter)
if err != nil {
return err
}
@@ -534,7 +543,7 @@ func (w *Worker) handleFetchDirectoryThreaded(
return err
}
}
- threads, err := w.threads(uids, msg.SortCriteria)
+ threads, err := w.threads(msg.Context, uids, msg.SortCriteria)
if err != nil {
w.worker.Errorf("failed sorting directory: %v", err)
return err
@@ -547,7 +556,9 @@ func (w *Worker) handleFetchDirectoryThreaded(
return nil
}
-func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*types.Thread, error) {
+func (w *Worker) threads(ctx context.Context, uids []uint32,
+ criteria []*types.SortCriterion,
+) ([]*types.Thread, error) {
builder := aercLib.NewThreadBuilder(iterator.NewFactory(false))
msgInfos := make([]*models.MessageInfo, 0, len(uids))
mu := sync.Mutex{}
@@ -555,23 +566,28 @@ func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*typ
max := runtime.NumCPU() * 2
limit := make(chan struct{}, max)
for _, uid := range uids {
- limit <- struct{}{}
- wg.Add(1)
- go func(uid uint32) {
- defer log.PanicHandler()
- defer wg.Done()
- info, err := w.msgHeadersFromUid(uid)
- if err != nil {
- w.worker.Errorf("could not get message info: %v", err)
+ select {
+ case <-ctx.Done():
+ return nil, context.Canceled
+ default:
+ limit <- struct{}{}
+ wg.Add(1)
+ go func(uid uint32) {
+ defer log.PanicHandler()
+ defer wg.Done()
+ info, err := w.msgHeadersFromUid(uid)
+ if err != nil {
+ w.worker.Errorf("could not get message info: %v", err)
+ <-limit
+ return
+ }
+ mu.Lock()
+ builder.Update(info)
+ msgInfos = append(msgInfos, info)
+ mu.Unlock()
<-limit
- return
- }
- mu.Lock()
- builder.Update(info)
- msgInfos = append(msgInfos, info)
- mu.Unlock()
- <-limit
- }(uid)
+ }(uid)
+ }
}
wg.Wait()
var err error
@@ -833,7 +849,7 @@ func (w *Worker) handleSearchDirectory(msg *types.SearchDirectory) error {
return err
}
w.worker.Tracef("Searching with parsed criteria: %#v", criteria)
- uids, err := w.search(criteria)
+ uids, err := w.search(msg.Context, criteria)
if err != nil {
return err
}