From 86c612e48025104b32b2048fead1248a3d3b1c8f Mon Sep 17 00:00:00 2001 From: Tim Culverhouse Date: Mon, 7 Nov 2022 10:15:50 -0600 Subject: maildir: enable backend threads Add thread capability to maildir backend. Enables the maildir worker to thread the entire folder, as opposed to only the displayed messages via client side threads. Signed-off-by: Tim Culverhouse Tested-by: Inwit Acked-by: Robin Jarry --- worker/maildir/worker.go | 86 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) (limited to 'worker/maildir') diff --git a/worker/maildir/worker.go b/worker/maildir/worker.go index fe4425e2..8b89c0d2 100644 --- a/worker/maildir/worker.go +++ b/worker/maildir/worker.go @@ -18,6 +18,8 @@ import ( "github.com/emersion/go-maildir" "github.com/fsnotify/fsnotify" + aercLib "git.sr.ht/~rjarry/aerc/lib" + "git.sr.ht/~rjarry/aerc/lib/iterator" "git.sr.ht/~rjarry/aerc/logging" "git.sr.ht/~rjarry/aerc/models" "git.sr.ht/~rjarry/aerc/worker/handlers" @@ -180,7 +182,7 @@ func (w *Worker) getDirectoryInfo(name string) *models.DirectoryInfo { Caps: &models.Capabilities{ Sort: true, - Thread: false, + Thread: true, }, } @@ -265,6 +267,8 @@ func (w *Worker) handleMessage(msg types.WorkerMessage) error { return w.handleOpenDirectory(msg) case *types.FetchDirectoryContents: return w.handleFetchDirectoryContents(msg) + case *types.FetchDirectoryThreaded: + return w.handleFetchDirectoryThreaded(msg) case *types.CreateDirectory: return w.handleCreateDirectory(msg) case *types.RemoveDirectory: @@ -459,6 +463,7 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, <-limit }(uid) } + wg.Wait() sortedUids, err := lib.Sort(msgInfos, criteria) if err != nil { @@ -468,6 +473,85 @@ func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, return sortedUids, nil } +func (w *Worker) handleFetchDirectoryThreaded( + msg *types.FetchDirectoryThreaded, +) error { + var ( + uids []uint32 + err error + ) + if len(msg.FilterCriteria) > 1 { + filter, err := parseSearch(msg.FilterCriteria) + if err != nil { + return err + } + uids, err = w.search(filter) + if err != nil { + return err + } + } else { + uids, err = w.c.UIDs(*w.selected) + if err != nil { + logging.Errorf("failed scanning uids: %v", err) + return err + } + } + threads, err := w.threads(uids, msg.SortCriteria) + if err != nil { + logging.Errorf("failed sorting directory: %v", err) + return err + } + w.currentSortCriteria = msg.SortCriteria + w.worker.PostMessage(&types.DirectoryThreaded{ + Message: types.RespondTo(msg), + Threads: threads, + }, nil) + return nil +} + +func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*types.Thread, error) { + builder := aercLib.NewThreadBuilder(iterator.NewFactory(false)) + msgInfos := make([]*models.MessageInfo, 0, len(uids)) + mu := sync.Mutex{} + wg := sync.WaitGroup{} + max := runtime.NumCPU() * 2 + limit := make(chan struct{}, max) + for _, uid := range uids { + limit <- struct{}{} + wg.Add(1) + go func(uid uint32) { + defer wg.Done() + info, err := w.msgHeadersFromUid(uid) + if err != nil { + logging.Errorf("could not get message info: %v", err) + <-limit + return + } + mu.Lock() + builder.Update(info) + msgInfos = append(msgInfos, info) + mu.Unlock() + <-limit + }(uid) + } + wg.Wait() + var err error + switch { + case len(criteria) == 0: + sort.Slice(uids, func(i int, j int) bool { + return uids[i] < uids[j] + }) + default: + uids, err = lib.Sort(msgInfos, criteria) + if err != nil { + logging.Errorf("could not sort the messages: %v", err) + return nil, err + } + } + threads := builder.Threads(uids, false, false) + return threads, nil +} + func (w *Worker) handleCreateDirectory(msg *types.CreateDirectory) error { dir := w.c.Store.Dir(msg.Directory) if err := dir.Init(); err != nil { -- cgit