diff options
Diffstat (limited to 'epy.py')
-rwxr-xr-x | epy.py | 3976 |
1 files changed, 0 insertions, 3976 deletions
@@ -1,3976 +0,0 @@ -#!/usr/bin/env python3 -# vim:tabstop=4:shiftwidth=4:softtabstop=4:smarttab:expandtab:foldmethod=marker -"""\ -usage: epy [-h] [-r] [-d] [-v] [PATH | # | PATTERN | URL] - -Read ebook in terminal - -positional arguments: - [ PATH | # | PATTERN | URL ] - ebook path, history number, pattern or URL - -optional arguments: - -h, --help show this help message and exit - -r, --history print reading history - -d, --dump dump the content of ebook - -v, --version print version and exit - -examples: - epy /path/to/ebook read /path/to/ebook file - epy 3 read #3 file from reading history - epy count monte read file matching 'count monte' - from reading history -""" - - -__version__ = "2022.9.24" -__license__ = "GPL-3.0" -__author__ = "Benawi Adha" -__email__ = "benawiadha@gmail.com" -__url__ = "https://github.com/wustho/epy" - - -# Imports {{{ - -import argparse -import base64 -import contextlib -import curses -import dataclasses -import hashlib -import json -import multiprocessing -import os -import re -import shutil -import signal -import sqlite3 -import subprocess -import sys -import tempfile -import textwrap -import uuid -import xml.etree.ElementTree as ET -import zipfile -import zlib - -from typing import Optional, Union, Sequence, Tuple, List, Dict, Mapping, Set, Type, Any -from dataclasses import dataclass, field -from datetime import datetime -from difflib import SequenceMatcher as SM -from enum import Enum -from functools import wraps -from html import unescape -from html.parser import HTMLParser -from pathlib import PurePosixPath -from urllib.error import HTTPError, URLError -from urllib.parse import unquote, urljoin, urlparse -from urllib.request import Request, urlopen - -try: - from epy_extras import unpackBook # type: ignore - - MOBI_SUPPORT = True -except ModuleNotFoundError: - MOBI_SUPPORT = False - -# }}} - - -# Debug Utils {{{ - -try: - # Debug swith - # $ DEBUG=1 ./epy.py - DEBUG = int(str(os.getenv("DEBUG"))) == 1 -except ValueError: - DEBUG = False - -# }}} - - -# Data Models {{{ - -# add image viewers here -# sorted by most widely used -VIEWER_PRESET_LIST = ( - "feh", - "imv", - "gio", - "gnome-open", - "gvfs-open", - "xdg-open", - "kde-open", - "firefox", -) - -DICT_PRESET_LIST = ( - "wkdict", - "sdcv", - "dict", -) - - -class Direction(Enum): - FORWARD = "forward" - BACKWARD = "backward" - - -class DoubleSpreadPadding(Enum): - LEFT = 10 - MIDDLE = 7 - RIGHT = 10 - - -@dataclass(frozen=True) -class BookMetadata: - title: Optional[str] = None - creator: Optional[str] = None - description: Optional[str] = None - publisher: Optional[str] = None - date: Optional[str] = None - language: Optional[str] = None - format: Optional[str] = None - identifier: Optional[str] = None - source: Optional[str] = None - - -@dataclass(frozen=True) -class LibraryItem: - last_read: datetime - filepath: str - title: Optional[str] = None - author: Optional[str] = None - reading_progress: Optional[float] = None - - def __str__(self) -> str: - if self.reading_progress is None: - reading_progress_str = "N/A" - else: - reading_progress_str = f"{int(self.reading_progress * 100)}%" - reading_progress_str = reading_progress_str.rjust(4) - - book_name: str - filename = self.filepath.replace(os.path.expanduser("~"), "~", 1) - if self.title is not None and self.author is not None: - book_name = f"{self.title} - {self.author} ({filename})" - elif self.title is None and self.author: - book_name = f"{filename} - {self.author}" - else: - book_name = filename - - last_read_str = self.last_read.strftime("%I:%M%p %b %d") - - return f"{reading_progress_str} {last_read_str}: {book_name}" - - -@dataclass(frozen=True) -class ReadingState: - """ - Data model for reading state. - - `row` has to be explicitly assigned with value - because Seamless feature needs it to adjust from - relative (to book's content index) row to absolute - (to book's entire content) row. - - `rel_pctg` and `section` default to None and if - either of them is assigned with value, then it - will be overriding the `row` value. - """ - - content_index: int - textwidth: int - row: int - rel_pctg: Optional[float] = None - section: Optional[str] = None - - -@dataclass(frozen=True) -class SearchData: - direction: Direction = Direction.FORWARD - value: str = "" - - -@dataclass(frozen=True) -class LettersCount: - """ - all: total letters in book - cumulative: list of total letters for previous contents - eg. let's say cumulative = (0, 50, 89, ...) it means - 0 is total cumulative letters of book contents[-1] to contents[0] - 50 is total cumulative letters of book contents[0] to contents[1] - 89 is total cumulative letters of book contents[0] to contents[2] - """ - - all: int - cumulative: Tuple[int, ...] - - -@dataclass(frozen=True) -class CharPos: - """ - Describes character position in text. - eg. ["Lorem ipsum dolor sit amet,", # row=0 - "consectetur adipiscing elit."] # row=1 - ^CharPos(row=1, col=3) - """ - - row: int - col: int - - -@dataclass(frozen=True) -class TextMark: - """ - Describes marking in text. - eg. Interval [CharPos(row=0, col=3), CharPos(row=1, col=4)] - notice the marking inclusive [] for both side instead of right exclusive [) - """ - - start: CharPos - end: Optional[CharPos] = None - - def is_valid(self) -> bool: - """ - Assert validity and check if the mark is unterminated - eg. <div><i>This is italic text</div> - Missing </i> tag - """ - if self.end is not None: - if self.start.row == self.end.row: - return self.start.col <= self.end.col - else: - return self.start.row < self.end.row - - return False - - -@dataclass(frozen=True) -class TextSpan: - """ - Like TextMark but using span of letters (n_letters) - """ - - start: CharPos - n_letters: int - - -@dataclass(frozen=True) -class InlineStyle: - """ - eg. InlineStyle(attr=curses.A_BOLD, row=3, cols=4, n_letters=3) - """ - - row: int - col: int - n_letters: int - attr: int - - -@dataclass(frozen=True) -class TocEntry: - label: str - content_index: int - section: Optional[str] - - -@dataclass(frozen=True) -class TextStructure: - """ - Object that describes how the text - should be displayed in screen. - - text_lines: ("list of lines", "of text", ...) - image_maps: {line_num: path/to/image/in/ebook/zip} - section_rows: {section_id: line_num} - formatting: (InlineStyle, ...) - """ - - text_lines: Tuple[str, ...] - image_maps: Mapping[int, str] - section_rows: Mapping[str, int] - formatting: Tuple[InlineStyle, ...] - - -@dataclass(frozen=True) -class NoUpdate: - pass - - -class Key: - """ - Because ord("k") chr(34) are confusing - """ - - def __init__(self, char_or_int: Union[str, int]): - self.value: int = char_or_int if isinstance(char_or_int, int) else ord(char_or_int) - self.char: str = char_or_int if isinstance(char_or_int, str) else chr(char_or_int) - - def __eq__(self, other: Any) -> bool: - if isinstance(other, Key): - return self.value == other.value - return False - - def __ne__(self, other: Any) -> bool: - return self.__eq__(other) - - def __hash__(self) -> int: - return hash(self.value) - - -@dataclass(frozen=True) -class Settings: - DefaultViewer: str = "auto" - DictionaryClient: str = "auto" - ShowProgressIndicator: bool = True - PageScrollAnimation: bool = True - MouseSupport: bool = False - StartWithDoubleSpread: bool = False - # -1 is default terminal fg/bg colors - DefaultColorFG: int = -1 - DefaultColorBG: int = -1 - DarkColorFG: int = 252 - DarkColorBG: int = 235 - LightColorFG: int = 238 - LightColorBG: int = 253 - SeamlessBetweenChapters: bool = False - PreferredTTSEngine: Optional[str] = None - TTSEngineArgs: List[str] = field(default_factory=list) - - -@dataclass(frozen=True) -class CfgDefaultKeymaps: - ScrollUp: str = "k" - ScrollDown: str = "j" - PageUp: str = "h" - PageDown: str = "l" - # HalfScreenUp: str = "h" - # HalfScreenDown: str - NextChapter: str = "L" - PrevChapter: str = "H" - BeginningOfCh: str = "g" - EndOfCh: str = "G" - Shrink: str = "-" - Enlarge: str = "+" - SetWidth: str = "=" - Metadata: str = "M" - DefineWord: str = "d" - TableOfContents: str = "t" - Follow: str = "f" - OpenImage: str = "o" - RegexSearch: str = "/" - ShowHideProgress: str = "s" - MarkPosition: str = "m" - JumpToPosition: str = "`" - AddBookmark: str = "b" - ShowBookmarks: str = "B" - Quit: str = "q" - Help: str = "?" - SwitchColor: str = "c" - TTSToggle: str = "!" - DoubleSpreadToggle: str = "D" - Library: str = "R" - - -@dataclass(frozen=True) -class CfgBuiltinKeymaps: - ScrollUp: Tuple[int, ...] = (curses.KEY_UP,) - ScrollDown: Tuple[int, ...] = (curses.KEY_DOWN,) - PageUp: Tuple[int, ...] = (curses.KEY_PPAGE, curses.KEY_LEFT) - PageDown: Tuple[int, ...] = (curses.KEY_NPAGE, ord(" "), curses.KEY_RIGHT) - BeginningOfCh: Tuple[int, ...] = (curses.KEY_HOME,) - EndOfCh: Tuple[int, ...] = (curses.KEY_END,) - TableOfContents: Tuple[int, ...] = (9, ord("\t")) - Follow: Tuple[int, ...] = (10,) - Quit: Tuple[int, ...] = (3, 27, 304) - - -@dataclass(frozen=True) -class Keymap: - # HalfScreenDown: Tuple[Key, ...] - # HalfScreenUp: Tuple[Key, ...] - AddBookmark: Tuple[Key, ...] - BeginningOfCh: Tuple[Key, ...] - DefineWord: Tuple[Key, ...] - DoubleSpreadToggle: Tuple[Key, ...] - EndOfCh: Tuple[Key, ...] - Enlarge: Tuple[Key, ...] - Follow: Tuple[Key, ...] - Help: Tuple[Key, ...] - JumpToPosition: Tuple[Key, ...] - Library: Tuple[Key, ...] - MarkPosition: Tuple[Key, ...] - Metadata: Tuple[Key, ...] - NextChapter: Tuple[Key, ...] - OpenImage: Tuple[Key, ...] - PageDown: Tuple[Key, ...] - PageUp: Tuple[Key, ...] - PrevChapter: Tuple[Key, ...] - Quit: Tuple[Key, ...] - RegexSearch: Tuple[Key, ...] - ScrollDown: Tuple[Key, ...] - ScrollUp: Tuple[Key, ...] - SetWidth: Tuple[Key, ...] - ShowBookmarks: Tuple[Key, ...] - ShowHideProgress: Tuple[Key, ...] - Shrink: Tuple[Key, ...] - SwitchColor: Tuple[Key, ...] - TTSToggle: Tuple[Key, ...] - TableOfContents: Tuple[Key, ...] - - -# }}} - - -# Speaker / TTS Engine Wrappers {{{ - - -class SpeakerBaseModel: - cmd: str = "tts_engine_binary" - available: bool = False - - def __init__(self, args: List[str] = []): - self.args = args - - def speak(self, text: str) -> None: - raise NotImplementedError("Speaker.speak() not implemented") - - def is_done(self) -> bool: - raise NotImplementedError("Speaker.is_done() not implemented") - - def stop(self) -> None: - raise NotImplementedError("Speaker.stop() not implemented") - - def cleanup(self) -> None: - raise NotImplementedError("Speaker.cleanup() not implemented") - - -class SpeakerMimic(SpeakerBaseModel): - cmd = "mimic" - available = bool(shutil.which("mimic")) - - def speak(self, text: str) -> None: - self.process = subprocess.Popen( - [self.cmd, *self.args], - text=True, - stdin=subprocess.PIPE, - stdout=subprocess.DEVNULL, - stderr=subprocess.STDOUT, - ) - assert self.process.stdin - self.process.stdin.write(text) - self.process.stdin.close() - - def is_done(self) -> bool: - return self.process.poll() is not None - - def stop(self) -> None: - self.process.terminate() - # self.process.kill() - - def cleanup(self) -> None: - pass - - -class SpeakerPico(SpeakerBaseModel): - cmd = "pico2wave" - available = all([shutil.which(dep) for dep in ["pico2wave", "play"]]) - - def speak(self, text: str) -> None: - _, self.tmp_path = tempfile.mkstemp(suffix=".wav") - - try: - subprocess.run( - [self.cmd, *self.args, "-w", self.tmp_path, text], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - check=True, - ) - except subprocess.CalledProcessError as e: - if "invalid pointer" not in e.output: - sys.exit(e.output) - - self.process = subprocess.Popen( - ["play", self.tmp_path], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - def is_done(self) -> bool: - return self.process.poll() is not None - - def stop(self) -> None: - self.process.terminate() - # self.process.kill() - - def cleanup(self) -> None: - os.remove(self.tmp_path) - - -# register wrappers here -SPEAKERS: List[Type[SpeakerBaseModel]] = [SpeakerMimic, SpeakerPico] - -# }}} - - -# Ebooks {{{ - - -class Ebook: - def __init__(self, fileepub: str): - raise NotImplementedError("Ebook.__init__() not implemented") - - @property - def path(self) -> str: - return self._path - - @path.setter - def path(self, value: str) -> None: - self._path = value - - @property - def contents(self) -> Union[Tuple[str, ...], Tuple[ET.Element, ...]]: - return self._contents - - @contents.setter - def contents(self, value: Union[Tuple[str, ...], Tuple[ET.Element, ...]]) -> None: - self._contents = value - - @property - def toc_entries(self) -> Tuple[TocEntry, ...]: - return self._toc_entries - - @toc_entries.setter - def toc_entries(self, value: Tuple[TocEntry, ...]) -> None: - self._toc_entries = value - - def get_meta(self) -> BookMetadata: - raise NotImplementedError("Ebook.get_meta() not implemented") - - def initialize(self) -> None: - raise NotImplementedError("Ebook.initialize() not implemented") - - def get_raw_text(self, content: Union[str, ET.Element]) -> str: - raise NotImplementedError("Ebook.get_raw_text() not implemented") - - def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]: - raise NotImplementedError("Ebook.get_img_bytestr() not implemented") - - def cleanup(self) -> None: - raise NotImplementedError("Ebook.cleanup() not implemented") - - -class Epub(Ebook): - NAMESPACE = { - "DAISY": "http://www.daisy.org/z3986/2005/ncx/", - "OPF": "http://www.idpf.org/2007/opf", - "CONT": "urn:oasis:names:tc:opendocument:xmlns:container", - "XHTML": "http://www.w3.org/1999/xhtml", - "EPUB": "http://www.idpf.org/2007/ops", - # Dublin Core - "DC": "http://purl.org/dc/elements/1.1/", - } - - def __init__(self, fileepub: str): - self.path: str = os.path.abspath(fileepub) - self.file: Union[zipfile.ZipFile, str] = zipfile.ZipFile(fileepub, "r") - - # populate these attributes - # by calling self.initialize() - self.root_filepath: str - self.root_dirpath: str - - def get_meta(self) -> BookMetadata: - assert isinstance(self.file, zipfile.ZipFile) - # why self.file.read(self.root_filepath) problematic - # content_opf = ET.fromstring(self.file.open(self.root_filepath).read()) - content_opf = ET.parse(self.file.open(self.root_filepath)) - return Epub._get_metadata(content_opf) - - @staticmethod - def _get_metadata(content_opf: ET.ElementTree) -> BookMetadata: - metadata: Dict[str, Optional[str]] = {} - for field in dataclasses.fields(BookMetadata): - element = content_opf.find(f".//DC:{field.name}", Epub.NAMESPACE) - if element is not None: - metadata[field.name] = element.text - - return BookMetadata(**metadata) - - @staticmethod - def _get_contents(content_opf: ET.ElementTree) -> Tuple[str, ...]: - # cont = ET.parse(self.file.open(self.root_filepath)).getroot() - manifests: List[Tuple[str, str]] = [] - for manifest_elem in content_opf.findall("OPF:manifest/*", Epub.NAMESPACE): - # EPUB3 - # if manifest_elem.get("id") != "ncx" and manifest_elem.get("properties") != "nav": - if ( - manifest_elem.get("media-type") != "application/x-dtbncx+xml" - and manifest_elem.get("properties") != "nav" - ): - manifest_id = manifest_elem.get("id") - assert manifest_id is not None - manifest_href = manifest_elem.get("href") - assert manifest_href is not None - manifests.append((manifest_id, manifest_href)) - - spines: List[str] = [] - contents: List[str] = [] - for spine_elem in content_opf.findall("OPF:spine/*", Epub.NAMESPACE): - idref = spine_elem.get("idref") - assert idref is not None - spines.append(idref) - for spine in spines: - for manifest in manifests: - if spine == manifest[0]: - # book_contents.append(root_dirpath + unquote(manifest[1])) - contents.append(unquote(manifest[1])) - manifests.remove(manifest) - # TODO: test is break necessary - break - - return tuple(contents) - - @staticmethod - def _get_tocs(toc: ET.Element, version: str, contents: Sequence[str]) -> Tuple[TocEntry, ...]: - try: - # EPUB3 - if version in {"1.0", "2.0"}: - navPoints = toc.findall("DAISY:navMap//DAISY:navPoint", Epub.NAMESPACE) - elif version == "3.0": - navPoints = toc.findall( - "XHTML:body//XHTML:nav[@EPUB:type='toc']//XHTML:a", Epub.NAMESPACE - ) - - toc_entries: List[TocEntry] = [] - for navPoint in navPoints: - if version in {"1.0", "2.0"}: - src_elem = navPoint.find("DAISY:content", Epub.NAMESPACE) - assert src_elem is not None - src = src_elem.get("src") - - name_elem = navPoint.find("DAISY:navLabel/DAISY:text", Epub.NAMESPACE) - assert name_elem is not None - name = name_elem.text - elif version == "3.0": - src_elem = navPoint - assert src_elem is not None - src = src_elem.get("href") - - name = "".join(list(navPoint.itertext())) - - assert src is not None - src_id = src.split("#") - - try: - idx = contents.index(unquote(src_id[0])) - except ValueError: - continue - - # assert name is not None - # NOTE: skip empty label - if name is not None: - toc_entries.append( - TocEntry( - label=name, - content_index=idx, - section=src_id[1] if len(src_id) == 2 else None, - ) - ) - except AttributeError as e: - if DEBUG: - raise e - - return tuple(toc_entries) - - def initialize(self) -> None: - assert isinstance(self.file, zipfile.ZipFile) - - container = ET.parse(self.file.open("META-INF/container.xml")) - rootfile_elem = container.find("CONT:rootfiles/CONT:rootfile", Epub.NAMESPACE) - assert rootfile_elem is not None - self.root_filepath = rootfile_elem.attrib["full-path"] - self.root_dirpath = ( - os.path.dirname(self.root_filepath) + "/" - if os.path.dirname(self.root_filepath) != "" - else "" - ) - - content_opf = ET.parse(self.file.open(self.root_filepath)) - version = content_opf.getroot().get("version") - - contents = Epub._get_contents(content_opf) - self.contents = tuple(urljoin(self.root_dirpath, content) for content in contents) - - if version in {"1.0", "2.0"}: - # "OPF:manifest/*[@id='ncx']" - relative_toc = content_opf.find( - "OPF:manifest/*[@media-type='application/x-dtbncx+xml']", Epub.NAMESPACE - ) - elif version == "3.0": - relative_toc = content_opf.find("OPF:manifest/*[@properties='nav']", Epub.NAMESPACE) - else: - raise RuntimeError(f"Unsupported Epub version: {version}") - assert relative_toc is not None - relative_toc_path = relative_toc.get("href") - assert relative_toc_path is not None - toc_path = self.root_dirpath + relative_toc_path - toc = ET.parse(self.file.open(toc_path)).getroot() - self.toc_entries = Epub._get_tocs(toc, version, contents) # *self.contents (absolute path) - - def get_raw_text(self, content_path: Union[str, ET.Element]) -> str: - assert isinstance(self.file, zipfile.ZipFile) - assert isinstance(content_path, str) - - max_tries: Optional[int] = None # 1 if DEBUG else None - - # use try-except block to catch - # zlib.error: Error -3 while decompressing data: invalid distance too far back - # seems like caused by multiprocessing - tries = 0 - while True: - try: - content = self.file.open(content_path).read() - break - except zlib.error as e: - tries += 1 - if max_tries is not None and tries >= max_tries: - raise e - - return content.decode("utf-8") - - def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]: - assert isinstance(self.file, zipfile.ZipFile) - return impath, self.file.read(impath) - - def cleanup(self) -> None: - pass - - -class Mobi(Epub): - def __init__(self, filemobi: str): - self.path = os.path.abspath(filemobi) - self.file = tempfile.mkdtemp(prefix="epy-") - - # populate these attribute - # by calling self.initialize() - self.root_filepath: str - self.root_dirpath: str - - def get_meta(self) -> BookMetadata: - # why self.file.read(self.root_filepath) problematic - with open(os.path.join(self.root_dirpath, "content.opf")) as f: - content_opf = ET.parse(f) # .getroot() - return Epub._get_metadata(content_opf) - - def initialize(self) -> None: - assert isinstance(self.file, str) - - with contextlib.redirect_stdout(None): - unpackBook(self.path, self.file, epubver="A", use_hd=True) - # TODO: add cleanup here - - self.root_dirpath = os.path.join(self.file, "mobi7") - self.toc_path = os.path.join(self.root_dirpath, "toc.ncx") - version = "2.0" - - with open(os.path.join(self.root_dirpath, "content.opf")) as f: - content_opf = ET.parse(f) # .getroot() - - contents = Epub._get_contents(content_opf) - self.contents = tuple(os.path.join(self.root_dirpath, content) for content in contents) - - with open(self.toc_path) as f: - toc = ET.parse(f).getroot() - self.toc_entries = Epub._get_tocs(toc, version, contents) # *self.contents (absolute path) - - def get_raw_text(self, content_path: Union[str, ET.Element]) -> str: - assert isinstance(content_path, str) - with open(content_path, encoding="utf8") as f: - content = f.read() - # return content.decode("utf-8") - return content - - def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]: - # TODO: test on windows - # if impath "Images/asdf.png" is problematic - image_abspath = os.path.join(self.root_dirpath, impath) - image_abspath = os.path.normpath(image_abspath) # handle crossplatform path - with open(image_abspath, "rb") as f: - src = f.read() - return impath, src - - def cleanup(self) -> None: - assert isinstance(self.file, str) - shutil.rmtree(self.file) - return - - -class Azw(Epub): - def __init__(self, fileepub): - self.path = os.path.abspath(fileepub) - self.tmpdir = tempfile.mkdtemp(prefix="epy-") - basename, _ = os.path.splitext(os.path.basename(self.path)) - self.tmpepub = os.path.join(self.tmpdir, "mobi8", basename + ".epub") - - def initialize(self): - with contextlib.redirect_stdout(None): - unpackBook(self.path, self.tmpdir, epubver="A", use_hd=True) - self.file = zipfile.ZipFile(self.tmpepub, "r") - Epub.initialize(self) - - def cleanup(self) -> None: - shutil.rmtree(self.tmpdir) - return - - -class FictionBook(Ebook): - NAMESPACE = {"FB2": "http://www.gribuser.ru/xml/fictionbook/2.0"} - - def __init__(self, filefb: str): - self.path = os.path.abspath(filefb) - self.file = filefb - - # populate these attribute - # by calling self.initialize() - self.root: ET.Element - - def get_meta(self) -> BookMetadata: - title_elem = self.root.find(".//FB2:book-title", FictionBook.NAMESPACE) - first_name_elem = self.root.find(".//FB2:first-name", FictionBook.NAMESPACE) - last_name_elem = self.root.find(".//FB2:last-name", FictionBook.NAMESPACE) - date_elem = self.root.find(".//FB2:date", FictionBook.NAMESPACE) - identifier_elem = self.root.find(".//FB2:id", FictionBook.NAMESPACE) - - author = first_name_elem.text if first_name_elem is not None else None - if last_name_elem is not None: - if author is not None and author != "": - author += f" {last_name_elem.text}" - else: - author = last_name_elem.text - - return BookMetadata( - title=title_elem.text if title_elem is not None else None, - creator=author, - date=date_elem.text if date_elem is not None else None, - identifier=identifier_elem.text if identifier_elem is not None else None, - ) - - def initialize(self) -> None: - cont = ET.parse(self.file) - self.root = cont.getroot() - - self.contents = tuple(self.root.findall("FB2:body/*", FictionBook.NAMESPACE)) - - # TODO - toc_entries: List[TocEntry] = [] - for n, i in enumerate(self.contents): - title = i.find("FB2:title", FictionBook.NAMESPACE) - if title is not None: - toc_entries.append( - TocEntry(label="".join(title.itertext()), content_index=n, section=None) - ) - self.toc_entries = tuple(toc_entries) - - def get_raw_text(self, node: Union[str, ET.Element]) -> str: - assert isinstance(node, ET.Element) - ET.register_namespace("", "http://www.gribuser.ru/xml/fictionbook/2.0") - # sys.exit(ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:","")) - return ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:", "") - - def get_img_bytestr(self, imgid: str) -> Tuple[str, bytes]: - # TODO: test if image works - imgid = imgid.replace("#", "") - img_elem = self.root.find("*[@id='{}']".format(imgid)) - assert img_elem is not None - imgtype = img_elem.get("content-type") - img_elem_text = img_elem.text - assert imgtype is not None - assert img_elem_text is not None - return imgid + "." + imgtype.split("/")[1], base64.b64decode(img_elem_text) - - def cleanup(self) -> None: - return - - -class URL(Ebook): - _header = { - "User-Agent": f"epy/v{__version__}", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Accept-Language": "en-US,en;q=0.8", - } - - def __init__(self, url: str): - self.path = url - self.file = url - self.contents = ("_",) - self.toc_entries = tuple() - - def get_meta(self) -> BookMetadata: - return BookMetadata() - - def initialize(self) -> None: - try: - with urlopen(Request(self.path, headers=URL._header)) as response: - self.html = response.read().decode() - except HTTPError as e: - raise e - except URLError as e: - raise e - - def get_raw_text(self, _) -> str: - return self.html - - def get_img_bytestr(self, src: str) -> Tuple[str, bytes]: - image_url = src if is_url(src) else urljoin(self.path, src) - # TODO: catch error on request - with urlopen(Request(image_url, headers=URL._header)) as response: - byte_str = response.read() - return PurePosixPath(urlparse(src).path).name, byte_str - - def cleanup(self) -> None: - return - - -# }}} - - -# HTML & Text Parser {{{ - - -class HTMLtoLines(HTMLParser): - para = {"p", "div"} - inde = {"q", "dt", "dd", "blockquote"} - pref = {"pre"} - bull = {"li"} - hide = {"script", "style", "head"} - ital = {"i", "em"} - bold = {"b", "strong"} - # hide = {"script", "style", "head", ", "sub} - # sup_lookup = "⁰¹²³⁴⁵⁶⁷⁸⁹" - # sub_lookup = "₀₁₂₃₄₅₆₇₈₉" - - attr_bold = curses.A_BOLD - try: - attr_italic = curses.A_ITALIC - except AttributeError: - try: - attr_italic = curses.A_UNDERLINE - except AttributeError: - attr_italic = curses.A_NORMAL - - @staticmethod - def _mark_to_spans(text: Sequence[str], marks: Sequence[TextMark]) -> List[TextSpan]: - """ - Convert text marks in line of text to per line text span. - Keeping duplicate spans. - """ - spans: List[TextSpan] = [] - for mark in marks: - if mark.is_valid(): - # mypy issue, should be handled by mark.is_valid() - assert mark.end is not None - if mark.start.row == mark.end.row: - spans.append( - TextSpan(start=mark.start, n_letters=mark.end.col - mark.start.col) - ) - else: - spans.append( - TextSpan( - start=mark.start, n_letters=len(text[mark.start.row]) - mark.start.col - ) - ) - for nth_line in range(mark.start.row + 1, mark.end.row): - spans.append( - TextSpan( - start=CharPos(row=nth_line, col=0), n_letters=len(text[nth_line]) - ) - ) - spans.append( - TextSpan(start=CharPos(row=mark.end.row, col=0), n_letters=mark.end.col) - ) - - return spans # list(set(spans)) - - @staticmethod - def _adjust_wrapped_spans( - wrapped_lines: Sequence[str], - span: TextSpan, - *, - line_adjustment: int = 0, - left_adjustment: int = 0, - ) -> List[TextSpan]: - """ - Adjust text span to wrapped lines. - Not perfect, but should be good enough considering - the limitation on commandline interface. - """ - - # current_row = span.start.row + line_adjustment - current_row = line_adjustment - start_col = span.start.col - end_col = start_col + span.n_letters - - prev = 0 # chars length before current line - spans: List[TextSpan] = [] - for n, line in enumerate(wrapped_lines): - # + 1 compensates textwrap.wrap(*args, replace_whitespace=True, drop_whitespace=True) - line_len = len(line) + 1 - current = prev + line_len # chars length before next line - - # -:unmarked *:marked - # |------*****--------| - if start_col in range(prev, current) and end_col in range(prev, current): - spans.append( - TextSpan( - start=CharPos(row=current_row + n, col=start_col - prev + left_adjustment), - n_letters=span.n_letters, - ) - ) - - # |----------*********| - elif start_col in range(prev, current): - spans.append( - TextSpan( - start=CharPos(row=current_row + n, col=start_col - prev + left_adjustment), - n_letters=current - start_col - 1, # -1: dropped whitespace - ) - ) - - # |********-----------| - elif end_col in range(prev, current): - spans.append( - TextSpan( - start=CharPos(row=current_row + n, col=0 + left_adjustment), - n_letters=end_col - prev + 1, # +1: dropped whitespace - ) - ) - - # |*******************| - elif prev in range(start_col, end_col) and current in range(start_col, end_col): - spans.append( - TextSpan( - start=CharPos(row=current_row + n, col=0 + left_adjustment), - n_letters=line_len - 1, # -1: dropped whitespace - ) - ) - - elif prev > end_col: - break - - prev = current - - return spans - - @staticmethod - def _group_spans_by_row(blocks: Sequence[TextSpan]) -> Mapping[int, List[TextSpan]]: - groups: Dict[int, List[TextSpan]] = {} - for block in blocks: - row = block.start.row - if row in groups: - groups[row].append(block) - else: - groups[row] = [block] - return groups - - def __init__(self, sects={""}): - HTMLParser.__init__(self) - self.text = [""] - self.ishead = False - self.isinde = False - self.isbull = False - self.ispref = False - self.ishidden = False - self.idhead = set() - self.idinde = set() - self.idbull = set() - self.idpref = set() - self.idimgs = set() - self.sects = sects - self.sectsindex = {} - self.italic_marks: List[TextMark] = [] - self.bold_marks: List[TextMark] = [] - self.imgs: Dict[int, str] = dict() - - def handle_starttag(self, tag, attrs): - if re.match("h[1-6]", tag) is not None: - self.ishead = True - elif tag in self.inde: - self.isinde = True - elif tag in self.pref: - self.ispref = True - elif tag in self.bull: - self.isbull = True - elif tag in self.hide: - self.ishidden = True - elif tag == "sup": - self.text[-1] += "^{" - elif tag == "sub": - self.text[-1] += "_{" - # NOTE: "img" and "image" - # In HTML, both are startendtag (no need endtag) - # but in XHTML both need endtag - elif tag in {"img", "image"}: - for i in attrs: - if (tag == "img" and i[0] == "src") or (tag == "image" and i[0].endswith("href")): - this_line = len(self.text) - self.idimgs.add(this_line) - self.imgs[this_line] = unquote(i[1]) - self.text.append("[IMAGE]") - # formatting - elif tag in self.ital: - if len(self.italic_marks) == 0 or self.italic_marks[-1].is_valid(): - char_pos = CharPos(row=len(self.text) - 1, col=len(self.text[-1])) - self.italic_marks.append(TextMark(start=char_pos)) - elif tag in self.bold: - if len(self.bold_marks) == 0 or self.bold_marks[-1].is_valid(): - char_pos = CharPos(row=len(self.text) - 1, col=len(self.text[-1])) - self.bold_marks.append(TextMark(start=char_pos)) - if self.sects != {""}: - for i in attrs: - if i[0] == "id" and i[1] in self.sects: - # self.text[-1] += " (#" + i[1] + ") " - # self.sectsindex.append([len(self.text), i[1]]) - self.sectsindex[len(self.text) - 1] = i[1] - - def handle_startendtag(self, tag, attrs): - if tag == "br": - self.text += [""] - elif tag in {"img", "image"}: - for i in attrs: - # if (tag == "img" and i[0] == "src")\ - # or (tag == "image" and i[0] == "xlink:href"): - if (tag == "img" and i[0] == "src") or (tag == "image" and i[0].endswith("href")): - this_line = len(self.text) - self.idimgs.add(this_line) - self.imgs[this_line] = unquote(i[1]) - self.text.append("[IMAGE]") - self.text.append("") - # sometimes attribute "id" is inside "startendtag" - # especially html from mobi module (kindleunpack fork) - if self.sects != {""}: - for i in attrs: - if i[0] == "id" and i[1] in self.sects: - # self.text[-1] += " (#" + i[1] + ") " - self.sectsindex[len(self.text) - 1] = i[1] - - def handle_endtag(self, tag): - if re.match("h[1-6]", tag) is not None: - self.text.append("") - self.text.append("") - self.ishead = False - elif tag in self.para: - self.text.append("") - elif tag in self.hide: - self.ishidden = False - elif tag in self.inde: - if self.text[-1] != "": - self.text.append("") - self.isinde = False - elif tag in self.pref: - if self.text[-1] != "": - self.text.append("") - self.ispref = False - elif tag in self.bull: - if self.text[-1] != "": - self.text.append("") - self.isbull = False - elif tag in {"sub", "sup"}: - self.text[-1] += "}" - elif tag in {"img", "image"}: - self.text.append("") - # formatting - elif tag in self.ital: - char_pos = CharPos(row=len(self.text) - 1, col=len(self.text[-1])) - last_mark = self.italic_marks[-1] - self.italic_marks[-1] = dataclasses.replace(last_mark, end=char_pos) - elif tag in self.bold: - char_pos = CharPos(row=len(self.text) - 1, col=len(self.text[-1])) - last_mark = self.bold_marks[-1] - self.bold_marks[-1] = dataclasses.replace(last_mark, end=char_pos) - - def handle_data(self, raw): - if raw and not self.ishidden: - if self.text[-1] == "": - tmp = raw.lstrip() - else: - tmp = raw - if self.ispref: - line = unescape(tmp) - else: - line = unescape(re.sub(r"\s+", " ", tmp)) - self.text[-1] += line - if self.ishead: - self.idhead.add(len(self.text) - 1) - elif self.isbull: - self.idbull.add(len(self.text) - 1) - elif self.isinde: - self.idinde.add(len(self.text) - 1) - elif self.ispref: - self.idpref.add(len(self.text) - 1) - - def get_structured_text( - self, textwidth: Optional[int] = 0, starting_line: int = 0 - ) -> Union[Tuple[str, ...], TextStructure]: - - if not textwidth: - return tuple(self.text) - - # reusable loop indices - i: Any - - text: List[str] = [] - images: Dict[int, str] = dict() # {line_num: path/in/zip} - sect: Dict[str, int] = dict() # {section_id: line_num} - formatting: List[InlineStyle] = [] - - italic_spans: List[TextSpan] = HTMLtoLines._mark_to_spans(self.text, self.italic_marks) - bold_spans: List[TextSpan] = HTMLtoLines._mark_to_spans(self.text, self.bold_marks) - italic_groups = HTMLtoLines._group_spans_by_row(italic_spans) - bold_groups = HTMLtoLines._group_spans_by_row(bold_spans) - - for n, line in enumerate(self.text): - - startline = len(text) - # findsect = re.search(r"(?<= \(#).*?(?=\) )", line) - # if findsect is not None and findsect.group() in self.sects: - # line = line.replace(" (#" + findsect.group() + ") ", "") - # # line = line.replace(" (#" + findsect.group() + ") ", " "*(5+len(findsect.group()))) - # sect[findsect.group()] = len(text) - if n in self.sectsindex.keys(): - sect[self.sectsindex[n]] = starting_line + len(text) - if n in self.idhead: - # text += [line.rjust(textwidth // 2 + len(line) // 2)] + [""] - text += [line.center(textwidth)] + [""] - formatting += [ - InlineStyle( - row=starting_line + i, col=0, n_letters=len(text[i]), attr=self.attr_bold - ) - for i in range(startline, len(text)) - ] - elif n in self.idinde: - text += [" " + i for i in textwrap.wrap(line, textwidth - 3)] + [""] - elif n in self.idbull: - tmp = textwrap.wrap(line, textwidth - 3) - text += [" - " + i if i == tmp[0] else " " + i for i in tmp] + [""] - elif n in self.idpref: - tmp = line.splitlines() - wraptmp = [] - for tmp_line in tmp: - wraptmp += [i for i in textwrap.wrap(tmp_line, textwidth - 6)] - text += [" " + i for i in wraptmp] + [""] - elif n in self.idimgs: - images[starting_line + len(text)] = self.imgs[n] - text += [line.center(textwidth)] - formatting += [ - InlineStyle( - row=starting_line + len(text) - 1, - col=0, - n_letters=len(text[-1]), - attr=self.attr_bold, - ) - ] - text += [""] - else: - text += textwrap.wrap(line, textwidth) + [""] - - endline = len(text) # -1 - - left_adjustment = 3 if n in self.idbull | self.idinde else 0 - - for spans in italic_groups.get(n, []): - italics = HTMLtoLines._adjust_wrapped_spans( - text[startline:endline], - spans, - line_adjustment=startline, - left_adjustment=left_adjustment, - ) - for span in italics: - formatting.append( - InlineStyle( - row=starting_line + span.start.row, - col=span.start.col, - n_letters=span.n_letters, - attr=self.attr_italic, - ) - ) - - for spans in bold_groups.get(n, []): - bolds = HTMLtoLines._adjust_wrapped_spans( - text[startline:endline], - spans, - line_adjustment=startline, - left_adjustment=left_adjustment, - ) - for span in bolds: - formatting.append( - InlineStyle( - row=starting_line + span.start.row, - col=span.start.col, - n_letters=span.n_letters, - attr=self.attr_bold, - ) - ) - - # chapter suffix - text += ["***".center(textwidth)] - - return TextStructure( - text_lines=tuple(text), - image_maps=images, - section_rows=sect, - formatting=tuple(formatting), - ) - - -# }}} - - -# App Configuration {{{ - - -class AppData: - @property - def prefix(self) -> Optional[str]: - """Return None if there exists no homedir | userdir""" - prefix: Optional[str] = None - - # UNIX filesystem - homedir = os.getenv("HOME") - # WIN filesystem - userdir = os.getenv("USERPROFILE") - - if homedir: - if os.path.isdir(os.path.join(homedir, ".config")): - prefix = os.path.join(homedir, ".config", "epy") - else: - prefix = os.path.join(homedir, ".epy") - elif userdir: - prefix = os.path.join(userdir, ".epy") - - if prefix: - os.makedirs(prefix, exist_ok=True) - - return prefix - - -class Config(AppData): - def __init__(self): - setting_dict = dataclasses.asdict(Settings()) - keymap_dict = dataclasses.asdict(CfgDefaultKeymaps()) - keymap_builtin_dict = dataclasses.asdict(CfgBuiltinKeymaps()) - - if os.path.isfile(self.filepath): - with open(self.filepath) as f: - cfg_user = json.load(f) - setting_dict = Config.update_dict(setting_dict, cfg_user["Setting"]) - keymap_dict = Config.update_dict(keymap_dict, cfg_user["Keymap"]) - else: - self.save({"Setting": setting_dict, "Keymap": keymap_dict}) - - keymap_dict_tuple = {k: tuple(v) for k, v in keymap_dict.items()} - keymap_updated = { - k: tuple([Key(i) for i in v]) - for k, v in Config.update_keys_tuple(keymap_dict_tuple, keymap_builtin_dict).items() - } - - if sys.platform == "win32": - setting_dict["PageScrollAnimation"] = False - - self.setting = Settings(**setting_dict) - self.keymap = Keymap(**keymap_updated) - # to build help menu text - self.keymap_user_dict = keymap_dict - - @property - def filepath(self) -> str: - return os.path.join(self.prefix, "configuration.json") if self.prefix else os.devnull - - def save(self, cfg_dict): - with open(self.filepath, "w") as file: - json.dump(cfg_dict, file, indent=2) - - @staticmethod - def update_dict( - old_dict: Mapping[str, Union[str, int, bool]], - new_dict: Mapping[str, Union[str, int, bool]], - place_new=False, - ) -> Mapping[str, Union[str, int, bool]]: - """Returns a copy of `old_dict` after updating it with `new_dict`""" - - result = {**old_dict} - for k, v in new_dict.items(): - if k in result: - result[k] = new_dict[k] - elif place_new: - result[k] = new_dict[k] - - return result - - @staticmethod - def update_keys_tuple( - old_keys: Mapping[str, Tuple[str, ...]], - new_keys: Mapping[str, Tuple[str, ...]], - place_new: bool = False, - ) -> Mapping[str, Tuple[str, ...]]: - """Returns a copy of `old_keys` after updating it with `new_keys` - by appending the tuple value and removes duplicate""" - - result = {**old_keys} - for k, v in new_keys.items(): - if k in result: - result[k] = tuple(set(result[k] + new_keys[k])) - elif place_new: - result[k] = tuple(set(new_keys[k])) - - return result - - -class State(AppData): - """ - Use sqlite3 instead of JSON (in older version) - to shift the weight from memory to process - """ - - def __init__(self): - if not os.path.isfile(self.filepath): - self.init_db() - - @property - def filepath(self) -> str: - return os.path.join(self.prefix, "states.db") if self.prefix else os.devnull - - def get_from_history(self) -> List[LibraryItem]: - try: - conn = sqlite3.connect(self.filepath) - cur = conn.cursor() - cur.execute( - """ - SELECT last_read, filepath, title, author, reading_progress - FROM library ORDER BY last_read DESC - """ - ) - results = cur.fetchall() - library_items: List[LibraryItem] = [] - for result in results: - library_items.append( - LibraryItem( - last_read=datetime.fromisoformat(result[0]), - filepath=result[1], - title=result[2], - author=result[3], - reading_progress=result[4], - ) - ) - return library_items - finally: - conn.close() - - def delete_from_library(self, filepath: str) -> None: - try: - conn = sqlite3.connect(self.filepath) - conn.execute("PRAGMA foreign_keys = ON") - conn.execute("DELETE FROM reading_states WHERE filepath=?", (filepath,)) - conn.commit() - finally: - conn.close() - - def get_last_read(self) -> Optional[str]: - library = self.get_from_history() - return library[0].filepath if library else None - - def update_library(self, ebook: Ebook, reading_progress: Optional[float]) -> None: - try: - metadata = ebook.get_meta() - conn = sqlite3.connect(self.filepath) - conn.execute( - """ - INSERT OR REPLACE INTO library (filepath, title, author, reading_progress) - VALUES (?, ?, ?, ?) - """, - (ebook.path, metadata.title, metadata.creator, reading_progress), - ) - conn.commit() - finally: - conn.close() - - def get_last_reading_state(self, ebook: Ebook) -> ReadingState: - try: - conn = sqlite3.connect(self.filepath) - conn.row_factory = sqlite3.Row - cur = conn.cursor() - cur.execute("SELECT * FROM reading_states WHERE filepath=?", (ebook.path,)) - result = cur.fetchone() - if result: - result = dict(result) - del result["filepath"] - return ReadingState(**result, section=None) - return ReadingState(content_index=0, textwidth=80, row=0, rel_pctg=None, section=None) - finally: - conn.close() - - def set_last_reading_state(self, ebook: Ebook, reading_state: ReadingState) -> None: - try: - conn = sqlite3.connect(self.filepath) - conn.execute( - """ - INSERT OR REPLACE INTO reading_states - VALUES (:filepath, :content_index, :textwidth, :row, :rel_pctg) - """, - {"filepath": ebook.path, **dataclasses.asdict(reading_state)}, - ) - conn.commit() - finally: - conn.close() - - def insert_bookmark(self, ebook: Ebook, name: str, reading_state: ReadingState) -> None: - try: - conn = sqlite3.connect(self.filepath) - conn.execute( - """ - INSERT INTO bookmarks - VALUES (:id, :filepath, :name, :content_index, :textwidth, :row, :rel_pctg) - """, - { - "id": hashlib.sha1(f"{ebook.path}{name}".encode()).hexdigest()[:10], - "filepath": ebook.path, - "name": name, - **dataclasses.asdict(reading_state), - }, - ) - conn.commit() - finally: - conn.close() - - def delete_bookmark(self, ebook: Ebook, name: str) -> None: - try: - conn = sqlite3.connect(self.filepath) - conn.execute("DELETE FROM bookmarks WHERE filepath=? AND name=?", (ebook.path, name)) - conn.commit() - finally: - conn.close() - - def get_bookmarks(self, ebook: Ebook) -> List[Tuple[str, ReadingState]]: - try: - conn = sqlite3.connect(self.filepath) - conn.row_factory = sqlite3.Row - cur = conn.cursor() - cur.execute("SELECT * FROM bookmarks WHERE filepath=?", (ebook.path,)) - results = cur.fetchall() - bookmarks: List[Tuple[str, ReadingState]] = [] - for result in results: - tmp_dict = dict(result) - name = tmp_dict["name"] - tmp_dict = { - k: v - for k, v in tmp_dict.items() - if k in ("content_index", "textwidth", "row", "rel_pctg") - } - bookmarks.append((name, ReadingState(**tmp_dict))) - return bookmarks - finally: - conn.close() - - def init_db(self) -> None: - try: - conn = sqlite3.connect(self.filepath) - conn.executescript( - """ - CREATE TABLE reading_states ( - filepath TEXT PRIMARY KEY, - content_index INTEGER, - textwidth INTEGER, - row INTEGER, - rel_pctg REAL - ); - - CREATE TABLE library ( - last_read DATETIME DEFAULT (datetime('now','localtime')), - filepath TEXT PRIMARY KEY, - title TEXT, - author TEXT, - reading_progress REAL, - FOREIGN KEY (filepath) REFERENCES reading_states(filepath) - ON DELETE CASCADE - ); - - CREATE TABLE bookmarks ( - id TEXT PRIMARY KEY, - filepath TEXT, - name TEXT, - content_index INTEGER, - textwidth INTEGER, - row INTEGER, - rel_pctg REAL, - FOREIGN KEY (filepath) REFERENCES reading_states(filepath) - ON DELETE CASCADE - ); - """ - ) - conn.commit() - finally: - conn.close() - - -# }}} - - -# Text Board {{{ - - -class InfiniBoard: - """ - Wrapper for curses screen to render infinite texts. - The idea is instead of pre render all the text before reading, - this will only renders part of text on demand by which available - page on screen. - - And what this does is only drawing text/string on curses screen - without .clear() or .refresh() to optimize performance. - """ - - def __init__( - self, - screen, - text: Tuple[str, ...], - textwidth: int = 80, - default_style: Tuple[InlineStyle, ...] = tuple(), - spread: int = 1, - ): - self.screen = screen - self.screen_rows, self.screen_cols = self.screen.getmaxyx() - self.textwidth = textwidth - self.x = ((self.screen_cols - self.textwidth) // 2) + 1 - self.text = text - self.total_lines = len(text) - self.default_style: Tuple[InlineStyle, ...] = default_style - self.temporary_style: Tuple[InlineStyle, ...] = () - self.spread = spread - - if self.spread == 2: - self.x = DoubleSpreadPadding.LEFT.value - self.x_alt = ( - DoubleSpreadPadding.LEFT.value + self.textwidth + DoubleSpreadPadding.MIDDLE.value - ) - - def feed_temporary_style(self, styles: Optional[Tuple[InlineStyle, ...]] = None) -> None: - """Reset styling if `styles` is None""" - self.temporary_style = styles if styles else () - - def render_styles( - self, row: int, styles: Tuple[InlineStyle, ...] = (), bottom_padding: int = 0 - ) -> None: - for i in styles: - if i.row in range(row, row + self.screen_rows - bottom_padding): - self.chgat(row, i.row, i.col, i.n_letters, self.screen.getbkgd() | i.attr) - - if self.spread == 2 and i.row in range( - row + self.screen_rows - bottom_padding, - row + 2 * (self.screen_rows - bottom_padding), - ): - self.chgat( - row, - i.row - (self.screen_rows - bottom_padding), - -self.x + self.x_alt + i.col, - i.n_letters, - self.screen.getbkgd() | i.attr, - ) - - def getch(self) -> Union[NoUpdate, Key]: - input = self.screen.getch() - if input == -1: - return NoUpdate() - return Key(input) - - def getbkgd(self): - return self.screen.getbkgd() - - def chgat(self, row: int, y: int, x: int, n: int, attr: int) -> None: - self.screen.chgat(y - row, self.x + x, n, attr) - - def write(self, row: int, bottom_padding: int = 0) -> None: - for n_row in range(min(self.screen_rows - bottom_padding, self.total_lines - row)): - text_line = self.text[row + n_row] - self.screen.addstr(n_row, self.x, text_line) - - if ( - self.spread == 2 - and row + self.screen_rows - bottom_padding + n_row < self.total_lines - ): - text_line = self.text[row + self.screen_rows - bottom_padding + n_row] - # TODO: clean this up - if re.search("\\[IMG:[0-9]+\\]", text_line): - self.screen.addstr( - n_row, self.x_alt, text_line.center(self.textwidth), curses.A_BOLD - ) - else: - self.screen.addstr(n_row, self.x_alt, text_line) - - self.render_styles(row, self.default_style, bottom_padding) - self.render_styles(row, self.temporary_style, bottom_padding) - # self.screen.refresh() - - def write_n( - self, - row: int, - n: int = 1, - direction: Direction = Direction.FORWARD, - bottom_padding: int = 0, - ) -> None: - assert n > 0 - for n_row in range(min(self.screen_rows - bottom_padding, self.total_lines - row)): - text_line = self.text[row + n_row] - if direction == Direction.FORWARD: - # self.screen.addnstr(n_row, self.x + self.textwidth - n, self.text[row+n_row], n) - # `+ " " * (self.textwidth - len(self.text[row + n_row]))` is workaround to - # to prevent curses trace because not calling screen.clear() - self.screen.addnstr( - n_row, - self.x + self.textwidth - n, - text_line + " " * (self.textwidth - len(text_line)), - n, - ) - - if ( - self.spread == 2 - and row + self.screen_rows - bottom_padding + n_row < self.total_lines - ): - text_line_alt = self.text[row + n_row + self.screen_rows - bottom_padding] - self.screen.addnstr( - n_row, - self.x_alt + self.textwidth - n, - text_line_alt + " " * (self.textwidth - len(text_line_alt)), - n, - ) - - else: - if text_line[self.textwidth - n :]: - self.screen.addnstr(n_row, self.x, text_line[self.textwidth - n :], n) - - if ( - self.spread == 2 - and row + self.screen_rows - bottom_padding + n_row < self.total_lines - ): - text_line_alt = self.text[row + n_row + self.screen_rows - bottom_padding] - self.screen.addnstr( - n_row, - self.x_alt, - text_line_alt[self.textwidth - n :], - n, - ) - - -# }}} - - -# Helpers & Utils {{{ - - -def coerce_to_int(string: str) -> Optional[int]: - try: - return int(string) - except ValueError: - return None - - -def cleanup_library(state: State) -> None: - """Cleanup non-existent file from library""" - library_items = state.get_from_history() - for item in library_items: - if not os.path.isfile(item.filepath) and not is_url(item.filepath): - state.delete_from_library(item.filepath) - - -def get_nth_file_from_library(state: State, n) -> Optional[LibraryItem]: - library_items = state.get_from_history() - try: - return library_items[n - 1] - except IndexError: - return None - - -def get_matching_library_item( - state: State, pattern: str, threshold: float = 0.5 -) -> Optional[LibraryItem]: - matches: List[Tuple[LibraryItem, float]] = [] # [(library_item, match_value), ...] - library_items = state.get_from_history() - if not library_items: - return None - - for item in library_items: - tomatch = f"{item.title} - {item.author}" # item.filepath - match_value = sum( - [i.size for i in SM(None, tomatch.lower(), pattern.lower()).get_matching_blocks()] - ) / float(len(pattern)) - matches.append( - ( - item, - match_value, - ) - ) - - sorted_matches = sorted(matches, key=lambda x: -x[1]) - first_match_item, first_match_value = sorted_matches[0] - if first_match_item and first_match_value >= threshold: - return first_match_item - else: - return None - - -def print_reading_history(state: State) -> None: - termc, _ = shutil.get_terminal_size() - library_items = state.get_from_history() - if not library_items: - print("No Reading History.") - return - - print("Reading History:") - dig = len(str(len(library_items) + 1)) - tcols = termc - dig - 2 - for n, item in enumerate(library_items): - print( - "{} {}".format( - str(n + 1).rjust(dig), - truncate(str(item), "...", tcols, tcols - 3), - ) - ) - - -def is_url(string: str) -> bool: - try: - tmp = urlparse(string) - return all([tmp.scheme, tmp.netloc]) - except ValueError: - return False - - -def construct_speaker( - preferred: Optional[str] = None, args: List[str] = [] -) -> Optional[SpeakerBaseModel]: - speakers = ( - sorted(SPEAKERS, key=lambda x: int(x.cmd == preferred), reverse=True) - if preferred - else SPEAKERS - ) - speaker = next((speaker for speaker in speakers if speaker.available), None) - return speaker(args) if speaker else None - - -def parse_html( - html_src: str, - *, - textwidth: Optional[int] = None, - section_ids: Optional[Set[str]] = None, - starting_line: int = 0, -) -> Union[Tuple[str, ...], TextStructure]: - """ - Parse html string into TextStructure - - :param html_src: html str to parse - :param textwidth: textwidth to count max length of returned TextStructure - if None given, sequence of text as paragraph is returned - :param section_ids: set of section ids to look for inside html tag attr - :return: Tuple[str, ...] if textwidth not given else TextStructure - """ - if not section_ids: - section_ids = set() - - parser = HTMLtoLines(section_ids) - # try: - parser.feed(html_src) - parser.close() - # except: - # pass - - return parser.get_structured_text(textwidth, starting_line) - - -def dump_ebook_content(filepath: str) -> None: - ebook = get_ebook_obj(filepath) - try: - try: - ebook.initialize() - except Exception as e: - sys.exit("ERROR: Badly-structured ebook.\n" + str(e)) - for i in ebook.contents: - content = ebook.get_raw_text(i) - src_lines = parse_html(content) - assert isinstance(src_lines, tuple) - # sys.stdout.reconfigure(encoding="utf-8") # Python>=3.7 - for j in src_lines: - sys.stdout.buffer.write((j + "\n\n").encode("utf-8")) - finally: - ebook.cleanup() - - -def merge_text_structures( - text_structure_first: TextStructure, text_structure_second: TextStructure -) -> TextStructure: - return TextStructure( - text_lines=text_structure_first.text_lines + text_structure_second.text_lines, - image_maps={**text_structure_first.image_maps, **text_structure_second.image_maps}, - section_rows={**text_structure_first.section_rows, **text_structure_second.section_rows}, - formatting=text_structure_first.formatting + text_structure_second.formatting, - ) - - -def construct_relative_reading_state( - abs_reading_state: ReadingState, totlines_per_content: Sequence[int] -) -> ReadingState: - """ - :param abs_reading_state: ReadingState absolute to whole book when Setting.Seamless==True - :param totlines_per_content: sequence of total lines per book content - :return: new ReadingState relative to per content of the book - """ - index = 0 - cumulative_contents_lines = 0 - all_contents_lines = sum(totlines_per_content) - # for n, content_lines in enumerate(totlines_per_content): - # cumulative_contents_lines += content_lines - # if cumulative_contents_lines > abs_reading_state.row: - # return - while True: - content_lines = totlines_per_content[index] - cumulative_contents_lines += content_lines - if cumulative_contents_lines > abs_reading_state.row: - break - index += 1 - - return ReadingState( - content_index=index, - textwidth=abs_reading_state.textwidth, - row=abs_reading_state.row - cumulative_contents_lines + content_lines, - rel_pctg=abs_reading_state.rel_pctg - - ((cumulative_contents_lines - content_lines) / all_contents_lines) - if abs_reading_state.rel_pctg - else None, - section=abs_reading_state.section, - ) - - -def get_ebook_obj(filepath: str) -> Ebook: - file_ext = os.path.splitext(filepath)[1].lower() - if is_url(filepath): - return URL(filepath) - elif file_ext in {".epub", ".epub3"}: - return Epub(filepath) - elif file_ext == ".fb2": - return FictionBook(filepath) - elif MOBI_SUPPORT and file_ext == ".mobi": - return Mobi(filepath) - elif MOBI_SUPPORT and file_ext in {".azw", ".azw3"}: - return Azw(filepath) - elif not MOBI_SUPPORT and file_ext in {".mobi", ".azw3"}: - sys.exit( - "ERROR: Format not supported. (Supported: epub, fb2). " - "To get mobi and azw3 support, install epy via pip. " - ) - else: - sys.exit("ERROR: Format not supported. (Supported: epub, fb2)") - - -def tuple_subtract(tuple_one: Tuple[Any, ...], tuple_two: Tuple[Any, ...]) -> Tuple[Any, ...]: - """ - Returns tuple with members in tuple_one - but not in tuple_two - """ - return tuple(i for i in tuple_one if i not in tuple_two) - - -def pgup(current_row: int, window_height: int, counter: int = 1) -> int: - if current_row >= (window_height) * counter: - return current_row - (window_height) * counter - else: - return 0 - - -def pgdn(current_row: int, total_lines: int, window_height: int, counter: int = 1) -> int: - if current_row + (window_height * counter) <= total_lines - window_height: - return current_row + (window_height * counter) - else: - current_row = total_lines - window_height - if current_row < 0: - return 0 - return current_row - - -def pgend(total_lines: int, window_height: int) -> int: - if total_lines - window_height >= 0: - return total_lines - window_height - else: - return 0 - - -def truncate(teks: str, subtitution_text: str, maxlen: int, startsub: int = 0) -> str: - """ - Truncate text - - eg. - :param teks: 'This is long silly dummy text' - :param subtitution_text: '...' - :param maxlen: 12 - :param startsub: 3 - :return: 'This...ly dummy text' - """ - if startsub > maxlen: - raise ValueError("Var startsub cannot be bigger than maxlen.") - elif len(teks) <= maxlen: - return teks - else: - lensu = len(subtitution_text) - beg = teks[:startsub] - mid = ( - subtitution_text - if lensu <= maxlen - startsub - else subtitution_text[: maxlen - startsub] - ) - end = teks[startsub + lensu - maxlen :] if lensu < maxlen - startsub else "" - return beg + mid + end - - -def safe_curs_set(state: int) -> None: - try: - curses.curs_set(state) - except: - return - - -def resolve_path(current_dir: str, relative_path: str) -> str: - """ - Resolve path containing dots - eg. '/foo/bar/book.html' + '../img.png' = '/foo/img.png' - NOTE: '/' suffix is important to tell that current dir in 'bar' - """ - # can also using os.path.normpath() - # but if the image in zipfile then posix path is mandatory - return urljoin(current_dir, relative_path) - - -def find_current_content_index( - toc_entries: Tuple[TocEntry, ...], toc_secid: Mapping[str, int], index: int, y: int -) -> int: - ntoc = 0 - for n, toc_entry in enumerate(toc_entries): - if toc_entry.content_index <= index: - if y >= toc_secid.get(toc_entry.section, 0): # type: ignore - ntoc = n - return ntoc - - -def count_letters(ebook: Ebook) -> LettersCount: - per_content_counts: List[int] = [] - cumulative_counts: List[int] = [] - # assert isinstance(ebook.contents, tuple) - for i in ebook.contents: - content = ebook.get_raw_text(i) - src_lines = parse_html(content) - assert isinstance(src_lines, tuple) - cumulative_counts.append(sum(per_content_counts)) - per_content_counts.append(sum([len(re.sub(r"\s", "", j)) for j in src_lines])) - - return LettersCount(all=sum(per_content_counts), cumulative=tuple(cumulative_counts)) - - -def count_letters_parallel(ebook: Ebook, child_conn) -> None: - child_conn.send(count_letters(ebook)) - child_conn.close() - - -def choice_win(allowdel=False): - """ - Conjure options window by wrapping a window function - which has a return type of tuple in the form of - (title, list_to_chose, initial_active_index, windows_key_to_toggle) - and return tuple of (returned_key, chosen_index, chosen_index_to_delete) - """ - - def inner_f(listgen): - @wraps(listgen) - def wrapper(self, *args, **kwargs): - rows, cols = self.screen.getmaxyx() - hi, wi = rows - 4, cols - 4 - Y, X = 2, 2 - chwin = curses.newwin(hi, wi, Y, X) - if self.is_color_supported: - chwin.bkgd(self.screen.getbkgd()) - - title, ch_list, index, key = listgen(self, *args, **kwargs) - - if len(title) > cols - 8: - title = title[: cols - 8] - - chwin.box() - chwin.keypad(True) - chwin.addstr(1, 2, title) - chwin.addstr(2, 2, "-" * len(title)) - if allowdel: - chwin.addstr(3, 2, "HINT: Press 'd' to delete.") - key_chwin = 0 - - totlines = len(ch_list) - chwin.refresh() - pad = curses.newpad(totlines, wi - 2) - if self.is_color_supported: - pad.bkgd(self.screen.getbkgd()) - - pad.keypad(True) - - padhi = rows - 5 - Y - 4 + 1 - (1 if allowdel else 0) - # padhi = rows - 5 - Y - 4 + 1 - 1 - y = 0 - if index in range(padhi // 2, totlines - padhi // 2): - y = index - padhi // 2 + 1 - span = [] - - for n, i in enumerate(ch_list): - # strs = " " + str(n+1).rjust(d) + " " + i[0] - # remove newline from choice entries - # mostly happens in FictionBook (.fb2) format - strs = " " + i.replace("\n", " ") - strs = strs[0 : wi - 3] - pad.addstr(n, 0, strs) - span.append(len(strs)) - - countstring = "" - while key_chwin not in self.keymap.Quit + key: - if countstring == "": - count = 1 - else: - count = int(countstring) - if key_chwin in tuple(Key(i) for i in range(48, 58)): # i.e., k is a numeral - countstring = countstring + key_chwin.char - else: - if key_chwin in self.keymap.ScrollUp + self.keymap.PageUp: - index -= count - if index < 0: - index = 0 - elif key_chwin in self.keymap.ScrollDown or key_chwin in self.keymap.PageDown: - index += count - if index + 1 >= totlines: - index = totlines - 1 - elif key_chwin in self.keymap.Follow: - chwin.clear() - chwin.refresh() - return None, index, None - elif key_chwin in self.keymap.BeginningOfCh: - index = 0 - elif key_chwin in self.keymap.EndOfCh: - index = totlines - 1 - elif key_chwin == Key("D") and allowdel: - return None, (0 if index == 0 else index - 1), index - # chwin.redrawwin() - # chwin.refresh() - elif key_chwin == Key("d") and allowdel: - resk, resp, _ = self.show_win_options( - "Delete '{}'?".format(ch_list[index]), - ["(Y)es", "(N)o"], - 0, - (Key("n"),), - ) - if resk is not None: - key_chwin = resk - continue - elif resp == 0: - return None, (0 if index == 0 else index - 1), index - chwin.redrawwin() - chwin.refresh() - elif key_chwin in {Key(i) for i in ["Y", "y", "N", "n"]} and ch_list == [ - "(Y)es", - "(N)o", - ]: - if key_chwin in {Key("Y"), Key("y")}: - return None, 0, None - else: - return None, 1, None - elif key_chwin in tuple_subtract(self._win_keys, key): - chwin.clear() - chwin.refresh() - return key_chwin, index, None - countstring = "" - - while index not in range(y, y + padhi): - if index < y: - y -= 1 - else: - y += 1 - - for n in range(totlines): - att = curses.A_REVERSE if index == n else curses.A_NORMAL - pre = ">>" if index == n else " " - pad.addstr(n, 0, pre) - pad.chgat(n, 0, span[n], pad.getbkgd() | att) - - pad.refresh(y, 0, Y + 4 + (1 if allowdel else 0), X + 4, rows - 5, cols - 6) - # pad.refresh(y, 0, Y+5, X+4, rows - 5, cols - 6) - key_chwin = Key(chwin.getch()) - if key_chwin == Key(curses.KEY_MOUSE): - mouse_event = curses.getmouse() - if mouse_event[4] == curses.BUTTON4_PRESSED: - key_chwin = self.keymap.ScrollUp[0] - elif mouse_event[4] == 2097152: - key_chwin = self.keymap.ScrollDown[0] - elif mouse_event[4] == curses.BUTTON1_DOUBLE_CLICKED: - if ( - mouse_event[2] >= 6 - and mouse_event[2] < rows - 4 - and mouse_event[2] < 6 + totlines - ): - index = mouse_event[2] - 6 + y - key_chwin = self.keymap.Follow[0] - elif ( - mouse_event[4] == curses.BUTTON1_CLICKED - and mouse_event[2] >= 6 - and mouse_event[2] < rows - 4 - and mouse_event[2] < 6 + totlines - ): - if index == mouse_event[2] - 6 + y: - key_chwin = self.keymap.Follow[0] - continue - index = mouse_event[2] - 6 + y - elif mouse_event[4] == curses.BUTTON3_CLICKED: - key_chwin = self.keymap.Quit[0] - - chwin.clear() - chwin.refresh() - return None, None, None - - return wrapper - - return inner_f - - -def text_win(textfunc): - @wraps(textfunc) - def wrapper(self, *args, **kwargs) -> Union[NoUpdate, Key]: - rows, cols = self.screen.getmaxyx() - hi, wi = rows - 4, cols - 4 - Y, X = 2, 2 - textw = curses.newwin(hi, wi, Y, X) - if self.is_color_supported: - textw.bkgd(self.screen.getbkgd()) - - title, raw_texts, key = textfunc(self, *args, **kwargs) - - if len(title) > cols - 8: - title = title[: cols - 8] - - texts = [] - for i in raw_texts.splitlines(): - texts += textwrap.wrap(i, wi - 6, drop_whitespace=False) - - textw.box() - textw.keypad(True) - textw.addstr(1, 2, title) - textw.addstr(2, 2, "-" * len(title)) - key_textw: Union[NoUpdate, Key] = NoUpdate() - - totlines = len(texts) - - pad = curses.newpad(totlines, wi - 2) - if self.is_color_supported: - pad.bkgd(self.screen.getbkgd()) - - pad.keypad(True) - for n, i in enumerate(texts): - pad.addstr(n, 0, i) - y = 0 - textw.refresh() - pad.refresh(y, 0, Y + 4, X + 4, rows - 5, cols - 6) - padhi = rows - 8 - Y - - while key_textw not in self.keymap.Quit + key: - if key_textw in self.keymap.ScrollUp and y > 0: - y -= 1 - elif key_textw in self.keymap.ScrollDown and y < totlines - hi + 6: - y += 1 - elif key_textw in self.keymap.PageUp: - y = pgup(y, padhi) - elif key_textw in self.keymap.PageDown: - y = pgdn(y, totlines, padhi) - elif key_textw in self.keymap.BeginningOfCh: - y = 0 - elif key_textw in self.keymap.EndOfCh: - y = pgend(totlines, padhi) - elif key_textw in tuple_subtract(self._win_keys, key): - textw.clear() - textw.refresh() - return key_textw - pad.refresh(y, 0, 6, 5, rows - 5, cols - 5) - key_textw = Key(textw.getch()) - - textw.clear() - textw.refresh() - return NoUpdate() - - return wrapper - - -# }}} - - -# Main Reading Interface {{{ - - -class Reader: - def __init__(self, screen, ebook: Ebook, config: Config, state: State): - - self.setting = config.setting - self.keymap = config.keymap - # to build help menu text - self.keymap_user_dict = config.keymap_user_dict - - self.seamless = self.setting.SeamlessBetweenChapters - - # keys that will make - # windows exit and return the said key - self._win_keys = ( - # curses.KEY_RESIZE is a must - (Key(curses.KEY_RESIZE),) - + self.keymap.TableOfContents - + self.keymap.Metadata - + self.keymap.Help - ) - - # screen initialization - self.screen = screen - self.screen.keypad(True) - safe_curs_set(0) - if self.setting.MouseSupport: - curses.mousemask(-1) - # curses.mouseinterval(0) - self.screen.clear() - - # screen color - self.is_color_supported: bool = False - try: - curses.use_default_colors() - curses.init_pair(1, self.setting.DefaultColorFG, self.setting.DefaultColorBG) - curses.init_pair(2, self.setting.DarkColorFG, self.setting.DarkColorBG) - curses.init_pair(3, self.setting.LightColorFG, self.setting.LightColorBG) - self.screen.bkgd(curses.color_pair(1)) - self.is_color_supported = True - except: - self.is_color_supported = False - - # show loader and start heavy resources processes - self.show_loader(subtext="initalizing ebook") - - # main ebook object - self.ebook = ebook - try: - self.ebook.initialize() - except (KeyboardInterrupt, Exception) as e: - self.ebook.cleanup() - if DEBUG: - raise e - else: - sys.exit("ERROR: Badly-structured ebook.\n" + str(e)) - - # state - self.state = state - - # page scroll animation - self.page_animation: Optional[Direction] = None - - # show reading progress - self.show_reading_progress: bool = self.setting.ShowProgressIndicator - self.reading_progress: Optional[float] = None # calculate after count_letters() - - # search storage - self.search_data: Optional[SearchData] = None - - # double spread - self.spread = 2 if self.setting.StartWithDoubleSpread else 1 - - # jumps marker container - self.jump_list: Dict[str, ReadingState] = dict() - - # TTS speaker utils - self._tts_speaker: Optional[SpeakerBaseModel] = construct_speaker( - self.setting.PreferredTTSEngine, self.setting.TTSEngineArgs - ) - self.tts_support: bool = bool(self._tts_speaker) - self.is_speaking: bool = False - - # multi process & progress percentage - self._multiprocess_support: bool = False if multiprocessing.cpu_count() == 1 else True - self._process_counting_letter: Optional[multiprocessing.Process] = None - self.letters_count: Optional[LettersCount] = None - - def run_counting_letters(self): - if self._multiprocess_support: - try: - self._proc_parent, self._proc_child = multiprocessing.Pipe() - self._process_counting_letter = multiprocessing.Process( - name="epy-subprocess-counting-letters", - target=count_letters_parallel, - args=(self.ebook, self._proc_child), - ) - # forking will raise - # zlib.error: Error -3 while decompressing data: invalid distance too far back - self._process_counting_letter.start() - except Exception as e: - if DEBUG: - raise e - self._multiprocess_support = False - if not self._multiprocess_support: - self.letters_count = count_letters(self.ebook) - - def try_assign_letters_count(self, *, force_wait=False) -> None: - if isinstance(self._process_counting_letter, multiprocessing.Process): - if force_wait and self._process_counting_letter.is_alive(): - self._process_counting_letter.join() - - if self._process_counting_letter.exitcode == 0: - self.letters_count = self._proc_parent.recv() - self._proc_parent.close() - self._process_counting_letter.terminate() - self._process_counting_letter.close() - self._process_counting_letter = None - - def calculate_reading_progress( - self, letters_per_content: List[int], reading_state: ReadingState - ) -> None: - if self.letters_count: - self.reading_progress = ( - self.letters_count.cumulative[reading_state.content_index] - + sum( - letters_per_content[: reading_state.row + (self.screen_rows * self.spread) - 1] - ) - ) / self.letters_count.all - - @property - def screen_rows(self) -> int: - return self.screen.getmaxyx()[0] - - @property - def screen_cols(self) -> int: - return self.screen.getmaxyx()[1] - - @property - def ext_dict_app(self) -> Optional[str]: - self._ext_dict_app: Optional[str] = None - - if shutil.which(self.setting.DictionaryClient.split()[0]): - self._ext_dict_app = self.setting.DictionaryClient - else: - for i in DICT_PRESET_LIST: - if shutil.which(i) is not None: - self._ext_dict_app = i - break - if self._ext_dict_app in {"sdcv"}: - self._ext_dict_app += " -n" - - return self._ext_dict_app - - @property - def image_viewer(self) -> Optional[str]: - self._image_viewer: Optional[str] = None - - if shutil.which(self.setting.DefaultViewer.split()[0]) is not None: - self._image_viewer = self.setting.DefaultViewer - elif sys.platform == "win32": - self._image_viewer = "start" - elif sys.platform == "darwin": - self._image_viewer = "open" - else: - for i in VIEWER_PRESET_LIST: - if shutil.which(i) is not None: - self._image_viewer = i - break - - if self._image_viewer in {"gio"}: - self._image_viewer += " open" - - return self._image_viewer - - def open_image(self, pad, name, bstr): - sfx = os.path.splitext(name)[1] - fd, path = tempfile.mkstemp(suffix=sfx) - try: - with os.fdopen(fd, "wb") as tmp: - # tmp.write(epub.file.read(src)) - tmp.write(bstr) - # run(VWR + " " + path, shell=True) - subprocess.call( - self.image_viewer + " " + path, - shell=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - k = pad.getch() - finally: - os.remove(path) - return k - - def show_loader(self, *, loader_str: str = "\u231B", subtext: Optional[str] = None): - self.screen.clear() - rows, cols = self.screen.getmaxyx() - middle_row = (rows - 1) // 2 - self.screen.addstr(middle_row, 0, loader_str.center(cols)) - if subtext: - self.screen.addstr(middle_row + 1, 0, subtext.center(cols)) - # self.screen.addstr(((rows-2)//2)+1, (cols-len(msg))//2, msg) - self.screen.refresh() - - @choice_win(True) - def show_win_options(self, title, options, active_index, key_set): - return title, options, active_index, key_set - - @text_win - def show_win_error(self, title, msg, key): - return title, msg, key - - @choice_win() - def toc(self, toc_entries: Tuple[TocEntry, ...], index: int): - return ( - "Table of Contents", - [i.label for i in toc_entries], - index, - self.keymap.TableOfContents, - ) - - @text_win - def show_win_metadata(self): - if os.path.isfile(self.ebook.path): - mdata = "[File Info]\nPATH: {}\nSIZE: {} MB\n \n[Book Info]\n".format( - self.ebook.path, round(os.path.getsize(self.ebook.path) / 1024 ** 2, 2) - ) - else: - mdata = "[File Info]\nPATH: {}\n \n[Book Info]\n".format(self.ebook.path) - - book_metadata = self.ebook.get_meta() - for field in dataclasses.fields(book_metadata): - value = getattr(book_metadata, field.name) - if value: - value = unescape(re.sub("<[^>]*>", "", value)) - mdata += f"{field.name.title()}: {value}\n" - - return "Metadata", mdata, self.keymap.Metadata - - @text_win - def show_win_help(self): - src = "Key Bindings:\n" - dig = max([len(i) for i in self.keymap_user_dict.values()]) + 2 - for i in self.keymap_user_dict.keys(): - src += "{} {}\n".format( - self.keymap_user_dict[i].rjust(dig), " ".join(re.findall("[A-Z][^A-Z]*", i)) - ) - return "Help", src, self.keymap.Help - - @text_win - def define_word(self, word): - rows, cols = self.screen.getmaxyx() - hi, wi = 5, 16 - Y, X = (rows - hi) // 2, (cols - wi) // 2 - - p = subprocess.Popen( - "{} {}".format(self.ext_dict_app, word), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - ) - - dictwin = curses.newwin(hi, wi, Y, X) - dictwin.box() - dictwin.addstr((hi - 1) // 2, (wi - 10) // 2, "Loading...") - dictwin.refresh() - - out, err = p.communicate() - - dictwin.clear() - dictwin.refresh() - - if err == b"": - return "Definition: " + word.upper(), out.decode(), self.keymap.DefineWord - else: - return "Error: " + self.ext_dict_app, err.decode(), self.keymap.DefineWord - - def show_win_choices_bookmarks(self): - idx = 0 - while True: - bookmarks = [i[0] for i in self.state.get_bookmarks(self.ebook)] - if not bookmarks: - return self.keymap.ShowBookmarks[0], None - - retk, idx, todel = self.show_win_options( - "Bookmarks", bookmarks, idx, self.keymap.ShowBookmarks - ) - if todel is not None: - self.state.delete_bookmark(self.ebook, bookmarks[todel]) - else: - return retk, idx - - def show_win_library(self): - while True: - library_items = self.state.get_from_history() - if not library_items: - return self.keymap.Library[0], None - - retk, choice_index, todel_index = self.show_win_options( - "Library", [str(item) for item in library_items], 0, self.keymap.Library - ) - if todel_index is not None: - self.state.delete_from_library(library_items[todel_index].filepath) - else: - return retk, choice_index - - def input_prompt(self, prompt: str) -> Union[NoUpdate, Key, str]: - """ - :param prompt: prompt text - :return: NoUpdate if cancelled or interrupted - Key if curses.KEY_RESIZE triggered - str for successful input - """ - # prevent pad hole when prompting for input while - # other window is active - # pad.refresh(y, 0, 0, x, rows-2, x+width) - rows, cols = self.screen.getmaxyx() - stat = curses.newwin(1, cols, rows - 1, 0) - if self.is_color_supported: - stat.bkgd(self.screen.getbkgd()) - stat.keypad(True) - curses.echo(True) - safe_curs_set(2) - - init_text = "" - - stat.addstr(0, 0, prompt, curses.A_REVERSE) - stat.addstr(0, len(prompt), init_text) - stat.refresh() - - try: - while True: - # NOTE: getch() only handles ascii - # to handle wide char like: é, use get_wch() - ipt = Key(stat.get_wch()) - # get_wch() return ambiguous type - # str for string input but int for function or special keys - # if type(ipt) == str: - # ipt = ord(ipt) - - if ipt == Key(27): - stat.clear() - stat.refresh() - curses.echo(False) - safe_curs_set(0) - return NoUpdate() - elif ipt == Key(10): - stat.clear() - stat.refresh() - curses.echo(False) - safe_curs_set(0) - return init_text - elif ipt in (Key(8), Key(127), Key(curses.KEY_BACKSPACE)): - init_text = init_text[:-1] - elif ipt == Key(curses.KEY_RESIZE): - stat.clear() - stat.refresh() - curses.echo(False) - safe_curs_set(0) - return Key(curses.KEY_RESIZE) - # elif len(init_text) <= maxlen: - else: - init_text += ipt.char - - stat.clear() - stat.addstr(0, 0, prompt, curses.A_REVERSE) - stat.addstr( - 0, - len(prompt), - init_text - if len(prompt + init_text) < cols - else "..." + init_text[len(prompt) - cols + 4 :], - ) - stat.refresh() - except KeyboardInterrupt: - stat.clear() - stat.refresh() - curses.echo(False) - safe_curs_set(0) - return NoUpdate() - - def searching( - self, board: InfiniBoard, src: Sequence[str], reading_state: ReadingState, tot - ) -> Union[NoUpdate, ReadingState, Key]: - # reusable loop indices - i: Any - j: Any - - rows, cols = self.screen.getmaxyx() - # unnecessary - # if self.spread == 2: - # reading_state = dataclasses.replace(reading_state, textwidth=(cols - 7) // 2) - - x = (cols - reading_state.textwidth) // 2 - if self.spread == 1: - x = (cols - reading_state.textwidth) // 2 - else: - x = 2 - - if not self.search_data: - candidate_text = self.input_prompt(" Regex:") - # if isinstance(candidate_text, str) and candidate_text != "": - if isinstance(candidate_text, str) and candidate_text: - self.search_data = SearchData(value=candidate_text) - else: - assert isinstance(candidate_text, NoUpdate) or isinstance(candidate_text, Key) - return candidate_text - - found = [] - try: - pattern = re.compile(self.search_data.value, re.IGNORECASE) - except re.error as reerrmsg: - self.search_data = None - tmpk = self.show_win_error("!Regex Error", str(reerrmsg), tuple()) - return tmpk - - for n, i in enumerate(src): - for j in pattern.finditer(i): - found.append([n, j.span()[0], j.span()[1] - j.span()[0]]) - - if not found: - if ( - self.search_data.direction == Direction.FORWARD - and reading_state.content_index + 1 < tot - ): - return ReadingState( - content_index=reading_state.content_index + 1, - textwidth=reading_state.textwidth, - row=0, - ) - elif ( - self.search_data.direction == Direction.BACKWARD and reading_state.content_index > 0 - ): - return ReadingState( - content_index=reading_state.content_index - 1, - textwidth=reading_state.textwidth, - row=0, - ) - else: - s: Union[NoUpdate, Key] = NoUpdate() - while True: - if s in self.keymap.Quit: - self.search_data = None - self.screen.clear() - self.screen.refresh() - return reading_state - # TODO: maybe >= 0? - elif s == Key("n") and reading_state.content_index == 0: - self.search_data = dataclasses.replace( - self.search_data, direction=Direction.FORWARD - ) - return ReadingState( - content_index=reading_state.content_index + 1, - textwidth=reading_state.textwidth, - row=0, - ) - elif s == Key("N") and reading_state.content_index + 1 == tot: - self.search_data = dataclasses.replace( - self.search_data, direction=Direction.BACKWARD - ) - return ReadingState( - content_index=reading_state.content_index - 1, - textwidth=reading_state.textwidth, - row=0, - ) - - self.screen.clear() - self.screen.addstr( - rows - 1, - 0, - " Finished searching: " + self.search_data.value[: cols - 22] + " ", - curses.A_REVERSE, - ) - board.write(reading_state.row, 1) - self.screen.refresh() - s = board.getch() - - sidx = len(found) - 1 - if self.search_data.direction == Direction.FORWARD: - if reading_state.row > found[-1][0]: - return ReadingState( - content_index=reading_state.content_index + 1, - textwidth=reading_state.textwidth, - row=0, - ) - for n, i in enumerate(found): - if i[0] >= reading_state.row: - sidx = n - break - - s = NoUpdate() - msg = ( - " Searching: " - + self.search_data.value - + " --- Res {}/{} Ch {}/{} ".format( - sidx + 1, len(found), reading_state.content_index + 1, tot - ) - ) - while True: - if s in self.keymap.Quit: - self.search_data = None - # for i in found: - # pad.chgat(i[0], i[1], i[2], pad.getbkgd()) - board.feed_temporary_style() - # pad.format() - # self.screen.clear() - # self.screen.refresh() - return reading_state - elif s == Key("n"): - self.search_data = dataclasses.replace( - self.search_data, direction=Direction.FORWARD - ) - if sidx == len(found) - 1: - if reading_state.content_index + 1 < tot: - return ReadingState( - content_index=reading_state.content_index + 1, - textwidth=reading_state.textwidth, - row=0, - ) - else: - s = NoUpdate() - msg = " Finished searching: " + self.search_data.value + " " - continue - else: - sidx += 1 - msg = ( - " Searching: " - + self.search_data.value - + " --- Res {}/{} Ch {}/{} ".format( - sidx + 1, len(found), reading_state.content_index + 1, tot - ) - ) - elif s == Key("N"): - self.search_data = dataclasses.replace( - self.search_data, direction=Direction.BACKWARD - ) - if sidx == 0: - if reading_state.content_index > 0: - return ReadingState( - content_index=reading_state.content_index - 1, - textwidth=reading_state.textwidth, - row=0, - ) - else: - s = NoUpdate() - msg = " Finished searching: " + self.search_data.value + " " - continue - else: - sidx -= 1 - msg = ( - " Searching: " - + self.search_data.value - + " --- Res {}/{} Ch {}/{} ".format( - sidx + 1, len(found), reading_state.content_index + 1, tot - ) - ) - elif s == Key(curses.KEY_RESIZE): - return Key(curses.KEY_RESIZE) - - # if reading_state.row + rows - 1 > pad.chunks[pad.find_chunkidx(reading_state.row)]: - # reading_state = dataclasses.replace( - # reading_state, row=pad.chunks[pad.find_chunkidx(reading_state.row)] + 1 - # ) - - while found[sidx][0] not in list( - range(reading_state.row, reading_state.row + (rows - 1) * self.spread) - ): - if found[sidx][0] > reading_state.row: - reading_state = dataclasses.replace( - reading_state, row=reading_state.row + ((rows - 1) * self.spread) - ) - else: - reading_state = dataclasses.replace( - reading_state, row=reading_state.row - ((rows - 1) * self.spread) - ) - if reading_state.row < 0: - reading_state = dataclasses.replace(reading_state, row=0) - - # formats = [InlineStyle(row=i[0], col=i[1], n_letters=i[2], attr=curses.A_REVERSE) for i in found] - # pad.feed_style(formats) - styles: List[InlineStyle] = [] - for n, i in enumerate(found): - attr = curses.A_REVERSE if n == sidx else curses.A_NORMAL - # pad.chgat(i[0], i[1], i[2], pad.getbkgd() | attr) - styles.append( - InlineStyle(row=i[0], col=i[1], n_letters=i[2], attr=board.getbkgd() | attr) - ) - board.feed_temporary_style(tuple(styles)) - - self.screen.clear() - self.screen.addstr(rows - 1, 0, msg, curses.A_REVERSE) - self.screen.refresh() - # pad.refresh(reading_state.row, 0, 0, x, rows - 2, x + reading_state.textwidth) - board.write(reading_state.row, 1) - s = board.getch() - - def speaking(self, text): - self.is_speaking = True - self.screen.addstr(self.screen_rows - 1, 0, " Speaking! ", curses.A_REVERSE) - self.screen.refresh() - self.screen.timeout(1) - try: - self._tts_speaker.speak(text) - - while True: - if self._tts_speaker.is_done(): - k = self.keymap.PageDown[0] - break - tmp = self.screen.getch() - k = NoUpdate() if tmp == -1 else Key(tmp) - if k == Key(curses.KEY_MOUSE): - mouse_event = curses.getmouse() - if mouse_event[4] == curses.BUTTON2_CLICKED: - k = self.keymap.Quit[0] - elif mouse_event[4] == curses.BUTTON1_CLICKED: - if mouse_event[1] < self.screen_cols // 2: - k = self.keymap.PageUp[0] - else: - k = self.keymap.PageDown[0] - elif mouse_event[4] == curses.BUTTON4_PRESSED: - k = self.keymap.ScrollUp[0] - elif mouse_event[4] == 2097152: - k = self.keymap.ScrollDown[0] - if ( - k - in self.keymap.Quit - + self.keymap.PageUp - + self.keymap.PageDown - + self.keymap.ScrollUp - + self.keymap.ScrollDown - + (curses.KEY_RESIZE,) - ): - self._tts_speaker.stop() - break - finally: - self.screen.timeout(-1) - self._tts_speaker.cleanup() - - if k in self.keymap.Quit: - self.is_speaking = False - k = NoUpdate() - return k - - def savestate(self, reading_state: ReadingState) -> None: - if self.seamless: - reading_state = self.convert_absolute_reading_state_to_relative(reading_state) - self.state.set_last_reading_state(self.ebook, reading_state) - self.state.update_library(self.ebook, self.reading_progress) - - def cleanup(self) -> None: - self.ebook.cleanup() - - if isinstance(self._process_counting_letter, multiprocessing.Process): - if self._process_counting_letter.is_alive(): - self._process_counting_letter.terminate() - # weird python multiprocessing issue, need to call .join() before .close() - # ValueError: Cannot close a process while it is still running. - # You should first call join() or terminate(). - self._process_counting_letter.join() - self._process_counting_letter.close() - - def convert_absolute_reading_state_to_relative(self, reading_state) -> ReadingState: - if not self.seamless: - raise RuntimeError( - "Reader.convert_absolute_reading_state_to_relative() only implemented when Seamless=True" - ) - return construct_relative_reading_state(reading_state, self.totlines_per_content) - - def convert_relative_reading_state_to_absolute( - self, reading_state: ReadingState - ) -> ReadingState: - if not self.seamless: - raise RuntimeError( - "Reader.convert_relative_reading_state_to_absolute() only implemented when Seamless=True" - ) - - absolute_row = reading_state.row + sum( - self.totlines_per_content[: reading_state.content_index] - ) - absolute_pctg = ( - absolute_row / sum(self.totlines_per_content) if reading_state.rel_pctg else None - ) - - return dataclasses.replace( - reading_state, content_index=0, row=absolute_row, rel_pctg=absolute_pctg - ) - - def get_all_book_contents( - self, reading_state: ReadingState - ) -> Tuple[TextStructure, Tuple[TocEntry, ...], Union[Tuple[str, ...], Tuple[ET.Element, ...]]]: - if not self.seamless: - raise RuntimeError("Reader.get_all_book_contents() only implemented when Seamless=True") - - contents = self.ebook.contents - toc_entries = self.ebook.toc_entries - - text_structure: TextStructure = TextStructure( - text_lines=tuple(), image_maps=dict(), section_rows=dict(), formatting=tuple() - ) - toc_entries_tmp: List[TocEntry] = [] - section_rows_tmp: Dict[str, int] = dict() - - # self.totlines_per_content only defined when Seamless=True - self.totlines_per_content: Tuple[int, ...] = tuple() - - for n, content in enumerate(contents): - self.show_loader(subtext=f"loading contents ({n+1}/{len(contents)})") - starting_line = sum(self.totlines_per_content) - assert isinstance(content, str) or isinstance(content, ET.Element) - text_structure_tmp = parse_html( - self.ebook.get_raw_text(content), - textwidth=reading_state.textwidth, - section_ids=set(toc_entry.section for toc_entry in toc_entries), # type: ignore - starting_line=starting_line, - ) - assert isinstance(text_structure_tmp, TextStructure) - # self.totlines_per_content.append(len(text_structure_tmp.text_lines)) - self.totlines_per_content += (len(text_structure_tmp.text_lines),) - - for toc_entry in toc_entries: - if toc_entry.content_index == n: - if toc_entry.section: - toc_entries_tmp.append(dataclasses.replace(toc_entry, content_index=0)) - else: - section_id_tmp = str(uuid.uuid4()) - toc_entries_tmp.append( - TocEntry(label=toc_entry.label, content_index=0, section=section_id_tmp) - ) - section_rows_tmp[section_id_tmp] = starting_line - - text_structure = merge_text_structures(text_structure, text_structure_tmp) - - text_structure = dataclasses.replace( - text_structure, section_rows={**text_structure.section_rows, **section_rows_tmp} - ) - - return text_structure, tuple(toc_entries_tmp), (self.ebook.contents[0],) - - def get_current_book_content( - self, reading_state: ReadingState - ) -> Tuple[TextStructure, Tuple[TocEntry, ...], Union[Tuple[str, ...], Tuple[ET.Element, ...]]]: - contents = self.ebook.contents - toc_entries = self.ebook.toc_entries - content_path = contents[reading_state.content_index] - content = self.ebook.get_raw_text(content_path) - text_structure = parse_html( # type: ignore - content, - textwidth=reading_state.textwidth, - section_ids=set(toc_entry.section for toc_entry in toc_entries), # type: ignore - ) - return text_structure, toc_entries, contents - - def read(self, reading_state: ReadingState) -> Union[ReadingState, Ebook]: - # reusable loop indices - i: Any - - k = self.keymap.RegexSearch[0] if self.search_data else NoUpdate() - rows, cols = self.screen.getmaxyx() - - mincols_doublespr = ( - DoubleSpreadPadding.LEFT.value - + 22 - + DoubleSpreadPadding.MIDDLE.value - + 22 - + DoubleSpreadPadding.RIGHT.value - ) - if cols < mincols_doublespr: - self.spread = 1 - if self.spread == 2: - reading_state = dataclasses.replace( - reading_state, - textwidth=( - cols - - sum( - [ - DoubleSpreadPadding.LEFT.value, - DoubleSpreadPadding.MIDDLE.value, - DoubleSpreadPadding.RIGHT.value, - ] - ) - ) - // 2, - ) - x = (cols - reading_state.textwidth) // 2 - if self.spread == 2: - x = DoubleSpreadPadding.LEFT.value - - self.show_loader(subtext="loading contents") - # get text structure, toc entries and contents of the book - if self.seamless: - text_structure, toc_entries, contents = self.get_all_book_contents(reading_state) - # adjustment - reading_state = self.convert_relative_reading_state_to_absolute(reading_state) - else: - text_structure, toc_entries, contents = self.get_current_book_content(reading_state) - - totlines = len(text_structure.text_lines) - - if reading_state.row < 0 and totlines <= rows * self.spread: - reading_state = dataclasses.replace(reading_state, row=0) - elif reading_state.rel_pctg is not None: - reading_state = dataclasses.replace( - reading_state, row=round(reading_state.rel_pctg * totlines) - ) - else: - reading_state = dataclasses.replace(reading_state, row=reading_state.row % totlines) - - board = InfiniBoard( - screen=self.screen, - text=text_structure.text_lines, - textwidth=reading_state.textwidth, - default_style=text_structure.formatting, - spread=self.spread, - ) - - letters_per_content: List[int] = [] - for i in text_structure.text_lines: - letters_per_content.append(len(re.sub(r"\s", "", i))) - - self.screen.clear() - self.screen.refresh() - # try-except clause if there is issue - # with curses resize event - board.write(reading_state.row) - - # if reading_state.section is not None - # then override reading_state.row to follow the section - if reading_state.section: - reading_state = dataclasses.replace( - reading_state, row=text_structure.section_rows.get(reading_state.section, 0) - ) - - checkpoint_row: Optional[int] = None - countstring = "" - - try: - while True: - if countstring == "": - count = 1 - else: - count = int(countstring) - if k in tuple(Key(i) for i in range(48, 58)): # i.e., k is a numeral - countstring = countstring + k.char - else: - if k in self.keymap.Quit: - if k == Key(27) and countstring != "": - countstring = "" - else: - self.try_assign_letters_count(force_wait=True) - self.calculate_reading_progress(letters_per_content, reading_state) - - self.savestate( - dataclasses.replace( - reading_state, rel_pctg=reading_state.row / totlines - ) - ) - sys.exit() - - elif k in self.keymap.TTSToggle and self.tts_support: - tospeak = "" - for i in text_structure.text_lines[ - reading_state.row : reading_state.row + (rows * self.spread) - ]: - if re.match(r"^\s*$", i) is not None: - tospeak += "\n. \n" - else: - tospeak += i + " " - k = self.speaking(tospeak) - if ( - totlines - reading_state.row <= rows - and reading_state.content_index == len(contents) - 1 - ): - self.is_speaking = False - continue - - elif k in self.keymap.DoubleSpreadToggle: - if cols < mincols_doublespr: - k = self.show_win_error( - "Screen is too small", - "Min: {} cols x {} rows".format(mincols_doublespr, 12), - (Key("D"),), - ) - self.spread = (self.spread % 2) + 1 - return ReadingState( - content_index=reading_state.content_index, - textwidth=reading_state.textwidth, - row=reading_state.row, - rel_pctg=reading_state.row / totlines, - ) - - elif k in self.keymap.ScrollUp: - if self.spread == 2: - k = self.keymap.PageUp[0] - continue - if count > 1: - checkpoint_row = reading_state.row - 1 - if reading_state.row >= count: - reading_state = dataclasses.replace( - reading_state, row=reading_state.row - count - ) - elif reading_state.row == 0 and reading_state.content_index != 0: - self.page_animation = Direction.BACKWARD - # return -1, width, -rows, None, "" - return ReadingState( - content_index=reading_state.content_index - 1, - textwidth=reading_state.textwidth, - row=-rows, - ) - else: - reading_state = dataclasses.replace(reading_state, row=0) - - elif k in self.keymap.PageUp: - if reading_state.row == 0 and reading_state.content_index != 0: - self.page_animation = Direction.BACKWARD - text_structure_content_before = parse_html( - self.ebook.get_raw_text(contents[reading_state.content_index - 1]), - textwidth=reading_state.textwidth, - ) - assert isinstance(text_structure_content_before, TextStructure) - return ReadingState( - content_index=reading_state.content_index - 1, - textwidth=reading_state.textwidth, - row=rows - * self.spread - * ( - len(text_structure_content_before.text_lines) - // (rows * self.spread) - ), - ) - else: - if reading_state.row >= rows * self.spread * count: - self.page_animation = Direction.BACKWARD - reading_state = dataclasses.replace( - reading_state, - row=reading_state.row - (rows * self.spread * count), - ) - else: - reading_state = dataclasses.replace(reading_state, row=0) - - elif k in self.keymap.ScrollDown: - if self.spread == 2: - k = self.keymap.PageDown[0] - continue - if count > 1: - checkpoint_row = reading_state.row + rows - 1 - if reading_state.row + count <= totlines - rows: - reading_state = dataclasses.replace( - reading_state, row=reading_state.row + count - ) - elif ( - reading_state.row >= totlines - rows - and reading_state.content_index != len(contents) - 1 - ): - self.page_animation = Direction.FORWARD - return ReadingState( - content_index=reading_state.content_index + 1, - textwidth=reading_state.textwidth, - row=0, - ) - - elif k in self.keymap.PageDown: - if totlines - reading_state.row > rows * self.spread: - self.page_animation = Direction.FORWARD - reading_state = dataclasses.replace( - reading_state, row=reading_state.row + (rows * self.spread) - ) - elif reading_state.content_index != len(contents) - 1: - self.page_animation = Direction.FORWARD - return ReadingState( - content_index=reading_state.content_index + 1, - textwidth=reading_state.textwidth, - row=0, - ) - - # elif k in K["HalfScreenUp"] | K["HalfScreenDown"]: - # countstring = str(rows // 2) - # k = list(K["ScrollUp" if k in K["HalfScreenUp"] else "ScrollDown"])[0] - # continue - - elif k in self.keymap.NextChapter: - ntoc = find_current_content_index( - toc_entries, - text_structure.section_rows, - reading_state.content_index, - reading_state.row, - ) - if ntoc < len(toc_entries) - 1: - if reading_state.content_index == toc_entries[ntoc + 1].content_index: - try: - reading_state = dataclasses.replace( - reading_state, - row=text_structure.section_rows[ - toc_entries[ntoc + 1].section # type: ignore - ], - ) - except KeyError: - pass - else: - return ReadingState( - content_index=toc_entries[ntoc + 1].content_index, - textwidth=reading_state.textwidth, - row=0, - section=toc_entries[ntoc + 1].section, - ) - - elif k in self.keymap.PrevChapter: - ntoc = find_current_content_index( - toc_entries, - text_structure.section_rows, - reading_state.content_index, - reading_state.row, - ) - if ntoc > 0: - if reading_state.content_index == toc_entries[ntoc - 1].content_index: - reading_state = dataclasses.replace( - reading_state, - row=text_structure.section_rows.get( - toc_entries[ntoc - 1].section, 0 # type: ignore - ), - ) - else: - return ReadingState( - content_index=toc_entries[ntoc - 1].content_index, - textwidth=reading_state.textwidth, - row=0, - section=toc_entries[ntoc - 1].section, - ) - - elif k in self.keymap.BeginningOfCh: - ntoc = find_current_content_index( - toc_entries, - text_structure.section_rows, - reading_state.content_index, - reading_state.row, - ) - try: - reading_state = dataclasses.replace( - reading_state, - row=text_structure.section_rows[toc_entries[ntoc].section], # type: ignore - ) - except (KeyError, IndexError): - reading_state = dataclasses.replace(reading_state, row=0) - - elif k in self.keymap.EndOfCh: - ntoc = find_current_content_index( - toc_entries, - text_structure.section_rows, - reading_state.content_index, - reading_state.row, - ) - try: - if ( - text_structure.section_rows[toc_entries[ntoc + 1].section] - rows # type: ignore - >= 0 - ): - reading_state = dataclasses.replace( - reading_state, - row=text_structure.section_rows[toc_entries[ntoc + 1].section] # type: ignore - - rows, - ) - else: - reading_state = dataclasses.replace( - reading_state, - row=text_structure.section_rows[toc_entries[ntoc].section], # type: ignore - ) - except (KeyError, IndexError): - reading_state = dataclasses.replace( - reading_state, row=pgend(totlines, rows) - ) - - elif k in self.keymap.TableOfContents: - if not toc_entries: - k = self.show_win_error( - "Table of Contents", - "N/A: TableOfContents is unavailable for this book.", - self.keymap.TableOfContents, - ) - continue - ntoc = find_current_content_index( - toc_entries, - text_structure.section_rows, - reading_state.content_index, - reading_state.row, - ) - rettock, fllwd, _ = self.toc(toc_entries, ntoc) - if rettock is not None: # and rettock in WINKEYS: - k = rettock - continue - elif fllwd is not None: - if reading_state.content_index == toc_entries[fllwd].content_index: - try: - reading_state = dataclasses.replace( - reading_state, - row=text_structure.section_rows[toc_entries[fllwd].section], - ) - except KeyError: - reading_state = dataclasses.replace(reading_state, row=0) - else: - return ReadingState( - content_index=toc_entries[fllwd].content_index, - textwidth=reading_state.textwidth, - row=0, - section=toc_entries[fllwd].section, - ) - - elif k in self.keymap.Metadata: - k = self.show_win_metadata() - if k in self._win_keys: - continue - - elif k in self.keymap.Help: - k = self.show_win_help() - if k in self._win_keys: - continue - - elif ( - k in self.keymap.Enlarge - and (reading_state.textwidth + count) < cols - 4 - and self.spread == 1 - ): - return dataclasses.replace( - reading_state, - textwidth=reading_state.textwidth + count, - rel_pctg=reading_state.row / totlines, - ) - - elif ( - k in self.keymap.Shrink - and reading_state.textwidth >= 22 - and self.spread == 1 - ): - return dataclasses.replace( - reading_state, - textwidth=reading_state.textwidth - count, - rel_pctg=reading_state.row / totlines, - ) - - elif k in self.keymap.SetWidth and self.spread == 1: - if countstring == "": - # if called without a count, toggle between 80 cols and full width - if reading_state.textwidth != 80 and cols - 4 >= 80: - return ReadingState( - content_index=reading_state.content_index, - textwidth=80, - row=reading_state.row, - rel_pctg=reading_state.row / totlines, - ) - else: - return ReadingState( - content_index=reading_state.content_index, - textwidth=cols - 4, - row=reading_state.row, - rel_pctg=reading_state.row / totlines, - ) - else: - reading_state = dataclasses.replace(reading_state, textwidth=count) - if reading_state.textwidth < 20: - reading_state = dataclasses.replace(reading_state, textwidth=20) - elif reading_state.textwidth >= cols - 4: - reading_state = dataclasses.replace(reading_state, textwidth=cols - 4) - - return ReadingState( - content_index=reading_state.content_index, - textwidth=reading_state.textwidth, - row=reading_state.row, - rel_pctg=reading_state.row / totlines, - ) - - elif k in self.keymap.RegexSearch: - ret_object = self.searching( - board, - text_structure.text_lines, - reading_state, - len(contents), - ) - if isinstance(ret_object, Key) or isinstance(ret_object, NoUpdate): - k = ret_object - # k = ret_object.value - continue - elif isinstance(ret_object, ReadingState) and self.search_data: - return ret_object - # else: - elif isinstance(ret_object, ReadingState): - # y = ret_object - reading_state = ret_object - - elif k in self.keymap.OpenImage and self.image_viewer: - imgs_in_screen = list( - set( - range(reading_state.row, reading_state.row + rows * self.spread + 1) - ) - & set(text_structure.image_maps.keys()) - ) - if not imgs_in_screen: - k = NoUpdate() - continue - - imgs_in_screen.sort() - image_path: Optional[str] = None - if len(imgs_in_screen) == 1: - image_path = text_structure.image_maps[imgs_in_screen[0]] - elif len(imgs_in_screen) > 1: - imgs_rel_to_row = [i - reading_state.row for i in imgs_in_screen] - p: Union[NoUpdate, Key] = NoUpdate() - i = 0 - while p not in self.keymap.Quit and p not in self.keymap.Follow: - self.screen.move( - imgs_rel_to_row[i] % rows, - ( - x - if imgs_rel_to_row[i] // rows == 0 - else cols - - DoubleSpreadPadding.RIGHT.value - - reading_state.textwidth - ) - + reading_state.textwidth // 2, - ) - self.screen.refresh() - safe_curs_set(2) - p = board.getch() - if p in self.keymap.ScrollDown: - i += 1 - elif p in self.keymap.ScrollUp: - i -= 1 - i = i % len(imgs_rel_to_row) - - safe_curs_set(0) - if p in self.keymap.Follow: - image_path = text_structure.image_maps[imgs_in_screen[i]] - - if image_path: - try: - # if self.ebook.__class__.__name__ in {"Epub", "Mobi", "Azw"}: - if isinstance(self.ebook, (Epub, Mobi, Azw)): - # self.seamless adjustment - if self.seamless: - current_content_index = ( - self.convert_absolute_reading_state_to_relative( - reading_state - ).content_index - ) - else: - current_content_index = reading_state.content_index - # for n, content in enumerate(self.ebook.contents): - # content_path = content - # if reading_state.row < sum(totlines_per_content[:n]): - # break - - content_path = self.ebook.contents[current_content_index] - assert isinstance(content_path, str) - image_path = resolve_path(content_path, image_path) - imgnm, imgbstr = self.ebook.get_img_bytestr(image_path) - k = self.open_image(board, imgnm, imgbstr) - continue - except Exception as e: - self.show_win_error("Error Opening Image", str(e), tuple()) - if DEBUG: - raise e - - elif ( - k in self.keymap.SwitchColor - and self.is_color_supported - and countstring in {"", "0", "1", "2"} - ): - if countstring == "": - count_color = curses.pair_number(self.screen.getbkgd()) - if count_color not in {2, 3}: - count_color = 1 - count_color = count_color % 3 - else: - count_color = count - self.screen.bkgd(curses.color_pair(count_color + 1)) - # pad.format() - return ReadingState( - content_index=reading_state.content_index, - textwidth=reading_state.textwidth, - row=reading_state.row, - ) - - elif k in self.keymap.AddBookmark: - bmname = self.input_prompt(" Add bookmark:") - if isinstance(bmname, str) and bmname: - try: - self.state.insert_bookmark( - self.ebook, - bmname, - dataclasses.replace( - reading_state, rel_pctg=reading_state.row / totlines - ), - ) - except sqlite3.IntegrityError: - k = self.show_win_error( - "Error: Add Bookmarks", - f"Bookmark with name '{bmname}' already exists.", - (Key("B"),), - ) - continue - else: - k = bmname - continue - - elif k in self.keymap.ShowBookmarks: - bookmarks = self.state.get_bookmarks(self.ebook) - if not bookmarks: - k = self.show_win_error( - "Bookmarks", - "N/A: Bookmarks are not found in this book.", - self.keymap.ShowBookmarks, - ) - continue - else: - retk, idxchoice = self.show_win_choices_bookmarks() - if retk is not None: - k = retk - continue - elif idxchoice is not None: - bookmark_to_jump = self.state.get_bookmarks(self.ebook)[idxchoice][ - 1 - ] - if ( - bookmark_to_jump.content_index == reading_state.content_index - and bookmark_to_jump.textwidth == reading_state.textwidth - ): - reading_state = bookmark_to_jump - else: - return ReadingState( - content_index=bookmark_to_jump.content_index, - textwidth=reading_state.textwidth, - row=bookmark_to_jump.row, - rel_pctg=bookmark_to_jump.rel_pctg, - ) - - elif k in self.keymap.DefineWord and self.ext_dict_app: - word = self.input_prompt(" Define:") - if isinstance(word, str) and word: - defin = self.define_word(word) - if defin in self._win_keys: - k = defin - continue - else: - k = word - continue - - elif k in self.keymap.MarkPosition: - jumnum = board.getch() - if isinstance(jumnum, Key) and jumnum in tuple( - Key(i) for i in range(48, 58) - ): - self.jump_list[jumnum.char] = reading_state - else: - k = NoUpdate() - continue - - elif k in self.keymap.JumpToPosition: - jumnum = board.getch() - if ( - isinstance(jumnum, Key) - and jumnum in tuple(Key(i) for i in range(48, 58)) - and jumnum.char in self.jump_list - ): - marked_reading_state = self.jump_list[jumnum.char] - return dataclasses.replace( - marked_reading_state, - textwidth=reading_state.textwidth, - rel_pctg=None - if marked_reading_state.textwidth == reading_state.textwidth - else marked_reading_state.rel_pctg, - section="", - ) - else: - k = NoUpdate() - continue - - elif k in self.keymap.ShowHideProgress: - self.show_reading_progress = not self.show_reading_progress - - elif k in self.keymap.Library: - self.try_assign_letters_count(force_wait=True) - self.calculate_reading_progress(letters_per_content, reading_state) - - self.savestate( - dataclasses.replace( - reading_state, rel_pctg=reading_state.row / totlines - ) - ) - library_items = self.state.get_from_history() - if not library_items: - k = self.show_win_error( - "Library", - "N/A: No reading history.", - self.keymap.Library, - ) - continue - else: - retk, choice_index = self.show_win_library() - if retk is not None: - k = retk - continue - elif choice_index is not None: - return get_ebook_obj(library_items[choice_index].filepath) - - elif k == Key(curses.KEY_RESIZE): - self.savestate( - dataclasses.replace( - reading_state, rel_pctg=reading_state.row / totlines - ) - ) - # stated in pypi windows-curses page: - # to call resize_term right after KEY_RESIZE - if sys.platform == "win32": - curses.resize_term(rows, cols) - rows, cols = self.screen.getmaxyx() - else: - rows, cols = self.screen.getmaxyx() - curses.resize_term(rows, cols) - if cols < 22 or rows < 12: - sys.exit("ERROR: Screen was too small (min 22cols x 12rows).") - if cols <= reading_state.textwidth + 4: - return ReadingState( - content_index=reading_state.content_index, - textwidth=cols - 4, - row=reading_state.row, - rel_pctg=reading_state.row / totlines, - ) - else: - return ReadingState( - content_index=reading_state.content_index, - textwidth=reading_state.textwidth, - row=reading_state.row, - ) - - countstring = "" - - if checkpoint_row: - board.feed_temporary_style( - ( - InlineStyle( - row=checkpoint_row, - col=0, - n_letters=reading_state.textwidth, - attr=curses.A_UNDERLINE, - ), - ) - ) - - try: - if self.setting.PageScrollAnimation and self.page_animation: - self.screen.clear() - for i in range(1, reading_state.textwidth + 1): - curses.napms(1) - # self.screen.clear() - board.write_n(reading_state.row, i, self.page_animation) - self.screen.refresh() - self.page_animation = None - - self.screen.clear() - self.screen.addstr(0, 0, countstring) - board.write(reading_state.row) - - # check if letters counting process is done - self.try_assign_letters_count() - - # reading progress - self.calculate_reading_progress(letters_per_content, reading_state) - - # display reading progress - if ( - self.reading_progress - and self.show_reading_progress - and (cols - reading_state.textwidth - 2) // 2 > 3 - ): - reading_progress_str = "{}%".format(int(self.reading_progress * 100)) - self.screen.addstr( - 0, cols - len(reading_progress_str), reading_progress_str - ) - - self.screen.refresh() - except curses.error: - pass - - if self.is_speaking: - k = self.keymap.TTSToggle[0] - continue - - k = board.getch() - if k == Key(curses.KEY_MOUSE): - mouse_event = curses.getmouse() - if mouse_event[4] == curses.BUTTON1_CLICKED: - if mouse_event[1] < cols // 2: - k = self.keymap.PageUp[0] - else: - k = self.keymap.PageDown[0] - elif mouse_event[4] == curses.BUTTON3_CLICKED: - k = self.keymap.TableOfContents[0] - elif mouse_event[4] == curses.BUTTON4_PRESSED: - k = self.keymap.ScrollUp[0] - elif mouse_event[4] == 2097152: - k = self.keymap.ScrollDown[0] - elif mouse_event[4] == curses.BUTTON4_PRESSED + curses.BUTTON_CTRL: - k = self.keymap.Enlarge[0] - elif mouse_event[4] == 2097152 + curses.BUTTON_CTRL: - k = self.keymap.Shrink[0] - elif mouse_event[4] == curses.BUTTON2_CLICKED: - k = self.keymap.TTSToggle[0] - - if checkpoint_row: - board.feed_temporary_style() - checkpoint_row = None - - except KeyboardInterrupt: - self.savestate( - dataclasses.replace(reading_state, rel_pctg=reading_state.row / totlines) - ) - sys.exit() - - -# }}} - - -# Reading Init {{{ - - -def preread(stdscr, filepath: str): - - ebook = get_ebook_obj(filepath) - state = State() - config = Config() - - reader = Reader(screen=stdscr, ebook=ebook, config=config, state=state) - - def handle_signal(signum, _): - """ - Method to raise SystemExit based on signal received - to trigger `try-finally` clause - """ - msg = f"[{os.getpid()}] killed" - if signal.Signals(signum) == signal.SIGTERM: - msg = f"[{os.getpid()}] terminated" - sys.exit(msg) - signal.signal(signal.SIGTERM, handle_signal) - - try: - reader.run_counting_letters() - - reading_state = state.get_last_reading_state(reader.ebook) - if reader.screen_cols <= reading_state.textwidth + 4: - reading_state = dataclasses.replace(reading_state, textwidth=reader.screen_cols - 4) - else: - reading_state = dataclasses.replace(reading_state, rel_pctg=None) - - while True: - reading_state_or_ebook = reader.read(reading_state) - - if isinstance(reading_state_or_ebook, Ebook): - return reading_state_or_ebook.path - else: - reading_state = reading_state_or_ebook - if reader.seamless: - reading_state = reader.convert_absolute_reading_state_to_relative(reading_state) - - finally: - reader.cleanup() - - -# }}} - - -# Commandline {{{ - - -def parse_cli_args() -> argparse.Namespace: - prog = "epy" - positional_arg_help_str = "[PATH | # | PATTERN | URL]" - args_parser = argparse.ArgumentParser( - prog=prog, - usage=f"%(prog)s [-h] [-r] [-d] [-v] {positional_arg_help_str}", - formatter_class=argparse.RawDescriptionHelpFormatter, - description="Read ebook in terminal", - epilog=textwrap.dedent( - f"""\ - examples: - {prog} /path/to/ebook read /path/to/ebook file - {prog} 3 read #3 file from reading history - {prog} count monte read file matching 'count monte' - from reading history - """ - ), - ) - args_parser.add_argument("-r", "--history", action="store_true", help="print reading history") - args_parser.add_argument("-d", "--dump", action="store_true", help="dump the content of ebook") - args_parser.add_argument( - "-v", - "--version", - action="version", - version=f"v{__version__}", - help="print version and exit", - ) - args_parser.add_argument( - "ebook", - action="store", - nargs="*", - metavar=positional_arg_help_str, - help="ebook path, history number, pattern or URL", - ) - return args_parser.parse_args() - - -def find_file() -> Tuple[str, bool]: - args = parse_cli_args() - state = State() - cleanup_library(state) - - if args.history: - print_reading_history(state) - sys.exit() - - if len(args.ebook) == 0: - last_read = state.get_last_read() - if last_read: - return last_read, args.dump - else: - sys.exit("ERROR: Found no last read ebook file.") - - elif len(args.ebook) == 1: - nth = coerce_to_int(args.ebook[0]) - if nth is not None: - file = get_nth_file_from_library(state, nth) - if file: - return file.filepath, args.dump - else: - print(f"ERROR: #{nth} file not found.") - print_reading_history(state) - sys.exit(1) - elif is_url(args.ebook[0]): - return args.ebook[0], args.dump - elif os.path.isfile(args.ebook[0]): - return args.ebook[0], args.dump - - pattern = " ".join(args.ebook) - match = get_matching_library_item(state, pattern) - if match: - return match.filepath, args.dump - else: - sys.exit("ERROR: Found no matching ebook from history.") - - -def main(): - filepath, dump_only = find_file() - if dump_only: - sys.exit(dump_ebook_content(filepath)) - - while True: - filepath = curses.wrapper(preread, filepath) - - -# }}} - - -if __name__ == "__main__": - # On Windows, calling this method is necessary - # On Linux/OSX, this method does nothing - multiprocessing.freeze_support() - main() |