package maildir import ( "bytes" "context" "errors" "fmt" "io" "net/url" "os" "os/exec" "path/filepath" "runtime" "sort" "strings" "sync" "time" "github.com/emersion/go-maildir" aercLib "git.sr.ht/~rjarry/aerc/lib" "git.sr.ht/~rjarry/aerc/lib/iterator" "git.sr.ht/~rjarry/aerc/log" "git.sr.ht/~rjarry/aerc/models" "git.sr.ht/~rjarry/aerc/worker/handlers" "git.sr.ht/~rjarry/aerc/worker/lib" "git.sr.ht/~rjarry/aerc/worker/types" ) func init() { handlers.RegisterWorkerFactory("maildir", NewWorker) handlers.RegisterWorkerFactory("maildirpp", NewMaildirppWorker) } var errUnsupported = fmt.Errorf("unsupported command") // A Worker handles interfacing between aerc's UI and a group of maildirs. type Worker struct { c *Container selected *maildir.Dir selectedName string selectedInfo *models.DirectoryInfo worker *types.Worker watcher types.FSWatcher watcherDebounce *time.Timer fsEvents chan struct{} currentSortCriteria []*types.SortCriterion maildirpp bool // whether to use Maildir++ directory layout capabilities *models.Capabilities headers []string headersExclude []string } // NewWorker creates a new maildir worker with the provided worker. func NewWorker(worker *types.Worker) (types.Backend, error) { watch, err := handlers.NewWatcher() if err != nil { return nil, fmt.Errorf("could not create file system watcher: %w", err) } return &Worker{ capabilities: &models.Capabilities{ Sort: true, Thread: true, }, worker: worker, watcher: watch, fsEvents: make(chan struct{}), }, nil } // NewMaildirppWorker creates a new Maildir++ worker with the provided worker. func NewMaildirppWorker(worker *types.Worker) (types.Backend, error) { watch, err := handlers.NewWatcher() if err != nil { return nil, fmt.Errorf("could not create file system watcher: %w", err) } return &Worker{ capabilities: &models.Capabilities{ Sort: true, Thread: true, }, worker: worker, watcher: watch, maildirpp: true, }, nil } // Run starts the worker's message handling loop. func (w *Worker) Run() { for { select { case action := <-w.worker.Actions: w.handleAction(action) case <-w.watcher.Events(): if w.watcherDebounce != nil { w.watcherDebounce.Stop() } // Debounce FS changes w.watcherDebounce = time.AfterFunc(50*time.Millisecond, func() { defer log.PanicHandler() w.fsEvents <- struct{}{} }) case <-w.fsEvents: w.handleFSEvent() } } } func (w *Worker) Capabilities() *models.Capabilities { return w.capabilities } func (w *Worker) PathSeparator() string { return string(os.PathSeparator) } func (w *Worker) handleAction(action types.WorkerMessage) { msg := w.worker.ProcessAction(action) switch msg := msg.(type) { // Explicitly handle all asynchronous actions. Async actions are // responsible for posting their own Done message case *types.CheckMail: go w.handleCheckMail(msg) default: // Default handling, will be performed synchronously err := w.handleMessage(msg) switch { case errors.Is(err, errUnsupported): w.worker.PostMessage(&types.Unsupported{ Message: types.RespondTo(msg), }, nil) case err != nil: w.worker.PostMessage(&types.Error{ Message: types.RespondTo(msg), Error: err, }, nil) default: w.done(msg) } } } func (w *Worker) handleFSEvent() { // if there's not a selected directory to rescan, ignore if w.selected == nil { return } err := w.c.SyncNewMail(*w.selected) if err != nil { log.Errorf("could not move new to cur : %v", err) return } w.selectedInfo = w.getDirectoryInfo(w.selectedName) w.worker.PostMessage(&types.DirectoryInfo{ Info: w.selectedInfo, Refetch: true, }, nil) } func (w *Worker) done(msg types.WorkerMessage) { w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil) } func (w *Worker) err(msg types.WorkerMessage, err error) { w.worker.PostMessage(&types.Error{ Message: types.RespondTo(msg), Error: err, }, nil) } func splitMaildirFile(name string) (uniq string, flags []maildir.Flag, err error) { i := strings.LastIndexByte(name, ':') if i < 0 { return "", nil, &maildir.MailfileError{Name: name} } info := name[i+1:] uniq = name[:i] if len(info) < 2 { return "", nil, &maildir.FlagError{Info: info, Experimental: false} } if info[1] != ',' || info[0] != '2' { return "", nil, &maildir.FlagError{Info: info, Experimental: false} } if info[0] == '1' { return "", nil, &maildir.FlagError{Info: info, Experimental: true} } flags = []maildir.Flag(info[2:]) sort.Slice(flags, func(i, j int) bool { return info[i] < info[j] }) return uniq, flags, nil } func dirFiles(name string) ([]string, error) { dir, err := os.Open(filepath.Join(name, "cur")) if err != nil { return nil, err } defer dir.Close() return dir.Readdirnames(-1) } func (w *Worker) getDirectoryInfo(name string) *models.DirectoryInfo { dirInfo := &models.DirectoryInfo{ Name: name, // total messages Exists: 0, // new messages since mailbox was last opened Recent: 0, // total unread Unseen: 0, } dir := w.c.Store.Dir(name) var keyFlags map[string][]maildir.Flag files, err := dirFiles(string(dir)) if err == nil { keyFlags = make(map[string][]maildir.Flag, len(files)) for _, v := range files { key, flags, err := splitMaildirFile(v) if err != nil { log.Errorf("%q: error parsing flags (%q): %v", v, key, err) continue } keyFlags[key] = flags } } else { log.Tracef("disabled flags cache: %q: %v", dir, err) } uids, err := w.c.UIDs(dir) if err != nil { log.Errorf("could not get uids: %v", err) return dirInfo } dirInfo.Exists = len(uids) for _, uid := range uids { message, err := w.c.Message(dir, uid) if err != nil { log.Errorf("could not get message: %v", err) continue } var flags []maildir.Flag if keyFlags != nil { ok := false flags, ok = keyFlags[message.key] if !ok { log.Tracef("message (key=%q uid=%d) not found in map cache", message.key, message.uid) flags, err = message.Flags() if err != nil { log.Errorf("could not get flags: %v", err) continue } } } else { flags, err = message.Flags() if err != nil { log.Errorf("could not get flags: %v", err) continue } } seen := false for _, flag := range flags { if flag == maildir.FlagSeen { seen = true break } } if !seen { dirInfo.Unseen++ } if w.c.IsRecent(uid) { dirInfo.Recent++ } } return dirInfo } func (w *Worker) handleMessage(msg types.WorkerMessage) error { switch msg := msg.(type) { case *types.Unsupported: // No-op case *types.Configure: return w.handleConfigure(msg) case *types.Connect: return w.handleConnect(msg) case *types.ListDirectories: return w.handleListDirectories(msg) case *types.OpenDirectory: return w.handleOpenDirectory(msg) case *types.FetchDirectoryContents: return w.handleFetchDirectoryContents(msg) case *types.FetchDirectoryThreaded: return w.handleFetchDirectoryThreaded(msg) case *types.CreateDirectory: return w.handleCreateDirectory(msg) case *types.RemoveDirectory: return w.handleRemoveDirectory(msg) case *types.FetchMessageHeaders: return w.handleFetchMessageHeaders(msg) case *types.FetchMessageBodyPart: return w.handleFetchMessageBodyPart(msg) case *types.FetchFullMessages: return w.handleFetchFullMessages(msg) case *types.DeleteMessages: return w.handleDeleteMessages(msg) case *types.FlagMessages: return w.handleFlagMessages(msg) case *types.AnsweredMessages: return w.handleAnsweredMessages(msg) case *types.CopyMessages: return w.handleCopyMessages(msg) case *types.MoveMessages: return w.handleMoveMessages(msg) case *types.AppendMessage: return w.handleAppendMessage(msg) case *types.SearchDirectory: return w.handleSearchDirectory(msg) } return errUnsupported } func (w *Worker) handleConfigure(msg *types.Configure) error { u, err := url.Parse(msg.Config.Source) if err != nil { log.Errorf("error configuring maildir worker: %v", err) return err } dir := u.Path if u.Host == "~" { home, err := os.UserHomeDir() if err != nil { return fmt.Errorf("could not resolve home directory: %w", err) } dir = filepath.Join(home, u.Path) } if len(dir) == 0 { return fmt.Errorf("could not resolve maildir from URL '%s'", msg.Config.Source) } c, err := NewContainer(dir, w.maildirpp) if err != nil { log.Errorf("could not configure maildir: %s", dir) return err } w.c = c err = w.watcher.Configure(dir) if err != nil { return err } w.headers = msg.Config.Headers w.headersExclude = msg.Config.HeadersExclude log.Debugf("configured base maildir: %s", dir) return nil } func (w *Worker) handleConnect(msg *types.Connect) error { return nil } func (w *Worker) handleListDirectories(msg *types.ListDirectories) error { // TODO If handleConfigure has returned error, w.c is nil. // It could be better if we skip directory listing completely // when configure fails. if w.c == nil { return errors.New("Incorrect maildir directory") } dirs, err := w.c.Store.FolderMap() if err != nil { log.Errorf("failed listing directories: %v", err) return err } for name := range dirs { w.worker.PostMessage(&types.Directory{ Message: types.RespondTo(msg), Dir: &models.Directory{ Name: name, }, }, nil) w.worker.PostMessage(&types.DirectoryInfo{ Info: w.getDirectoryInfo(name), }, nil) } return nil } func (w *Worker) handleOpenDirectory(msg *types.OpenDirectory) error { log.Debugf("opening %s", msg.Directory) // open the directory dir, err := w.c.OpenDirectory(msg.Directory) if err != nil { return err } // remove existing watch paths if w.selected != nil { prevDir := filepath.Join(string(*w.selected), "new") if err := w.watcher.Remove(prevDir); err != nil { return fmt.Errorf("could not unwatch previous directory: %w", err) } prevDir = filepath.Join(string(*w.selected), "cur") if err := w.watcher.Remove(prevDir); err != nil { return fmt.Errorf("could not unwatch previous directory: %w", err) } } w.selected = &dir w.selectedName = msg.Directory // add watch paths newDir := filepath.Join(string(*w.selected), "new") if err := w.watcher.Add(newDir); err != nil { return fmt.Errorf("could not add watch to directory: %w", err) } newDir = filepath.Join(string(*w.selected), "cur") if err := w.watcher.Add(newDir); err != nil { return fmt.Errorf("could not add watch to directory: %w", err) } if err := dir.Clean(); err != nil { return fmt.Errorf("could not clean directory: %w", err) } info := &types.DirectoryInfo{ Info: w.getDirectoryInfo(msg.Directory), } w.selectedInfo = info.Info w.worker.PostMessage(info, nil) return nil } func (w *Worker) handleFetchDirectoryContents( msg *types.FetchDirectoryContents, ) error { var ( uids []uint32 err error ) // FilterCriteria always contains "filter" as first item if len(msg.FilterCriteria) > 1 { filter, err := parseSearch(msg.FilterCriteria) if err != nil { return err } uids, err = w.search(filter) if err != nil { return err } } else { uids, err = w.c.UIDs(*w.selected) if err != nil { log.Errorf("failed scanning uids: %v", err) return err } } sortedUids, err := w.sort(uids, msg.SortCriteria) if err != nil { log.Errorf("failed sorting directory: %v", err) return err } w.currentSortCriteria = msg.SortCriteria w.worker.PostMessage(&types.DirectoryContents{ Message: types.RespondTo(msg), Uids: sortedUids, }, nil) return nil } func (w *Worker) sort(uids []uint32, criteria []*types.SortCriterion) ([]uint32, error) { if len(criteria) == 0 { // At least sort by uid, parallel searching can create random // order sort.Slice(uids, func(i int, j int) bool { return uids[i] < uids[j] }) return uids, nil } var msgInfos []*models.MessageInfo mu := sync.Mutex{} wg := sync.WaitGroup{} // Hard limit at 2x CPU cores max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, uid := range uids { limit <- struct{}{} wg.Add(1) go func(uid uint32) { defer log.PanicHandler() defer wg.Done() info, err := w.msgHeadersFromUid(uid) if err != nil { log.Errorf("could not get message info: %v", err) <-limit return } mu.Lock() msgInfos = append(msgInfos, info) mu.Unlock() <-limit }(uid) } wg.Wait() sortedUids, err := lib.Sort(msgInfos, criteria) if err != nil { log.Errorf("could not sort the messages: %v", err) return nil, err } return sortedUids, nil } func (w *Worker) handleFetchDirectoryThreaded( msg *types.FetchDirectoryThreaded, ) error { var ( uids []uint32 err error ) if len(msg.FilterCriteria) > 1 { filter, err := parseSearch(msg.FilterCriteria) if err != nil { return err } uids, err = w.search(filter) if err != nil { return err } } else { uids, err = w.c.UIDs(*w.selected) if err != nil { log.Errorf("failed scanning uids: %v", err) return err } } threads, err := w.threads(uids, msg.SortCriteria) if err != nil { log.Errorf("failed sorting directory: %v", err) return err } w.currentSortCriteria = msg.SortCriteria w.worker.PostMessage(&types.DirectoryThreaded{ Message: types.RespondTo(msg), Threads: threads, }, nil) return nil } func (w *Worker) threads(uids []uint32, criteria []*types.SortCriterion) ([]*types.Thread, error) { builder := aercLib.NewThreadBuilder(iterator.NewFactory(false)) msgInfos := make([]*models.MessageInfo, 0, len(uids)) mu := sync.Mutex{} wg := sync.WaitGroup{} max := runtime.NumCPU() * 2 limit := make(chan struct{}, max) for _, uid := range uids { limit <- struct{}{} wg.Add(1) go func(uid uint32) { defer log.PanicHandler() defer wg.Done() info, err := w.msgHeadersFromUid(uid) if err != nil { log.Errorf("could not get message info: %v", err) <-limit return } mu.Lock() builder.Update(info) msgInfos = append(msgInfos, info) mu.Unlock() <-limit }(uid) } wg.Wait() var err error switch { case len(criteria) == 0: sort.Slice(uids, func(i int, j int) bool { return uids[i] < uids[j] }) default: uids, err = lib.Sort(msgInfos, criteria) if err != nil { log.Errorf("could not sort the messages: %v", err) return nil, err } } threads := builder.Threads(uids, false, false) return threads, nil } func (w *Worker) handleCreateDirectory(msg *types.CreateDirectory) error { dir := w.c.Store.Dir(msg.Directory) if err := dir.Init(); err != nil { log.Errorf("could not create directory %s: %v", msg.Directory, err) return err } return nil } func (w *Worker) handleRemoveDirectory(msg *types.RemoveDirectory) error { dir := w.c.Store.Dir(msg.Directory) if err := os.RemoveAll(string(dir)); err != nil { log.Errorf("could not remove directory %s: %v", msg.Directory, err) return err } return nil } func (w *Worker) handleFetchMessageHeaders( msg *types.FetchMessageHeaders, ) error { for _, uid := range msg.Uids { info, err := w.msgInfoFromUid(uid) if err != nil { log.Errorf("could not get message info: %v", err) w.worker.PostMessageInfoError(msg, uid, err) continue } switch { case len(w.headersExclude) > 0: lib.LimitHeaders(info.RFC822Headers, w.headersExclude, true) case len(w.headers) > 0: lib.LimitHeaders(info.RFC822Headers, w.headers, false) } w.worker.PostMessage(&types.MessageInfo{ Message: types.RespondTo(msg), Info: info, }, nil) w.c.ClearRecentFlag(uid) } return nil } func (w *Worker) handleFetchMessageBodyPart( msg *types.FetchMessageBodyPart, ) error { // get reader m, err := w.c.Message(*w.selected, msg.Uid) if err != nil { log.Errorf("could not get message %d: %v", msg.Uid, err) return err } r, err := m.NewBodyPartReader(msg.Part) if err != nil { log.Errorf( "could not get body part reader for message=%d, parts=%#v: %w", msg.Uid, msg.Part, err) return err } w.worker.PostMessage(&types.MessageBodyPart{ Message: types.RespondTo(msg), Part: &models.MessageBodyPart{ Reader: r, Uid: msg.Uid, }, }, nil) return nil } func (w *Worker) handleFetchFullMessages(msg *types.FetchFullMessages) error { for _, uid := range msg.Uids { m, err := w.c.Message(*w.selected, uid) if err != nil { log.Errorf("could not get message %d: %v", uid, err) return err } r, err := m.NewReader() if err != nil { log.Errorf("could not get message reader: %v", err) return err } defer r.Close() b, err := io.ReadAll(r) if err != nil { return err } w.worker.PostMessage(&types.FullMessage{ Message: types.RespondTo(msg), Content: &models.FullMessage{ Uid: uid, Reader: bytes.NewReader(b), }, }, nil) } w.worker.PostMessage(&types.Done{ Message: types.RespondTo(msg), }, nil) return nil } func (w *Worker) handleDeleteMessages(msg *types.DeleteMessages) error { deleted, err := w.c.DeleteAll(*w.selected, msg.Uids) if len(deleted) > 0 { w.worker.PostMessage(&types.MessagesDeleted{ Message: types.RespondTo(msg), Uids: deleted, }, nil) } if err != nil { log.Errorf("failed removing messages: %v", err) return err } return nil } func (w *Worker) handleAnsweredMessages(msg *types.AnsweredMessages) error { for _, uid := range msg.Uids { m, err := w.c.Message(*w.selected, uid) if err != nil { log.Errorf("could not get message: %v", err) w.err(msg, err) continue } if err := m.MarkReplied(msg.Answered); err != nil { log.Errorf("could not mark message as answered: %v", err) w.err(msg, err) continue } info, err := m.MessageInfo() if err != nil { log.Errorf("could not get message info: %v", err) w.err(msg, err) continue } w.worker.PostMessage(&types.MessageInfo{ Message: types.RespondTo(msg), Info: info, }, nil) w.worker.PostMessage(&types.DirectoryInfo{ Info: w.getDirectoryInfo(w.selectedName), }, nil) } return nil } func (w *Worker) handleFlagMessages(msg *types.FlagMessages) error { for _, uid := range msg.Uids { m, err := w.c.Message(*w.selected, uid) if err != nil { log.Errorf("could not get message: %v", err) w.err(msg, err) continue } flag := lib.FlagToMaildir[msg.Flags] if err := m.SetOneFlag(flag, msg.Enable); err != nil { log.Errorf("could change flag %v to %v on message: %v", flag, msg.Enable, err) w.err(msg, err) continue } info, err := m.MessageInfo() if err != nil { log.Errorf("could not get message info: %v", err) w.err(msg, err) continue } w.worker.PostMessage(&types.MessageInfo{ Message: types.RespondTo(msg), Info: info, }, nil) } w.worker.PostMessage(&types.DirectoryInfo{ Info: w.getDirectoryInfo(w.selectedName), }, nil) return nil } func (w *Worker) handleCopyMessages(msg *types.CopyMessages) error { dest := w.c.Store.Dir(msg.Destination) err := w.c.CopyAll(dest, *w.selected, msg.Uids) if err != nil { return err } w.worker.PostMessage(&types.MessagesCopied{ Message: types.RespondTo(msg), Destination: msg.Destination, Uids: msg.Uids, }, nil) return nil } func (w *Worker) handleMoveMessages(msg *types.MoveMessages) error { dest := w.c.Store.Dir(msg.Destination) moved, err := w.c.MoveAll(dest, *w.selected, msg.Uids) w.worker.PostMessage(&types.MessagesMoved{ Message: types.RespondTo(msg), Destination: msg.Destination, Uids: moved, }, nil) w.worker.PostMessage(&types.MessagesDeleted{ Message: types.RespondTo(msg), Uids: moved, }, nil) return err } func (w *Worker) handleAppendMessage(msg *types.AppendMessage) error { // since we are the "master" maildir process, we can modify the maildir directly dest := w.c.Store.Dir(msg.Destination) _, writer, err := dest.Create(lib.ToMaildirFlags(msg.Flags)) if err != nil { return fmt.Errorf("could not create message at %s: %w", msg.Destination, err) } defer writer.Close() if _, err := io.Copy(writer, msg.Reader); err != nil { return fmt.Errorf( "could not write message to destination: %w", err) } w.worker.PostMessage(&types.Done{ Message: types.RespondTo(msg), }, nil) w.worker.PostMessage(&types.DirectoryInfo{ Info: w.getDirectoryInfo(msg.Destination), }, nil) return nil } func (w *Worker) handleSearchDirectory(msg *types.SearchDirectory) error { log.Debugf("Searching directory %v with args: %v", *w.selected, msg.Argv) criteria, err := parseSearch(msg.Argv) if err != nil { return err } log.Tracef("Searching with parsed criteria: %#v", criteria) uids, err := w.search(criteria) if err != nil { return err } w.worker.PostMessage(&types.SearchResults{ Message: types.RespondTo(msg), Uids: uids, }, nil) return nil } func (w *Worker) msgInfoFromUid(uid uint32) (*models.MessageInfo, error) { m, err := w.c.Message(*w.selected, uid) if err != nil { return nil, err } info, err := m.MessageInfo() if err != nil { return nil, err } if w.c.IsRecent(uid) { info.Flags |= models.RecentFlag } return info, nil } func (w *Worker) msgHeadersFromUid(uid uint32) (*models.MessageInfo, error) { m, err := w.c.Message(*w.selected, uid) if err != nil { return nil, err } info, err := m.MessageHeaders() if err != nil { return nil, err } return info, nil } func (w *Worker) handleCheckMail(msg *types.CheckMail) { defer log.PanicHandler() if msg.Command == "" { w.err(msg, fmt.Errorf("checkmail: no command specified")) return } ctx, cancel := context.WithTimeout(context.Background(), msg.Timeout) defer cancel() cmd := exec.CommandContext(ctx, "sh", "-c", msg.Command) ch := make(chan error) go func() { defer log.PanicHandler() err := cmd.Run() ch <- err }() select { case <-ctx.Done(): w.err(msg, fmt.Errorf("checkmail: timed out")) case err := <-ch: if err != nil { w.err(msg, fmt.Errorf("checkmail: error running command: %w", err)) } else { dirs, err := w.c.Store.FolderMap() if err != nil { w.err(msg, fmt.Errorf("failed listing directories: %w", err)) } for name, dir := range dirs { err := w.c.SyncNewMail(dir) if err != nil { w.err(msg, fmt.Errorf("could not sync new mail: %w", err)) } dirInfo := w.getDirectoryInfo(name) w.worker.PostMessage(&types.DirectoryInfo{ Info: dirInfo, }, nil) } w.done(msg) } } }