diff options
author | benadha <benawiadha@gmail.com> | 2022-01-02 11:30:31 +0700 |
---|---|---|
committer | benadha <benawiadha@gmail.com> | 2022-01-02 11:30:31 +0700 |
commit | 30f1f5187d1f6a5be8b98524d00f080770ff1a76 (patch) | |
tree | b55b2e2d5e1c703f3d697faf7d260441643995df /epy.py | |
parent | b3354651dbca28a1fe29c717920f360d99ebe516 (diff) | |
download | epy-30f1f5187d1f6a5be8b98524d00f080770ff1a76.tar.gz |
Fixed plethora type issuesv2022.1.2
Diffstat (limited to 'epy.py')
-rwxr-xr-x | epy.py | 473 |
1 files changed, 316 insertions, 157 deletions
@@ -14,7 +14,7 @@ Options: """ -__version__ = "2022.1.1" +__version__ = "2022.1.2" __license__ = "GPL-3.0" __author__ = "Benawi Adha" __email__ = "benawiadha@gmail.com" @@ -39,7 +39,7 @@ import uuid import xml.etree.ElementTree as ET import zipfile -from typing import Optional, Union, Sequence, Tuple, List, Mapping, Set, Type, Any +from typing import Optional, Union, Sequence, Tuple, List, Dict, Mapping, Set, Type, Any from dataclasses import dataclass, field from difflib import SequenceMatcher as SM from enum import Enum @@ -49,13 +49,19 @@ from html.parser import HTMLParser from urllib.parse import unquote try: - import mobi + import mobi # type: ignore MOBI_SUPPORT = True except ModuleNotFoundError: MOBI_SUPPORT = False +try: + DEBUG = int(str(os.getenv("DEBUG"))) == 1 +except ValueError: + DEBUG = False + + # ----------------------- MODELS --------------------------- @@ -116,6 +122,7 @@ class SpeakerMimic(SpeakerBaseModel): stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, ) + assert self.process.stdin self.process.stdin.write(text) self.process.stdin.close() @@ -372,7 +379,51 @@ class Keymap: TableOfContents: Tuple[Key, ...] -class Epub: +class Ebook: + def __init__(self, fileepub: str): + raise NotImplementedError("Ebook.__init__() not implemented") + + @property + def path(self) -> str: + return self._path + + @path.setter + def path(self, value: str) -> None: + self._path = value + + @property + def contents(self) -> Union[Tuple[str, ...], Tuple[ET.Element, ...]]: + return self._contents + + @contents.setter + def contents(self, value: Union[Tuple[str, ...], Tuple[ET.Element, ...]]) -> None: + self._contents = value + + @property + def toc_entries(self) -> Tuple[TocEntry, ...]: + return self._toc_entries + + @toc_entries.setter + def toc_entries(self, value: Tuple[TocEntry, ...]) -> None: + self._toc_entries = value + + def get_meta(self) -> Tuple[Tuple[str, str], ...]: + raise NotImplementedError("Ebook.get_meta() not implemented") + + def initialize(self) -> None: + raise NotImplementedError("Ebook.initialize() not implemented") + + def get_raw_text(self, content: Union[str, ET.Element]) -> str: + raise NotImplementedError("Ebook.get_raw_text() not implemented") + + def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]: + raise NotImplementedError("Ebook.get_img_bytestr() not implemented") + + def cleanup(self) -> None: + raise NotImplementedError("Ebook.cleanup() not implemented") + + +class Epub(Ebook): NS = { "DAISY": "http://www.daisy.org/z3986/2005/ncx/", "OPF": "http://www.idpf.org/2007/opf", @@ -383,18 +434,16 @@ class Epub: def __init__(self, fileepub: str): self.path: str = os.path.abspath(fileepub) - self.file: zipfile.ZipFile = zipfile.ZipFile(fileepub, "r") + self.file: Union[zipfile.ZipFile, str] = zipfile.ZipFile(fileepub, "r") - # populate these attribute + # populate these attributes # by calling self.initialize() - self.version: str self.root_filepath: str self.root_dirpath: str - self.toc_path: str - self.contents: Tuple[str, ...] = tuple() - self.toc_entries: Tuple[TocEntry, ...] = tuple() def get_meta(self) -> Tuple[Tuple[str, str], ...]: + assert isinstance(self.file, zipfile.ZipFile) + meta: List[Tuple[str, str]] = [] # why self.file.read(self.root_filepath) problematic cont = ET.fromstring(self.file.open(self.root_filepath).read()) @@ -404,81 +453,116 @@ class Epub: return tuple(meta) def initialize(self) -> None: - cont = ET.parse(self.file.open("META-INF/container.xml")) - self.root_filepath = cont.find("CONT:rootfiles/CONT:rootfile", self.NS).attrib["full-path"] + assert isinstance(self.file, zipfile.ZipFile) + + container = ET.parse(self.file.open("META-INF/container.xml")) + rootfile_elem = container.find("CONT:rootfiles/CONT:rootfile", self.NS) + assert rootfile_elem is not None + self.root_filepath = rootfile_elem.attrib["full-path"] self.root_dirpath = ( os.path.dirname(self.root_filepath) + "/" if os.path.dirname(self.root_filepath) != "" else "" ) - cont = ET.parse(self.file.open(self.root_filepath)) + # EPUB3 - self.version = cont.getroot().get("version") - if self.version == "2.0": + content_opf = ET.parse(self.file.open(self.root_filepath)) + version = content_opf.getroot().get("version") + if version not in {"2.0", "3.0"}: + raise RuntimeError(f"Unsuppoerted Epub version: {version}") + if version == "2.0": # "OPF:manifest/*[@id='ncx']" - self.toc_path = self.root_dirpath + cont.find( + relative_toc = content_opf.find( "OPF:manifest/*[@media-type='application/x-dtbncx+xml']", self.NS - ).get("href") - elif self.version == "3.0": - self.toc_path = self.root_dirpath + cont.find( - "OPF:manifest/*[@properties='nav']", self.NS - ).get("href") + ) + elif version == "3.0": + relative_toc = content_opf.find("OPF:manifest/*[@properties='nav']", self.NS) + assert relative_toc is not None + relative_toc_path = relative_toc.get("href") + assert relative_toc_path is not None + toc_path = self.root_dirpath + relative_toc_path # cont = ET.parse(self.file.open(self.root_filepath)).getroot() - manifest = [] - for i in cont.findall("OPF:manifest/*", self.NS): + manifests: List[Tuple[str, str]] = [] + for manifest_elem in content_opf.findall("OPF:manifest/*", self.NS): # EPUB3 - # if i.get("id") != "ncx" and i.get("properties") != "nav": - if i.get("media-type") != "application/x-dtbncx+xml" and i.get("properties") != "nav": - manifest.append([i.get("id"), i.get("href")]) + # if manifest_elem.get("id") != "ncx" and manifest_elem.get("properties") != "nav": + if ( + manifest_elem.get("media-type") != "application/x-dtbncx+xml" + and manifest_elem.get("properties") != "nav" + ): + manifest_id = manifest_elem.get("id") + assert manifest_id is not None + manifest_href = manifest_elem.get("href") + assert manifest_href is not None + manifests.append((manifest_id, manifest_href)) book_contents: List[str] = [] - spine: List[str] = [] + spines: List[str] = [] contents: List[str] = [] - for i in cont.findall("OPF:spine/*", self.NS): - spine.append(i.get("idref")) - for i in spine: - for j in manifest: - if i == j[0]: - book_contents.append(self.root_dirpath + unquote(j[1])) - contents.append(unquote(j[1])) - manifest.remove(j) + for spine_elem in content_opf.findall("OPF:spine/*", self.NS): + idref = spine_elem.get("idref") + assert idref is not None + spines.append(idref) + for spine in spines: + for manifest in manifests: + if spine == manifest[0]: + book_contents.append(self.root_dirpath + unquote(manifest[1])) + contents.append(unquote(manifest[1])) + manifests.remove(manifest) # TODO: test is break necessary break self.contents = tuple(book_contents) try: - toc = ET.parse(self.file.open(self.toc_path)).getroot() + toc = ET.parse(self.file.open(toc_path)).getroot() # EPUB3 - if self.version == "2.0": + if version == "2.0": navPoints = toc.findall("DAISY:navMap//DAISY:navPoint", self.NS) - elif self.version == "3.0": + elif version == "3.0": navPoints = toc.findall("XHTML:body//XHTML:nav[@EPUB:type='toc']//XHTML:a", self.NS) toc_entries: List[TocEntry] = [] - for i in navPoints: - if self.version == "2.0": - src = i.find("DAISY:content", self.NS).get("src") - name = i.find("DAISY:navLabel/DAISY:text", self.NS).text - elif self.version == "3.0": - src = i.get("href") - name = "".join(list(i.itertext())) - src = src.split("#") + for navPoint in navPoints: + if version == "2.0": + src_elem = navPoint.find("DAISY:content", self.NS) + assert src_elem is not None + src = src_elem.get("src") + + name_elem = navPoint.find("DAISY:navLabel/DAISY:text", self.NS) + assert name_elem is not None + name = name_elem.text + elif version == "3.0": + src_elem = navPoint + assert src_elem is not None + src = src_elem.get("href") + + name = "".join(list(navPoint.itertext())) + + assert src is not None + src_id = src.split("#") + try: - idx = contents.index(unquote(src[0])) + idx = contents.index(unquote(src_id[0])) except ValueError: continue + assert name is not None toc_entries.append( TocEntry( - label=name, content_index=idx, section=src[1] if len(src) == 2 else None + label=name, + content_index=idx, + section=src_id[1] if len(src_id) == 2 else None, ) ) self.toc_entries = tuple(toc_entries) except AttributeError: pass - def get_raw_text(self, content_path: str) -> str: + def get_raw_text(self, content_path: Union[str, ET.Element]) -> str: + assert isinstance(self.file, zipfile.ZipFile) + assert isinstance(content_path, str) + # using try-except block to catch # zlib.error: Error -3 while decompressing data: invalid distance too far back # caused by forking PROC_COUNTLETTERS @@ -491,10 +575,11 @@ class Epub: return content.decode("utf-8") def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]: + assert isinstance(self.file, zipfile.ZipFile) return impath, self.file.read(impath) def cleanup(self) -> None: - return + pass class Mobi(Epub): @@ -504,12 +589,9 @@ class Mobi(Epub): # populate these attribute # by calling self.initialize() - self.version: str self.root_filepath: str self.root_dirpath: str self.toc_path: str - self.contents: Tuple[str] = () - self.toc_entries: Tuple[TocEntry] = () def get_meta(self) -> Tuple[Tuple[str, str], ...]: meta: List[Tuple[str, str]] = [] @@ -522,30 +604,42 @@ class Mobi(Epub): return tuple(meta) def initialize(self) -> None: + assert isinstance(self.file, str) + self.root_dirpath = os.path.join(self.file, "mobi7") self.toc_path = os.path.join(self.root_dirpath, "toc.ncx") - self.version = "2.0" + version = "2.0" with open(os.path.join(self.root_dirpath, "content.opf")) as f: cont = ET.parse(f).getroot() - manifest = [] - for i in cont.findall("OPF:manifest/*", self.NS): + + manifests: List[Tuple[str, str]] = [] + for manifest_elem in cont.findall("OPF:manifest/*", self.NS): # EPUB3 - # if i.get("id") != "ncx" and i.get("properties") != "nav": - if i.get("media-type") != "application/x-dtbncx+xml" and i.get("properties") != "nav": - manifest.append([i.get("id"), i.get("href")]) + # if manifest_elem.get("id") != "ncx" and manifest_elem.get("properties") != "nav": + if ( + manifest_elem.get("media-type") != "application/x-dtbncx+xml" + and manifest_elem.get("properties") != "nav" + ): + manifest_id = manifest_elem.get("id") + assert manifest_id is not None + manifest_href = manifest_elem.get("href") + assert manifest_href is not None + manifests.append((manifest_id, manifest_href)) book_contents: List[str] = [] - spine: List[str] = [] + spines: List[str] = [] contents: List[str] = [] - for i in cont.findall("OPF:spine/*", self.NS): - spine.append(i.get("idref")) - for i in spine: - for j in manifest: - if i == j[0]: - book_contents.append(os.path.join(self.root_dirpath, unquote(j[1]))) - contents.append(unquote(j[1])) - manifest.remove(j) + for spine_elem in cont.findall("OPF:spine/*", self.NS): + idref = spine_elem.get("idref") + assert idref is not None + spines.append(idref) + for spine in spines: + for manifest in manifests: + if spine == manifest[0]: + book_contents.append(os.path.join(self.root_dirpath, unquote(manifest[1]))) + contents.append(unquote(manifest[1])) + manifests.remove(manifest) # TODO: test is break necessary break self.contents = tuple(book_contents) @@ -553,31 +647,46 @@ class Mobi(Epub): with open(self.toc_path) as f: toc = ET.parse(f).getroot() # EPUB3 - if self.version == "2.0": + if version == "2.0": navPoints = toc.findall("DAISY:navMap//DAISY:navPoint", self.NS) - elif self.version == "3.0": + elif version == "3.0": navPoints = toc.findall("XHTML:body//XHTML:nav[@EPUB:type='toc']//XHTML:a", self.NS) toc_entries: List[TocEntry] = [] - for i in navPoints: - if self.version == "2.0": - src = i.find("DAISY:content", self.NS).get("src") - name = i.find("DAISY:navLabel/DAISY:text", self.NS).text - elif self.version == "3.0": - src = i.get("href") - name = "".join(list(i.itertext())) - src = src.split("#") + for navPoint in navPoints: + if version == "2.0": + src_elem = navPoint.find("DAISY:content", self.NS) + assert src_elem is not None + src = src_elem.get("src") + + name_elem = navPoint.find("DAISY:navLabel/DAISY:text", self.NS) + assert name_elem is not None + name = name_elem.text + elif version == "3.0": + src_elem = navPoint + assert src_elem is not None + src = src_elem.get("href") + + name = "".join(list(navPoint.itertext())) + + assert src is not None + src_id = src.split("#") + try: - idx = contents.index(unquote(src[0])) + idx = contents.index(unquote(src_id[0])) except ValueError: continue + assert name is not None toc_entries.append( - TocEntry(label=name, content_index=idx, section=src[1] if len(src) == 2 else None) + TocEntry( + label=name, content_index=idx, section=src_id[1] if len(src_id) == 2 else None + ) ) self.toc_entries = tuple(toc_entries) - def get_raw_text(self, content_path: str) -> str: + def get_raw_text(self, content_path: Union[str, ET.Element]) -> str: + assert isinstance(content_path, str) # using try-except block to catch # zlib.error: Error -3 while decompressing data: invalid distance too far back # caused by forking PROC_COUNTLETTERS @@ -599,6 +708,7 @@ class Mobi(Epub): return impath, src def cleanup(self) -> None: + assert isinstance(self.file, str) shutil.rmtree(self.file) return @@ -614,7 +724,7 @@ class Azw3(Epub): return -class FictionBook: +class FictionBook(Ebook): NS = {"FB2": "http://www.gribuser.ru/xml/fictionbook/2.0"} def __init__(self, filefb: str): @@ -623,12 +733,11 @@ class FictionBook: # populate these attribute # by calling self.initialize() - self.root: str - self.contents: Tuple[str] = () - self.toc_entries: Tuple[TocEntry] = () + self.root: ET.Element def get_meta(self) -> Tuple[Tuple[str, str], ...]: desc = self.root.find("FB2:description", self.NS) + assert desc is not None alltags = desc.findall("*/*") return tuple((re.sub("{.*?}", "", i.tag), " ".join(i.itertext())) for i in alltags) @@ -648,16 +757,22 @@ class FictionBook: ) self.toc_entries = tuple(toc_entries) - def get_raw_text(self, node) -> str: + def get_raw_text(self, node: Union[str, ET.Element]) -> str: + assert isinstance(node, ET.Element) ET.register_namespace("", "http://www.gribuser.ru/xml/fictionbook/2.0") # sys.exit(ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:","")) return ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:", "") def get_img_bytestr(self, imgid: str) -> Tuple[str, bytes]: + # TODO: test if image works imgid = imgid.replace("#", "") - img = self.root.find("*[@id='{}']".format(imgid)) - imgtype = img.get("content-type").split("/")[1] - return imgid + "." + imgtype, base64.b64decode(img.text) + img_elem = self.root.find("*[@id='{}']".format(imgid)) + assert img_elem is not None + imgtype = img_elem.get("content-type") + img_elem_text = img_elem.text + assert imgtype is not None + assert img_elem_text is not None + return imgid + "." + imgtype.split("/")[1], base64.b64decode(img_elem_text) def cleanup(self) -> None: return @@ -699,7 +814,7 @@ class HTMLtoLines(HTMLParser): self.sectsindex = {} self.initital = [] self.initbold = [] - self.imgs: Mapping[int, str] = dict() + self.imgs: Dict[int, str] = dict() def handle_starttag(self, tag, attrs): if re.match("h[1-6]", tag) is not None: @@ -821,12 +936,16 @@ class HTMLtoLines(HTMLParser): def get_structured_text( self, textwidth: Optional[int] = 0, starting_line: int = 0 ) -> Union[Tuple[str, ...], TextStructure]: + # reusable loop indices + i: Any + j: Any + text: List[str] = [] - images: Mapping[int, str] = dict() # {line_num: path/in/zip} - sect: Mapping[str, int] = dict() # {section_id: line_num} + images: Dict[int, str] = dict() # {line_num: path/in/zip} + sect: Dict[str, int] = dict() # {section_id: line_num} formatting: List[InlineStyle] = [] - tmpital = [] + tmpital: List[List[int]] = [] for i in self.initital: # handle uneven markup # like <i> but no </i> @@ -859,6 +978,7 @@ class HTMLtoLines(HTMLParser): return tuple(self.text) for n, i in enumerate(self.text): + startline = len(text) # findsect = re.search(r"(?<= \(#).*?(?=\) )", i) # if findsect is not None and findsect.group() in self.sects: @@ -1167,13 +1287,13 @@ class State(AppData): def filepath(self) -> str: return os.path.join(self.prefix, "states.db") if self.prefix else os.devnull - def get_from_history(self) -> Tuple[str]: + def get_from_history(self) -> Tuple[str, ...]: try: conn = sqlite3.connect(self.filepath) cur = conn.cursor() cur.execute("SELECT filepath FROM reading_states") results = cur.fetchall() - return tuple(i[0] for i in results) + return tuple(i[0] for i in results) # type: ignore finally: conn.close() @@ -1197,7 +1317,7 @@ class State(AppData): finally: conn.close() - def set_last_read(self, ebook: Union[Epub, Mobi, Azw3, FictionBook]) -> None: + def set_last_read(self, ebook: Ebook) -> None: try: conn = sqlite3.connect(self.filepath) conn.execute("INSERT OR REPLACE INTO last_read VALUES (0, ?)", (ebook.path,)) @@ -1205,7 +1325,7 @@ class State(AppData): finally: conn.close() - def get_last_reading_state(self, ebook: Union[Epub, Mobi, Azw3, FictionBook]) -> ReadingState: + def get_last_reading_state(self, ebook: Ebook) -> ReadingState: try: conn = sqlite3.connect(self.filepath) conn.row_factory = sqlite3.Row @@ -1220,9 +1340,7 @@ class State(AppData): finally: conn.close() - def set_last_reading_state( - self, ebook: Union[Epub, Mobi, Azw3, FictionBook], reading_state: ReadingState - ) -> None: + def set_last_reading_state(self, ebook: Ebook, reading_state: ReadingState) -> None: try: conn = sqlite3.connect(self.filepath) conn.execute( @@ -1236,9 +1354,7 @@ class State(AppData): finally: conn.close() - def insert_bookmark( - self, ebook: Union[Epub, Mobi, Azw3, FictionBook], name: str, reading_state: ReadingState - ) -> None: + def insert_bookmark(self, ebook: Ebook, name: str, reading_state: ReadingState) -> None: try: conn = sqlite3.connect(self.filepath) conn.execute( @@ -1257,7 +1373,7 @@ class State(AppData): finally: conn.close() - def delete_bookmark(self, ebook: Union[Epub, Mobi, Azw3, FictionBook], name: str) -> None: + def delete_bookmark(self, ebook: Ebook, name: str) -> None: try: conn = sqlite3.connect(self.filepath) conn.execute("DELETE FROM bookmarks WHERE filepath=? AND name=?", (ebook.path, name)) @@ -1273,9 +1389,7 @@ class State(AppData): finally: conn.close() - def get_bookmarks( - self, ebook: Union[Epub, Mobi, Azw3, FictionBook] - ) -> List[Tuple[str, ReadingState]]: + def get_bookmarks(self, ebook: Ebook) -> List[Tuple[str, ReadingState]]: try: conn = sqlite3.connect(self.filepath) conn.row_factory = sqlite3.Row @@ -1350,9 +1464,9 @@ class InfiniBoard: def __init__( self, screen, - text: Tuple[str], + text: Tuple[str, ...], textwidth: int = 80, - default_style: Tuple[InlineStyle] = (), + default_style: Tuple[InlineStyle, ...] = tuple(), spread: int = 1, ): self.screen = screen @@ -1362,7 +1476,7 @@ class InfiniBoard: self.text = text self.total_lines = len(text) self.default_style: Tuple[InlineStyle, ...] = default_style - self.temporary_style = () + self.temporary_style: Tuple[InlineStyle, ...] = () self.spread = spread if self.spread == 2: @@ -1416,6 +1530,7 @@ class InfiniBoard: and row + self.screen_rows - bottom_padding + n_row < self.total_lines ): text_line = self.text[row + self.screen_rows - bottom_padding + n_row] + # TODO: clean this up if re.search("\\[IMG:[0-9]+\\]", text_line): self.screen.addstr( n_row, self.x_alt, text_line.center(self.textwidth), curses.A_BOLD @@ -1482,15 +1597,13 @@ class InfiniBoard: def construct_speaker( preferred: Optional[str] = None, args: List[str] = [] -) -> Optional[Type[SpeakerBaseModel]]: +) -> Optional[SpeakerBaseModel]: speakers = ( sorted(SPEAKERS, key=lambda x: int(x.cmd == preferred), reverse=True) if preferred else SPEAKERS ) - speaker: Optional[Type[SpeakerBaseModel]] = next( - (speaker for speaker in speakers if speaker.available), None - ) + speaker = next((speaker for speaker in speakers if speaker.available), None) return speaker(args) if speaker else None @@ -1542,24 +1655,33 @@ def construct_relative_reading_state( :param totlines_per_content: sequence of total lines per book content :return: new ReadingState relative to per content of the book """ + index = 0 cumulative_contents_lines = 0 all_contents_lines = sum(totlines_per_content) - for n, content_lines in enumerate(totlines_per_content): + # for n, content_lines in enumerate(totlines_per_content): + # cumulative_contents_lines += content_lines + # if cumulative_contents_lines > abs_reading_state.row: + # return + while True: + content_lines = totlines_per_content[index] cumulative_contents_lines += content_lines if cumulative_contents_lines > abs_reading_state.row: - return ReadingState( - content_index=n, - textwidth=abs_reading_state.textwidth, - row=abs_reading_state.row - cumulative_contents_lines + content_lines, - rel_pctg=abs_reading_state.rel_pctg - - ((cumulative_contents_lines - content_lines) / all_contents_lines) - if abs_reading_state.rel_pctg - else None, - section=abs_reading_state.section, - ) + break + index += 1 + + return ReadingState( + content_index=index, + textwidth=abs_reading_state.textwidth, + row=abs_reading_state.row - cumulative_contents_lines + content_lines, + rel_pctg=abs_reading_state.rel_pctg + - ((cumulative_contents_lines - content_lines) / all_contents_lines) + if abs_reading_state.rel_pctg + else None, + section=abs_reading_state.section, + ) -def get_ebook_obj(filepath: str) -> Union[Epub, Mobi, Azw3, FictionBook]: +def get_ebook_obj(filepath: str) -> Ebook: file_ext = os.path.splitext(filepath)[1] if file_ext == ".epub": return Epub(filepath) @@ -1645,7 +1767,7 @@ def safe_curs_set(state: int) -> None: return -def dots_path(curr, tofi): +def dots_path(curr, tofi) -> str: candir = curr.split("/") tofi = tofi.split("/") alld = tofi.count("..") @@ -1660,7 +1782,7 @@ def dots_path(curr, tofi): def find_current_content_index( - toc_entries: Tuple[TocEntry], toc_secid: Mapping[str, int], index: int, y: int + toc_entries: Tuple[TocEntry, ...], toc_secid: Mapping[str, int], index: int, y: int ) -> int: ntoc = 0 for n, toc_entry in enumerate(toc_entries): @@ -1670,19 +1792,21 @@ def find_current_content_index( return ntoc -def count_letters(ebook: Union[Epub, Mobi, Azw3, FictionBook]) -> LettersCount: +def count_letters(ebook: Ebook) -> LettersCount: per_content_counts: List[int] = [] cumulative_counts: List[int] = [] + # assert isinstance(ebook.contents, tuple) for i in ebook.contents: content = ebook.get_raw_text(i) src_lines = parse_html(content) + assert isinstance(src_lines, tuple) cumulative_counts.append(sum(per_content_counts)) per_content_counts.append(sum([len(re.sub("\s", "", j)) for j in src_lines])) return LettersCount(all=sum(per_content_counts), cumulative=tuple(cumulative_counts)) -def count_letters_parallel(ebook: Union[Epub, Mobi, Azw3, FictionBook], child_conn) -> None: +def count_letters_parallel(ebook: Ebook, child_conn) -> None: child_conn.send(count_letters(ebook)) child_conn.close() @@ -1920,9 +2044,15 @@ def text_win(textfunc): class Reader: - def __init__( - self, screen, ebook: Union[Epub, Mobi, Azw3, FictionBook], config: Config, state: State - ): + + # Only implement this when seamless=True + adjust_seamless_reading_state = staticmethod( + lambda reading_state: (_ for _ in ()).throw( + NotImplementedError("Reader.adjust_seamless_reading_state() not implemented") + ) + ) + + def __init__(self, screen, ebook: Ebook, config: Config, state: State): self.setting = config.setting self.keymap = config.keymap @@ -1969,7 +2099,10 @@ class Reader: try: self.ebook.initialize() except Exception as e: - sys.exit("ERROR: Badly-structured ebook.\n" + str(e)) + if DEBUG: + raise e + else: + sys.exit("ERROR: Badly-structured ebook.\n" + str(e)) # state self.state = state @@ -1987,10 +2120,10 @@ class Reader: self.spread = 2 if self.setting.StartWithDoubleSpread else 1 # jumps marker container - self.jump_list: Mapping[str, ReadingState] = dict() + self.jump_list: Dict[str, ReadingState] = dict() # TTS speaker utils - self._tts_speaker: Optional[Type[SpeakerBaseModel]] = construct_speaker( + self._tts_speaker: Optional[SpeakerBaseModel] = construct_speaker( self.setting.PreferredTTSEngine, self.setting.TTSEngineArgs ) self._tts_support: bool = bool(self._tts_speaker) @@ -2044,7 +2177,7 @@ class Reader: return self._ext_dict_app @property - def image_viewer(self) -> str: + def image_viewer(self) -> Optional[str]: self._image_viewer: Optional[str] = None if shutil.which(self.setting.DefaultViewer.split()[0]) is not None: @@ -2099,7 +2232,7 @@ class Reader: return title, msg, key @choice_win() - def toc(self, toc_entries: Tuple[TocEntry], index: int): + def toc(self, toc_entries: Tuple[TocEntry, ...], index: int): return ( "Table of Contents", [i.label for i in toc_entries], @@ -2172,6 +2305,12 @@ class Reader: return retk, idx def input_prompt(self, prompt: str) -> Union[NoUpdate, Key, str]: + """ + :param prompt: prompt text + :return: NoUpdate if cancelled or interrupted + Key if curses.KEY_RESIZE triggered + str for successful input + """ # prevent pad hole when prompting for input while # other window is active # pad.refresh(y, 0, 0, x, rows-2, x+width) @@ -2180,7 +2319,7 @@ class Reader: if self.is_color_supported: stat.bkgd(self.screen.getbkgd()) stat.keypad(True) - curses.echo(1) + curses.echo(True) safe_curs_set(2) init_text = "" @@ -2202,13 +2341,13 @@ class Reader: if ipt == Key(27): stat.clear() stat.refresh() - curses.echo(0) + curses.echo(False) safe_curs_set(0) return NoUpdate() elif ipt == Key(10): stat.clear() stat.refresh() - curses.echo(0) + curses.echo(False) safe_curs_set(0) return init_text elif ipt in (Key(8), Key(127), Key(curses.KEY_BACKSPACE)): @@ -2216,7 +2355,7 @@ class Reader: elif ipt == Key(curses.KEY_RESIZE): stat.clear() stat.refresh() - curses.echo(0) + curses.echo(False) safe_curs_set(0) return Key(curses.KEY_RESIZE) # elif len(init_text) <= maxlen: @@ -2236,13 +2375,16 @@ class Reader: except KeyboardInterrupt: stat.clear() stat.refresh() - curses.echo(0) + curses.echo(False) safe_curs_set(0) return NoUpdate() def searching( self, board: InfiniBoard, src: Sequence[str], reading_state: ReadingState, tot ) -> Union[NoUpdate, ReadingState, Key]: + # reusable loop indices + i: Any + j: Any rows, cols = self.screen.getmaxyx() # unnecessary @@ -2257,9 +2399,11 @@ class Reader: if not self.search_data: candidate_text = self.input_prompt(" Regex:") + # if isinstance(candidate_text, str) and candidate_text != "": if isinstance(candidate_text, str) and candidate_text: self.search_data = SearchData(value=candidate_text) else: + assert isinstance(candidate_text, NoUpdate) or isinstance(candidate_text, Key) return candidate_text found = [] @@ -2293,7 +2437,7 @@ class Reader: row=0, ) else: - s = 0 + s: Union[NoUpdate, Key] = NoUpdate() while True: if s in self.keymap.Quit: self.search_data = None @@ -2344,7 +2488,7 @@ class Reader: sidx = n break - s = 0 + s = NoUpdate() msg = ( " Searching: " + self.search_data.value @@ -2374,7 +2518,7 @@ class Reader: row=0, ) else: - s = 0 + s = NoUpdate() msg = " Finished searching: " + self.search_data.value + " " continue else: @@ -2398,7 +2542,7 @@ class Reader: row=0, ) else: - s = 0 + s = NoUpdate() msg = " Finished searching: " + self.search_data.value + " " continue else: @@ -2516,6 +2660,8 @@ class Reader: self._process_counting_letter.close() def read(self, reading_state: ReadingState) -> ReadingState: + # reusable loop indices + i: Any k = self.keymap.RegexSearch[0] if self.search_data else NoUpdate() rows, cols = self.screen.getmaxyx() @@ -2555,17 +2701,20 @@ class Reader: text_lines=tuple(), image_maps=dict(), section_rows=dict(), formatting=tuple() ) toc_entries_tmp: List[TocEntry] = [] - section_rows_tmp: Mapping[str, int] = dict() - totlines_per_content: Sequence[int] = [] # only defined when Seamless==True + section_rows_tmp: Dict[str, int] = dict() + totlines_per_content: Tuple[int, ...] = tuple() # only defined when Seamless==True for n, content in enumerate(contents): starting_line = sum(totlines_per_content) + assert isinstance(content, str) or isinstance(content, ET.Element) text_structure_tmp = parse_html( self.ebook.get_raw_text(content), textwidth=reading_state.textwidth, section_ids=set(toc_entry.section for toc_entry in toc_entries), starting_line=starting_line, ) - totlines_per_content.append(len(text_structure_tmp.text_lines)) + assert isinstance(text_structure_tmp, TextStructure) + # totlines_per_content.append(len(text_structure_tmp.text_lines)) + totlines_per_content += (len(text_structure_tmp.text_lines),) for toc_entry in toc_entries: if toc_entry.content_index == n: @@ -2583,8 +2732,8 @@ class Reader: text_structure = merge_text_structures(text_structure, text_structure_tmp) # adjustment - contents = [contents[0]] - toc_entries = toc_entries_tmp + contents = (contents[0],) + toc_entries = tuple(toc_entries_tmp) text_structure = dataclasses.replace( text_structure, section_rows={**text_structure.section_rows, **section_rows_tmp} ) @@ -2610,7 +2759,7 @@ class Reader: else: content_path = contents[reading_state.content_index] content = self.ebook.get_raw_text(content_path) - text_structure = parse_html( + text_structure = parse_html( # type: ignore content, textwidth=reading_state.textwidth, section_ids=set(toc_entry.section for toc_entry in toc_entries), @@ -2735,6 +2884,7 @@ class Reader: self.ebook.get_raw_text(contents[reading_state.content_index - 1]), textwidth=reading_state.textwidth, ) + assert isinstance(text_structure_content_before, TextStructure) return ReadingState( content_index=reading_state.content_index - 1, textwidth=reading_state.textwidth, @@ -3156,16 +3306,19 @@ class Reader: elif k in self.keymap.MarkPosition: jumnum = board.getch() - if jumnum in tuple(Key(i) for i in range(48, 58)): + if isinstance(jumnum, Key) and jumnum in tuple( + Key(i) for i in range(48, 58) + ): self.jump_list[jumnum.char] = reading_state else: - k = jumnum + k = NoUpdate() continue elif k in self.keymap.JumpToPosition: jumnum = board.getch() if ( - jumnum in tuple(Key(i) for i in range(48, 58)) + isinstance(jumnum, Key) + and jumnum in tuple(Key(i) for i in range(48, 58)) and jumnum.char in self.jump_list ): marked_reading_state = self.jump_list[jumnum.char] @@ -3336,6 +3489,10 @@ def parse_cli_args() -> str: Try parsing cli args and return filepath of ebook to read or quitting based on args and app state """ + # reusable loop indices + i: Any + j: Any + termc, termr = shutil.get_terminal_size() args = [] @@ -3425,7 +3582,8 @@ def parse_cli_args() -> str: dig = len(str(len(reading_history) + 1)) tcols = termc - dig - 2 for n, i in enumerate(reading_history): - p = i.replace(os.getenv("HOME"), "~") if os.getenv("HOME") else i + homedir = os.getenv("HOME") + p = i.replace(homedir, "~") if homedir else i print( "{}{} {}".format( str(n + 1).rjust(dig), @@ -3450,6 +3608,7 @@ def parse_cli_args() -> str: for i in ebook.contents: content = ebook.get_raw_text(i) src_lines = parse_html(content) + assert isinstance(src_lines, tuple) # sys.stdout.reconfigure(encoding="utf-8") # Python>=3.7 for j in src_lines: sys.stdout.buffer.write((j + "\n\n").encode("utf-8")) |