From 2ee7c7f00e0d9895681b6a523e6f92eb93122095 Mon Sep 17 00:00:00 2001 From: Tim Culverhouse Date: Wed, 30 Aug 2023 16:17:57 -0500 Subject: notmuch: track database state Track the notmuch database state. When a state change is detected, query if any changes affect the current query then send updates accordingly. Signed-off-by: Tim Culverhouse Tested-by: Inwit Acked-by: Robin Jarry --- worker/notmuch/eventhandlers.go | 59 ++++++++++++++++++++++++++++-------- worker/notmuch/events.go | 8 ----- worker/notmuch/lib/database.go | 7 +++++ worker/notmuch/worker.go | 66 ++++++++++++++--------------------------- 4 files changed, 77 insertions(+), 63 deletions(-) delete mode 100644 worker/notmuch/events.go (limited to 'worker/notmuch') diff --git a/worker/notmuch/eventhandlers.go b/worker/notmuch/eventhandlers.go index d4d75628..a8a396c9 100644 --- a/worker/notmuch/eventhandlers.go +++ b/worker/notmuch/eventhandlers.go @@ -4,28 +4,34 @@ package notmuch import ( + "context" "fmt" "path/filepath" "strconv" + "git.sr.ht/~rjarry/aerc/log" "git.sr.ht/~rjarry/aerc/worker/types" ) -func (w *worker) handleNotmuchEvent(et eventType) error { - switch et.(type) { - case *updateDirCounts: - return w.handleUpdateDirCounts() - default: - return errUnsupported - } -} - -func (w *worker) handleUpdateDirCounts() error { +func (w *worker) handleNotmuchEvent() error { err := w.db.Connect() if err != nil { return err } defer w.db.Close() + err = w.updateDirCounts() + if err != nil { + return err + } + err = w.updateChangedMessages() + if err != nil { + return err + } + w.emitLabelList() + return nil +} + +func (w *worker) updateDirCounts() error { if w.store != nil { folders, err := w.store.FolderMap() if err != nil { @@ -36,15 +42,44 @@ func (w *worker) handleUpdateDirCounts() error { folder := filepath.Join(w.maildirAccountPath, name) query := fmt.Sprintf("folder:%s", strconv.Quote(folder)) w.w.PostMessage(&types.DirectoryInfo{ - Info: w.getDirectoryInfo(name, query), + Info: w.getDirectoryInfo(name, query), + Refetch: w.query == query, }, nil) } } for name, query := range w.nameQueryMap { w.w.PostMessage(&types.DirectoryInfo{ - Info: w.getDirectoryInfo(name, query), + Info: w.getDirectoryInfo(name, query), + Refetch: w.query == query, }, nil) } + + return nil +} + +func (w *worker) updateChangedMessages() error { + newState := w.db.State() + if newState == w.state { + return nil + } + w.w.Logger.Debugf("State change: %d to %d", w.state, newState) + query := fmt.Sprintf("%s lastmod:%d..%d", w.query, w.state, newState) + uids, err := w.uidsFromQuery(context.TODO(), query) + if err != nil { + return fmt.Errorf("Couldn't get updates messages: %w", err) + } + for _, uid := range uids { + m, err := w.msgFromUid(uid) + if err != nil { + log.Errorf("%s", err) + continue + } + err = w.emitMessageInfo(m, nil) + if err != nil { + log.Errorf("%s", err) + } + } + w.state = newState return nil } diff --git a/worker/notmuch/events.go b/worker/notmuch/events.go deleted file mode 100644 index 94b27fd7..00000000 --- a/worker/notmuch/events.go +++ /dev/null @@ -1,8 +0,0 @@ -//go:build notmuch -// +build notmuch - -package notmuch - -type eventType interface{} - -type updateDirCounts struct{} diff --git a/worker/notmuch/lib/database.go b/worker/notmuch/lib/database.go index 501f39de..3b9de011 100644 --- a/worker/notmuch/lib/database.go +++ b/worker/notmuch/lib/database.go @@ -52,6 +52,13 @@ func (db *DB) ListTags() []string { return db.db.Tags() } +// State returns the lastmod of the database. This is a uin64 which is +// incremented with every modification +func (db *DB) State() uint64 { + _, lastmod := db.db.Revision() + return lastmod +} + // getQuery returns a query based on the provided query string. // It also configures the query as specified on the worker func (db *DB) newQuery(query string) (*notmuch.Query, error) { diff --git a/worker/notmuch/worker.go b/worker/notmuch/worker.go index 9ba76d90..aa0d1135 100644 --- a/worker/notmuch/worker.go +++ b/worker/notmuch/worker.go @@ -37,7 +37,7 @@ var errUnsupported = fmt.Errorf("unsupported command") type worker struct { w *types.Worker - nmEvents chan eventType + nmStateChange chan bool query string currentQueryName string queryMapOrder []string @@ -52,19 +52,20 @@ type worker struct { capabilities *models.Capabilities headers []string headersExclude []string + state uint64 } // NewWorker creates a new notmuch worker with the provided worker. func NewWorker(w *types.Worker) (types.Backend, error) { - events := make(chan eventType, 20) + events := make(chan bool, 20) watcher, err := watchers.NewWatcher() if err != nil { return nil, fmt.Errorf("could not create file system watcher: %w", err) } return &worker{ - w: w, - nmEvents: events, - watcher: watcher, + w: w, + nmStateChange: events, + watcher: watcher, capabilities: &models.Capabilities{ Sort: true, Thread: true, @@ -96,8 +97,8 @@ func (w *worker) Run() { }, nil) w.w.Errorf("ProcessAction(%T) failure: %v", msg, err) } - case nmEvent := <-w.nmEvents: - err := w.handleNotmuchEvent(nmEvent) + case <-w.nmStateChange: + err := w.handleNotmuchEvent() if err != nil { w.w.Errorf("notmuch event failure: %v", err) } @@ -108,7 +109,7 @@ func (w *worker) Run() { // Debounce FS changes w.watcherDebounce = time.AfterFunc(50*time.Millisecond, func() { defer log.PanicHandler() - w.nmEvents <- &updateDirCounts{} + w.nmStateChange <- true }) } } @@ -245,6 +246,8 @@ func (w *worker) handleConfigure(msg *types.Configure) error { func (w *worker) handleConnect(msg *types.Connect) error { w.done(msg) w.emitLabelList() + // Get initial db state + w.state = w.db.State() // Watch all the files in the xapian folder for changes. We'll debounce // changes, so catching multiple is ok dbPath := path.Join(w.db.Path(), ".notmuch", "xapian") @@ -282,7 +285,7 @@ func (w *worker) handleListDirectories(msg *types.ListDirectories) error { }, nil) } // Update dir counts when listing directories - err := w.handleUpdateDirCounts() + err := w.updateDirCounts() if err != nil { return err } @@ -503,16 +506,7 @@ func (w *worker) handleAnsweredMessages(msg *types.AnsweredMessages) error { w.err(msg, err) continue } - err = w.emitMessageInfo(m, msg) - if err != nil { - w.w.Errorf("could not emit message info: %v", err) - w.err(msg, err) - continue - } } - w.w.PostMessage(&types.DirectoryInfo{ - Info: w.getDirectoryInfo(w.currentQueryName, w.query), - }, nil) w.done(msg) return nil } @@ -531,16 +525,7 @@ func (w *worker) handleFlagMessages(msg *types.FlagMessages) error { w.err(msg, err) continue } - err = w.emitMessageInfo(m, msg) - if err != nil { - w.w.Errorf("could not emit message info: %v", err) - w.err(msg, err) - continue - } } - w.w.PostMessage(&types.DirectoryInfo{ - Info: w.getDirectoryInfo(w.currentQueryName, w.query), - }, nil) w.done(msg) return nil } @@ -580,19 +565,7 @@ func (w *worker) handleModifyLabels(msg *types.ModifyLabels) error { if err != nil { return fmt.Errorf("could not modify message tags: %w", err) } - err = w.emitMessageInfo(m, msg) - if err != nil { - return err - } } - // tags changed, most probably some messages shifted to other folders - // so we need to re-enumerate the query content and update the list of - // possible tags - w.emitLabelList() - w.w.PostMessage(&types.DirectoryInfo{ - Info: w.getDirectoryInfo(w.currentQueryName, w.query), - Refetch: true, - }, nil) w.done(msg) return nil } @@ -709,10 +682,17 @@ func (w *worker) emitMessageInfo(m *Message, case len(w.headers) > 0: lib.LimitHeaders(info.RFC822Headers, w.headers, false) } - w.w.PostMessage(&types.MessageInfo{ - Message: types.RespondTo(parent), - Info: info, - }, nil) + switch parent { + case nil: + w.w.PostMessage(&types.MessageInfo{ + Info: info, + }, nil) + default: + w.w.PostMessage(&types.MessageInfo{ + Message: types.RespondTo(parent), + Info: info, + }, nil) + } return nil } -- cgit