From 280eba48ee382cfe41f0deab1a6898ce0e803351 Mon Sep 17 00:00:00 2001 From: benadha Date: Sat, 8 Jan 2022 06:49:02 +0700 Subject: Initial library implementation --- epy.py | 174 ++++++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 107 insertions(+), 67 deletions(-) (limited to 'epy.py') diff --git a/epy.py b/epy.py index 88e95c1..261bd6b 100755 --- a/epy.py +++ b/epy.py @@ -42,6 +42,7 @@ import zipfile from typing import Optional, Union, Sequence, Tuple, List, Dict, Mapping, Set, Type, Any from dataclasses import dataclass, field +from datetime import datetime from difflib import SequenceMatcher as SM from enum import Enum from functools import wraps @@ -209,6 +210,35 @@ class BookMetadata: source: Optional[str] = None +@dataclass(frozen=True) +class LibraryItem: + last_read: datetime + filepath: str + title: Optional[str] = None + author: Optional[str] = None + reading_progress: Optional[float] = None + + def __str__(self) -> str: + if self.reading_progress is None: + reading_progress_str = "N/A" + else: + reading_progress_str = f"{int(self.reading_progress * 100)}%" + reading_progress_str = reading_progress_str.rjust(4) + + book_name: str + file_basename = os.path.basename(self.filepath) + if self.title is not None and self.author is not None: + book_name = f"{self.title} - {self.author} ({file_basename})" + elif self.title is None: + book_name = f"{file_basename} - {self.author}" + else: + book_name = file_basename + + last_read_str = self.last_read.strftime("%I:%M %p %b %d, %Y") + + return f"{reading_progress_str} {book_name}. {last_read_str}." + + @dataclass(frozen=True) class ReadingState: """ @@ -1322,40 +1352,56 @@ class State(AppData): def filepath(self) -> str: return os.path.join(self.prefix, "states.db") if self.prefix else os.devnull - def get_from_history(self) -> Tuple[str, ...]: + def get_from_history(self) -> List[LibraryItem]: try: conn = sqlite3.connect(self.filepath) cur = conn.cursor() - cur.execute("SELECT filepath FROM reading_states") + cur.execute( + """ + SELECT last_read, filepath, title, author, reading_progress + FROM library ORDER BY last_read DESC + """ + ) results = cur.fetchall() - return tuple(i[0] for i in results) # type: ignore + library_items: List[LibraryItem] = [] + for result in results: + library_items.append( + LibraryItem( + last_read=datetime.fromisoformat(result[0]), + filepath=result[1], + title=result[2], + author=result[3], + reading_progress=result[4], + ) + ) + return library_items finally: conn.close() - def delete_from_history(self, filepath: str) -> None: + def delete_from_library(self, filepath: str) -> None: try: conn = sqlite3.connect(self.filepath) + conn.execute("PRAGMA foreign_keys = ON") conn.execute("DELETE FROM reading_states WHERE filepath=?", (filepath,)) conn.commit() finally: conn.close() def get_last_read(self) -> Optional[str]: - try: - conn = sqlite3.connect(self.filepath) - cur = conn.cursor() - cur.execute("SELECT filepath FROM last_read WHERE id=0") - res = cur.fetchone() - if res: - return res[0] - return None - finally: - conn.close() + library = self.get_from_history() + return library[0].filepath if library else None - def set_last_read(self, ebook: Ebook) -> None: + def update_library(self, ebook: Ebook, reading_progress: Optional[float]) -> None: try: + metadata = ebook.get_meta() conn = sqlite3.connect(self.filepath) - conn.execute("INSERT OR REPLACE INTO last_read VALUES (0, ?)", (ebook.path,)) + conn.execute( + """ + INSERT OR REPLACE INTO library (filepath, title, author, reading_progress) + VALUES (?, ?, ?, ?) + """, + (ebook.path, metadata.title, metadata.creator, reading_progress), + ) conn.commit() finally: conn.close() @@ -1416,14 +1462,6 @@ class State(AppData): finally: conn.close() - def delete_bookmarks_by_filepath(self, filepath: str) -> None: - try: - conn = sqlite3.connect(self.filepath) - conn.execute("DELETE FROM bookmarks WHERE filepath=?", (filepath,)) - conn.commit() - finally: - conn.close() - def get_bookmarks(self, ebook: Ebook) -> List[Tuple[str, ReadingState]]: try: conn = sqlite3.connect(self.filepath) @@ -1448,15 +1486,7 @@ class State(AppData): def init_db(self) -> None: try: conn = sqlite3.connect(self.filepath) - conn.execute( - """ - CREATE TABLE last_read ( - id INTEGER PRIMARY KEY, - filepath TEXT - ) - """ - ) - conn.execute( + conn.executescript( """ CREATE TABLE reading_states ( filepath TEXT PRIMARY KEY, @@ -1464,11 +1494,18 @@ class State(AppData): textwidth INTEGER, row INTEGER, rel_pctg REAL - ) - """ - ) - conn.execute( - """ + ); + + CREATE TABLE library ( + last_read DATETIME DEFAULT (datetime('now','localtime')), + filepath TEXT PRIMARY KEY, + title TEXT, + author TEXT, + reading_progress REAL, + FOREIGN KEY (filepath) REFERENCES reading_states(filepath) + ON DELETE CASCADE + ); + CREATE TABLE bookmarks ( id TEXT PRIMARY KEY, filepath TEXT, @@ -1476,8 +1513,10 @@ class State(AppData): content_index INTEGER, textwidth INTEGER, row INTEGER, - rel_pctg REAL - ) + rel_pctg REAL, + FOREIGN KEY (filepath) REFERENCES reading_states(filepath) + ON DELETE CASCADE + ); """ ) conn.commit() @@ -2184,6 +2223,15 @@ class Reader: if not self._multiprocess_support: self.letters_count = count_letters(self.ebook) + def try_assign_letters_count(self) -> None: + if isinstance(self._process_counting_letter, multiprocessing.Process): + if self._process_counting_letter.exitcode == 0: + self.letters_count = self._proc_parent.recv() + self._proc_parent.close() + self._process_counting_letter.terminate() + self._process_counting_letter.close() + self._process_counting_letter = None + @property def screen_rows(self) -> int: return self.screen.getmaxyx()[0] @@ -2680,8 +2728,8 @@ class Reader: def savestate(self, reading_state: ReadingState) -> None: if self.seamless: reading_state = Reader.adjust_seamless_reading_state(reading_state) - self.state.set_last_read(self.ebook) self.state.set_last_reading_state(self.ebook, reading_state) + self.state.update_library(self.ebook, self.reading_progress) def cleanup(self) -> None: self.ebook.cleanup() @@ -3433,14 +3481,8 @@ class Reader: self.screen.addstr(0, 0, countstring) board.write(reading_state.row) - # check self._process - if isinstance(self._process_counting_letter, multiprocessing.Process): - if self._process_counting_letter.exitcode == 0: - self.letters_count = self._proc_parent.recv() - self._proc_parent.close() - self._process_counting_letter.terminate() - self._process_counting_letter.close() - self._process_counting_letter = None + # check if letters counting process is done + self.try_assign_letters_count() # reading progress if self.letters_count: @@ -3560,15 +3602,14 @@ def parse_cli_args() -> str: last_read_in_history = app_state.get_last_read() # clean up history from missing file - reading_history = app_state.get_from_history() - is_history_modified = False - for file in reading_history: - if not os.path.isfile(file): - app_state.delete_from_history(file) - app_state.delete_bookmarks_by_filepath(file) - is_history_modified = True - if is_history_modified: - reading_history = app_state.get_from_history() + library = app_state.get_from_history() + is_library_modified = False + for item in library: + if not os.path.isfile(item.filepath): + app_state.delete_from_library(item.filepath) + is_library_modified = True + if is_library_modified: + library = app_state.get_from_history() if len({"-d"} & set(args)) != 0: args.remove("-d") @@ -3592,14 +3633,16 @@ def parse_cli_args() -> str: if len(args) == 1 and re.match(r"[0-9]+", args[0]) is not None: try: # file = list(STATE["States"].keys())[int(args[0]) - 1] - candidate = (reading_history[int(args[0]) - 1], None) + chosen_by_index = library[int(args[0]) - 1] + candidate = (chosen_by_index.filepath, None) except IndexError: pass # find file from history by string matching if (not candidate[0]) or candidate[1]: matching_value = 0 - for file in reading_history: + for item in library: + file = item.filepath this_file_match_value = sum( [ i.size @@ -3618,16 +3661,13 @@ def parse_cli_args() -> str: if (not candidate[0]) or candidate[1] or "-r" in args: print("Reading history:") # dig = len(str(len(STATE["States"].keys()) + 1)) - dig = len(str(len(reading_history) + 1)) + dig = len(str(len(library) + 1)) tcols = termc - dig - 2 - for n, i in enumerate(reading_history): - homedir = os.getenv("HOME") - p = i.replace(homedir, "~") if homedir else i + for n, item in enumerate(library): print( - "{}{} {}".format( + "{}. {}".format( str(n + 1).rjust(dig), - "*" if i == last_read_in_history else " ", - truncate(p, "...", tcols, 7), + truncate(str(item), "...", tcols, 7), ) ) if "-r" in args: -- cgit