#!/usr/bin/env python3
# vim:tabstop=4:shiftwidth=4:softtabstop=4:smarttab:expandtab:foldmethod=marker
"""\
usage: epy [-h] [-r] [-d] [-v] [PATH | # | PATTERN | URL]
Read ebook in terminal
positional arguments:
[ PATH | # | PATTERN | URL ]
ebook path, history number, pattern or URL
optional arguments:
-h, --help show this help message and exit
-r, --history print reading history
-d, --dump dump the content of ebook
-v, --version print version and exit
examples:
epy /path/to/ebook read /path/to/ebook file
epy 3 read #3 file from reading history
epy count monte read file matching 'count monte'
from reading history
"""
__version__ = "2022.2.5"
__license__ = "GPL-3.0"
__author__ = "Benawi Adha"
__email__ = "benawiadha@gmail.com"
__url__ = "https://github.com/wustho/epy"
# Imports {{{
import argparse
import base64
import contextlib
import curses
import dataclasses
import hashlib
import json
import multiprocessing
import os
import re
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import textwrap
import uuid
import xml.etree.ElementTree as ET
import zipfile
from typing import Optional, Union, Sequence, Tuple, List, Dict, Mapping, Set, Type, Any
from dataclasses import dataclass, field
from datetime import datetime
from difflib import SequenceMatcher as SM
from enum import Enum
from functools import wraps
from html import unescape
from html.parser import HTMLParser
from pathlib import PurePosixPath
from urllib.error import HTTPError, URLError
from urllib.parse import unquote, urljoin, urlparse
from urllib.request import Request, urlopen
try:
from epy_extras import unpackBook # type: ignore
MOBI_SUPPORT = True
except ModuleNotFoundError:
MOBI_SUPPORT = False
# }}}
# Debug Utils {{{
try:
    # Debug switch
    # $ DEBUG=1 ./epy.py
    # NOTE: when the DEBUG environment variable is unset, os.getenv() returns
    # None, str(None) is "None" and int("None") raises ValueError. Control
    # then jumps to the except clause below, so STDSCR and debug() are only
    # defined when DEBUG holds something int() can parse.
    DEBUG = int(str(os.getenv("DEBUG"))) == 1
    STDSCR = None

    def debug(context: int = 5) -> None:
        """
        Tear down curses (if a screen is registered in STDSCR) and drop
        into a debugger.

        `context` is forwarded to ipdb.set_trace(); when ipdb is not
        installed the builtin breakpoint() is used instead.
        """
        # if not isinstance(STDSCR, curses.window):
        #     raise RuntimeError("STDSCR not set")
        if STDSCR:
            # undo the curses terminal modes so the debugger prompt is usable
            curses.nocbreak()
            STDSCR.keypad(False)  # type: ignore
            curses.echo()
            curses.endwin()
        try:
            import ipdb  # type: ignore

            ipdb.set_trace(context=context)
        except ModuleNotFoundError:
            breakpoint()

except ValueError:
    DEBUG = False
# }}}
# Data Models {{{
# Preset image viewer command names.
# Add image viewers here, sorted by most widely used first.
VIEWER_PRESET_LIST = (
    "feh",
    "imv",
    "gio",
    "gnome-open",
    "gvfs-open",
    "xdg-open",
    "kde-open",
    "firefox",
)
class Direction(Enum):
    """Forward/backward direction; used as the type of SearchData.direction."""

    FORWARD = "forward"
    BACKWARD = "backward"
class DoubleSpreadPadding(Enum):
    """
    Padding sizes for the double spread layout.

    NOTE: LEFT and RIGHT share the value 10, so per Enum semantics RIGHT is
    an alias of LEFT (DoubleSpreadPadding.RIGHT is DoubleSpreadPadding.LEFT
    and iteration yields only LEFT and MIDDLE). Reading `.value` works the
    same for both names.
    """

    LEFT = 10
    MIDDLE = 7
    RIGHT = 10
@dataclass(frozen=True)
class BookMetadata:
    """
    Immutable book metadata; every field is optional and defaults to None.

    The field names double as Dublin Core element names:
    Epub._get_metadata() looks up `DC:<field name>` for every field
    declared here, so renaming or adding a field changes that lookup.
    """

    title: Optional[str] = None
    creator: Optional[str] = None
    description: Optional[str] = None
    publisher: Optional[str] = None
    date: Optional[str] = None
    language: Optional[str] = None
    format: Optional[str] = None
    identifier: Optional[str] = None
    source: Optional[str] = None
@dataclass(frozen=True)
class LibraryItem:
    """
    One entry of the reading history.

    str() renders it as "<progress> <last read>: <book name>", where the
    progress column is right-justified to four characters.
    """

    last_read: datetime
    filepath: str
    title: Optional[str] = None
    author: Optional[str] = None
    reading_progress: Optional[float] = None

    def __str__(self) -> str:
        # progress column: "N/A" when unknown, otherwise a truncated percentage
        progress = (
            "N/A" if self.reading_progress is None else f"{int(self.reading_progress * 100)}%"
        ).rjust(4)

        # abbreviate the first occurrence of the home directory to "~"
        shown_path = self.filepath.replace(os.path.expanduser("~"), "~", 1)

        label: str
        if self.title is None:
            # no title: fall back to the path, suffixed by a non-empty author
            label = f"{shown_path} - {self.author}" if self.author else shown_path
        elif self.author is None:
            label = shown_path
        else:
            label = f"{self.title} - {self.author} ({shown_path})"

        timestamp = self.last_read.strftime("%I:%M%p %b %d")
        return f"{progress} {timestamp}: {label}"
@dataclass(frozen=True)
class ReadingState:
    """
    Data model for reading state.

    `row` has to be explicitly assigned with value
    because Seamless feature needs it to adjust from
    relative (to book's content index) row to absolute
    (to book's entire content) row.

    `rel_pctg` and `section` default to None and if
    either of them is assigned with value, then it
    will be overriding the `row` value.
    """

    # required fields (no defaults)
    content_index: int
    textwidth: int
    row: int
    # optional overrides for `row`, see the docstring above
    rel_pctg: Optional[float] = None
    section: Optional[str] = None
@dataclass(frozen=True)
class SearchData:
    """Immutable search parameters: a direction plus the search value."""

    direction: Direction = Direction.FORWARD  # defaults to a forward search
    value: str = ""  # defaults to the empty string
@dataclass(frozen=True)
class LettersCount:
    """
    all: total letters in book
    cumulative: list of total letters for previous contents
                eg. let's say cumulative = (0, 50, 89, ...) it means
                    0  is total cumulative letters of book contents[-1] to contents[0]
                    50 is total cumulative letters of book contents[0] to contents[1]
                    89 is total cumulative letters of book contents[0] to contents[2]
    """

    all: int
    cumulative: Tuple[int, ...]
@dataclass(frozen=True)
class CharPos:
    """
    Describes character position in text.

    eg. ["Lorem ipsum dolor sit amet,",     # row=0
         "consectetur adipiscing elit."]    # row=1

    CharPos(row=1, col=3) points at the "s" of "consectetur":
    both row and col are counted from 0.
    """

    row: int
    col: int
@dataclass(frozen=True)
class TextMark:
    """
    A marked stretch of text delimited by two character positions.

    eg. Interval [CharPos(row=0, col=3), CharPos(row=1, col=4)]
    Both ends are written with inclusive brackets [] rather than the
    right-exclusive [) form.
    """

    start: CharPos
    end: Optional[CharPos] = None

    def is_valid(self) -> bool:
        """
        Tell whether the mark is terminated and correctly ordered.

        A mark without an `end` (an opening tag that was never closed) is
        invalid, and so is a mark whose end lies before its start.
        """
        closing = self.end
        if closing is None:
            return False
        if self.start.row != closing.row:
            return self.start.row < closing.row
        # same row: the end column may coincide with the start column
        return self.start.col <= closing.col
@dataclass(frozen=True)
class TextSpan:
    """
    Like TextMark but using span of letters (n_letters)
    instead of an end position.
    """

    start: CharPos  # position of the first letter of the span
    n_letters: int  # number of letters covered, counted from `start`
@dataclass(frozen=True)
class InlineStyle:
    """
    A text attribute applied to `n_letters` letters starting at (row, col).

    eg. InlineStyle(attr=curses.A_BOLD, row=3, col=4, n_letters=3)
    """

    row: int
    col: int
    n_letters: int
    attr: int  # text attribute value, e.g. curses.A_BOLD as in the example above
@dataclass(frozen=True)
class TocEntry:
    """One table-of-contents entry."""

    label: str  # text shown for the entry
    content_index: int  # index into the book's contents sequence
    section: Optional[str]  # id/fragment within that content, None when absent
@dataclass(frozen=True)
class TextStructure:
    """
    Object that describes how the text
    should be displayed in screen.

    text_lines: ("list of lines", "of text", ...)
    image_maps: {line_num: path/to/image/in/ebook/zip}
    section_rows: {section_id: line_num}
    formatting: (InlineStyle, ...)
    """

    text_lines: Tuple[str, ...]
    image_maps: Mapping[int, str]
    section_rows: Mapping[str, int]
    formatting: Tuple[InlineStyle, ...]
@dataclass(frozen=True)
class NoUpdate:
    """
    Empty frozen dataclass with no fields.

    NOTE(review): presumably a sentinel meaning "nothing changed";
    confirm against the callers, which are not in this part of the file.
    """

    pass
class Key:
    """
    Because ord("k") chr(34) are confusing

    Wraps a key given either as a one-character string or as its integer
    code and exposes both forms:

    value: the integer code
    char: the character

    Keys compare equal (and hash equal) when their integer codes match;
    a Key never equals a non-Key object.
    """

    def __init__(self, char_or_int: Union[str, int]):
        self.value: int = char_or_int if isinstance(char_or_int, int) else ord(char_or_int)
        self.char: str = char_or_int if isinstance(char_or_int, str) else chr(char_or_int)

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Key):
            return self.value == other.value
        return False

    def __ne__(self, other: Any) -> bool:
        # must be the negation of __eq__; returning __eq__ itself made
        # `a != b` evaluate to True exactly when the keys were equal
        return not self.__eq__(other)

    def __hash__(self) -> int:
        return hash(self.value)
@dataclass(frozen=True)
class Settings:
    """Immutable user settings together with their default values."""

    DefaultViewer: str = "auto"
    DictionaryClient: str = "auto"
    ShowProgressIndicator: bool = True
    PageScrollAnimation: bool = True
    MouseSupport: bool = False
    StartWithDoubleSpread: bool = False
    # -1 is default terminal fg/bg colors
    DefaultColorFG: int = -1
    DefaultColorBG: int = -1
    DarkColorFG: int = 252
    DarkColorBG: int = 235
    LightColorFG: int = 238
    LightColorBG: int = 253
    SeamlessBetweenChapters: bool = False
    PreferredTTSEngine: Optional[str] = None
    # default_factory gives every instance its own list object
    TTSEngineArgs: List[str] = field(default_factory=list)
@dataclass(frozen=True)
class CfgDefaultKeymaps:
    """
    Default key binding for each action, given as a string.

    The field names mirror the field names of Keymap.
    """

    ScrollUp: str = "k"
    ScrollDown: str = "j"
    PageUp: str = "h"
    PageDown: str = "l"
    # HalfScreenUp: str = "h"
    # HalfScreenDown: str
    NextChapter: str = "L"
    PrevChapter: str = "H"
    BeginningOfCh: str = "g"
    EndOfCh: str = "G"
    Shrink: str = "-"
    Enlarge: str = "+"
    SetWidth: str = "="
    Metadata: str = "M"
    DefineWord: str = "d"
    TableOfContents: str = "t"
    Follow: str = "f"
    OpenImage: str = "o"
    RegexSearch: str = "/"
    ShowHideProgress: str = "s"
    MarkPosition: str = "m"
    JumpToPosition: str = "`"
    AddBookmark: str = "b"
    ShowBookmarks: str = "B"
    Quit: str = "q"
    Help: str = "?"
    SwitchColor: str = "c"
    TTSToggle: str = "!"
    DoubleSpreadToggle: str = "D"
    Library: str = "R"
@dataclass(frozen=True)
class CfgBuiltinKeymaps:
    """
    Built-in key bindings given as integer key codes (curses KEY_*
    constants or character ordinals), for a subset of the Keymap actions.
    """

    ScrollUp: Tuple[int, ...] = (curses.KEY_UP,)
    ScrollDown: Tuple[int, ...] = (curses.KEY_DOWN,)
    PageUp: Tuple[int, ...] = (curses.KEY_PPAGE, curses.KEY_LEFT)
    PageDown: Tuple[int, ...] = (curses.KEY_NPAGE, ord(" "), curses.KEY_RIGHT)
    BeginningOfCh: Tuple[int, ...] = (curses.KEY_HOME,)
    EndOfCh: Tuple[int, ...] = (curses.KEY_END,)
    # NOTE: ord("\t") == 9, so both entries are the same code (Tab)
    TableOfContents: Tuple[int, ...] = (9, ord("\t"))
    # 10 is the line feed character ("\n")
    Follow: Tuple[int, ...] = (10,)
    # 3 is ETX (what Ctrl-C sends) and 27 is ESC.
    # NOTE(review): the meaning of 304 is not evident from this file - confirm
    Quit: Tuple[int, ...] = (3, 27, 304)
@dataclass(frozen=True)
class Keymap:
    """
    Key bindings per action: each field holds a tuple of Key objects.

    All fields are required (no defaults).
    """

    # HalfScreenDown: Tuple[Key, ...]
    # HalfScreenUp: Tuple[Key, ...]
    AddBookmark: Tuple[Key, ...]
    BeginningOfCh: Tuple[Key, ...]
    DefineWord: Tuple[Key, ...]
    DoubleSpreadToggle: Tuple[Key, ...]
    EndOfCh: Tuple[Key, ...]
    Enlarge: Tuple[Key, ...]
    Follow: Tuple[Key, ...]
    Help: Tuple[Key, ...]
    JumpToPosition: Tuple[Key, ...]
    Library: Tuple[Key, ...]
    MarkPosition: Tuple[Key, ...]
    Metadata: Tuple[Key, ...]
    NextChapter: Tuple[Key, ...]
    OpenImage: Tuple[Key, ...]
    PageDown: Tuple[Key, ...]
    PageUp: Tuple[Key, ...]
    PrevChapter: Tuple[Key, ...]
    Quit: Tuple[Key, ...]
    RegexSearch: Tuple[Key, ...]
    ScrollDown: Tuple[Key, ...]
    ScrollUp: Tuple[Key, ...]
    SetWidth: Tuple[Key, ...]
    ShowBookmarks: Tuple[Key, ...]
    ShowHideProgress: Tuple[Key, ...]
    Shrink: Tuple[Key, ...]
    SwitchColor: Tuple[Key, ...]
    TTSToggle: Tuple[Key, ...]
    TableOfContents: Tuple[Key, ...]
# }}}
# Speaker / TTS Engine Wrappers {{{
class SpeakerBaseModel:
    """
    Base class of the TTS engine wrappers.

    cmd: name of the engine binary
    available: whether the engine can be used on this machine

    Subclasses override speak(), is_done(), stop() and cleanup();
    the versions defined here only raise NotImplementedError.
    """

    cmd: str = "tts_engine_binary"
    available: bool = False

    def __init__(self, args: Optional[List[str]] = None):
        """
        args: extra command line arguments for the engine binary.
              Omitted or None means "no extra arguments".
        """
        # A `[]` default would be one list object shared by every instance
        # created without arguments; build a fresh list per instance instead.
        self.args: List[str] = [] if args is None else args

    def speak(self, text: str) -> None:
        raise NotImplementedError("Speaker.speak() not implemented")

    def is_done(self) -> bool:
        raise NotImplementedError("Speaker.is_done() not implemented")

    def stop(self) -> None:
        raise NotImplementedError("Speaker.stop() not implemented")

    def cleanup(self) -> None:
        raise NotImplementedError("Speaker.cleanup() not implemented")
class SpeakerMimic(SpeakerBaseModel):
    """Speaker wrapper that feeds the text to the `mimic` binary over stdin."""

    cmd = "mimic"
    available = bool(shutil.which("mimic"))

    def speak(self, text: str) -> None:
        """Start mimic with the configured args and write `text` to its stdin."""
        command = [self.cmd, *self.args]
        self.process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
            text=True,
        )
        pipe = self.process.stdin
        assert pipe
        pipe.write(text)
        # closing stdin sends EOF, telling mimic the input is complete
        pipe.close()

    def is_done(self) -> bool:
        """True once the mimic process has exited."""
        exit_status = self.process.poll()
        return exit_status is not None

    def stop(self) -> None:
        """Terminate the running mimic process."""
        self.process.terminate()
        # self.process.kill()

    def cleanup(self) -> None:
        """Nothing to clean up for this engine."""
        pass
class SpeakerPico(SpeakerBaseModel):
    """
    Speaker wrapper around `pico2wave`: the text is synthesized into a
    temporary .wav file which is then played with `play`.
    """

    cmd = "pico2wave"
    available = all([shutil.which(dep) for dep in ["pico2wave", "play"]])

    def speak(self, text: str) -> None:
        """Synthesize `text` into a temporary wav file and start playing it."""
        # mkstemp() returns an already open OS-level descriptor together with
        # the path. Only the path is used (pico2wave and play open the file
        # themselves), so close the descriptor right away instead of leaking
        # one per call.
        fd, self.tmp_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)

        try:
            subprocess.run(
                [self.cmd, *self.args, "-w", self.tmp_path, text],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # a failure whose output mentions "invalid pointer" is tolerated
            # and playback is still attempted; any other failure ends the
            # program with pico2wave's output as the exit message
            if "invalid pointer" not in e.output:
                sys.exit(e.output)

        self.process = subprocess.Popen(
            ["play", self.tmp_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def is_done(self) -> bool:
        """True once the `play` process has exited."""
        return self.process.poll() is not None

    def stop(self) -> None:
        """Terminate the running `play` process."""
        self.process.terminate()
        # self.process.kill()

    def cleanup(self) -> None:
        """Delete the temporary wav file created by the last speak() call."""
        os.remove(self.tmp_path)
# Register speaker wrappers here: the list holds the wrapper classes
# themselves (not instances).
SPEAKERS: List[Type[SpeakerBaseModel]] = [SpeakerMimic, SpeakerPico]
# }}}
# Ebooks {{{
class Ebook:
    """
    Base interface of the ebook backends.

    Provides the `path`, `contents` and `toc_entries` properties (plain
    accessors over the matching underscore-prefixed attributes). Every
    other method, including __init__, raises NotImplementedError here and
    is meant to be overridden by the concrete backends.
    """

    def __init__(self, fileepub: str):
        raise NotImplementedError("Ebook.__init__() not implemented")

    @property
    def path(self) -> str:
        return self._path

    @path.setter
    def path(self, value: str) -> None:
        self._path = value

    @property
    def contents(self) -> Union[Tuple[str, ...], Tuple[ET.Element, ...]]:
        # either a tuple of strings or a tuple of XML elements,
        # depending on the backend
        return self._contents

    @contents.setter
    def contents(self, value: Union[Tuple[str, ...], Tuple[ET.Element, ...]]) -> None:
        self._contents = value

    @property
    def toc_entries(self) -> Tuple[TocEntry, ...]:
        return self._toc_entries

    @toc_entries.setter
    def toc_entries(self, value: Tuple[TocEntry, ...]) -> None:
        self._toc_entries = value

    def get_meta(self) -> BookMetadata:
        raise NotImplementedError("Ebook.get_meta() not implemented")

    def initialize(self) -> None:
        raise NotImplementedError("Ebook.initialize() not implemented")

    def get_raw_text(self, content: Union[str, ET.Element]) -> str:
        raise NotImplementedError("Ebook.get_raw_text() not implemented")

    def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]:
        raise NotImplementedError("Ebook.get_img_bytestr() not implemented")

    def cleanup(self) -> None:
        raise NotImplementedError("Ebook.cleanup() not implemented")
class Epub(Ebook):
    """
    Ebook backend reading an EPUB archive directly from its zip file.

    Call initialize() after construction: it locates the package (OPF)
    file and fills `root_filepath`, `root_dirpath`, `contents` and
    `toc_entries`.
    """

    NAMESPACE = {
        "DAISY": "http://www.daisy.org/z3986/2005/ncx/",
        "OPF": "http://www.idpf.org/2007/opf",
        "CONT": "urn:oasis:names:tc:opendocument:xmlns:container",
        "XHTML": "http://www.w3.org/1999/xhtml",
        "EPUB": "http://www.idpf.org/2007/ops",
        # Dublin Core
        "DC": "http://purl.org/dc/elements/1.1/",
    }

    def __init__(self, fileepub: str):
        self.path: str = os.path.abspath(fileepub)
        self.file: Union[zipfile.ZipFile, str] = zipfile.ZipFile(fileepub, "r")

        # populate these attributes
        # by calling self.initialize()
        self.root_filepath: str
        self.root_dirpath: str

    def get_meta(self) -> BookMetadata:
        """Parse the package file and return its Dublin Core metadata."""
        assert isinstance(self.file, zipfile.ZipFile)
        # why self.file.read(self.root_filepath) problematic
        # content_opf = ET.fromstring(self.file.open(self.root_filepath).read())
        with self.file.open(self.root_filepath) as opf_file:
            content_opf = ET.parse(opf_file)
        return Epub._get_metadata(content_opf)

    @staticmethod
    def _get_metadata(content_opf: ET.ElementTree) -> BookMetadata:
        """Collect the text of the first `DC:<name>` element for each BookMetadata field."""
        metadata: Dict[str, Optional[str]] = {}
        for meta_field in dataclasses.fields(BookMetadata):
            element = content_opf.find(f".//DC:{meta_field.name}", Epub.NAMESPACE)
            if element is not None:
                metadata[meta_field.name] = element.text
        return BookMetadata(**metadata)

    @staticmethod
    def _get_contents(content_opf: ET.ElementTree) -> Tuple[str, ...]:
        """
        Return the unquoted hrefs of the manifest items referenced by the
        spine, in spine order. Hrefs are relative to the package file.
        """
        # cont = ET.parse(self.file.open(self.root_filepath)).getroot()
        manifests: List[Tuple[str, str]] = []
        for manifest_elem in content_opf.findall("OPF:manifest/*", Epub.NAMESPACE):
            # EPUB3
            # if manifest_elem.get("id") != "ncx" and manifest_elem.get("properties") != "nav":
            # the NCX and the nav document are not reading content
            if (
                manifest_elem.get("media-type") != "application/x-dtbncx+xml"
                and manifest_elem.get("properties") != "nav"
            ):
                manifest_id = manifest_elem.get("id")
                assert manifest_id is not None
                manifest_href = manifest_elem.get("href")
                assert manifest_href is not None
                manifests.append((manifest_id, manifest_href))

        spines: List[str] = []
        contents: List[str] = []
        for spine_elem in content_opf.findall("OPF:spine/*", Epub.NAMESPACE):
            idref = spine_elem.get("idref")
            assert idref is not None
            spines.append(idref)

        for spine in spines:
            for manifest in manifests:
                if spine == manifest[0]:
                    # book_contents.append(root_dirpath + unquote(manifest[1]))
                    contents.append(unquote(manifest[1]))
                    # each manifest item is consumed by at most one spine entry
                    manifests.remove(manifest)
                    # stop scanning: `manifests` was just mutated
                    break

        return tuple(contents)

    @staticmethod
    def _get_tocs(toc: ET.Element, version: str, contents: Sequence[str]) -> Tuple[TocEntry, ...]:
        """
        Build the table of contents from an NCX (version "1.0"/"2.0") or
        an EPUB3 nav document (version "3.0").

        contents: content hrefs as returned by _get_contents(); entries
                  whose target is not among them are skipped.

        Any other version yields an empty table of contents. An
        AttributeError raised while reading the entries is swallowed
        (re-raised when DEBUG is on) and the entries gathered so far are
        returned.
        """
        # bound before the try so that the final return works even when the
        # very first lookup fails
        toc_entries: List[TocEntry] = []
        try:
            # EPUB3
            if version in {"1.0", "2.0"}:
                navPoints = toc.findall("DAISY:navMap//DAISY:navPoint", Epub.NAMESPACE)
            elif version == "3.0":
                navPoints = toc.findall(
                    "XHTML:body//XHTML:nav[@EPUB:type='toc']//XHTML:a", Epub.NAMESPACE
                )
            else:
                # unknown version: nothing to read
                navPoints = []

            for navPoint in navPoints:
                if version in {"1.0", "2.0"}:
                    src_elem = navPoint.find("DAISY:content", Epub.NAMESPACE)
                    assert src_elem is not None
                    src = src_elem.get("src")

                    name_elem = navPoint.find("DAISY:navLabel/DAISY:text", Epub.NAMESPACE)
                    assert name_elem is not None
                    name = name_elem.text
                elif version == "3.0":
                    src_elem = navPoint
                    assert src_elem is not None
                    src = src_elem.get("href")

                    name = "".join(list(navPoint.itertext()))

                assert src is not None
                # "path#fragment" -> ["path", "fragment"]
                src_id = src.split("#")

                try:
                    idx = contents.index(unquote(src_id[0]))
                except ValueError:
                    continue

                # assert name is not None
                # NOTE: skip empty label
                if name is not None:
                    toc_entries.append(
                        TocEntry(
                            label=name,
                            content_index=idx,
                            section=src_id[1] if len(src_id) == 2 else None,
                        )
                    )
        except AttributeError:
            if DEBUG:
                raise

        return tuple(toc_entries)

    def initialize(self) -> None:
        """
        Locate the package file through META-INF/container.xml, then load
        the reading order and the table of contents.

        Raises RuntimeError when the package version is not 1.0, 2.0 or 3.0.
        """
        assert isinstance(self.file, zipfile.ZipFile)

        with self.file.open("META-INF/container.xml") as container_file:
            container = ET.parse(container_file)
        rootfile_elem = container.find("CONT:rootfiles/CONT:rootfile", Epub.NAMESPACE)
        assert rootfile_elem is not None
        self.root_filepath = rootfile_elem.attrib["full-path"]
        # zip member names always use "/" as separator
        root_dir = os.path.dirname(self.root_filepath)
        self.root_dirpath = root_dir + "/" if root_dir != "" else ""

        with self.file.open(self.root_filepath) as opf_file:
            content_opf = ET.parse(opf_file)
        version = content_opf.getroot().get("version")

        contents = Epub._get_contents(content_opf)
        self.contents = tuple(self.root_dirpath + content for content in contents)

        if version in {"1.0", "2.0"}:
            # "OPF:manifest/*[@id='ncx']"
            relative_toc = content_opf.find(
                "OPF:manifest/*[@media-type='application/x-dtbncx+xml']", Epub.NAMESPACE
            )
        elif version == "3.0":
            relative_toc = content_opf.find("OPF:manifest/*[@properties='nav']", Epub.NAMESPACE)
        else:
            raise RuntimeError(f"Unsupported Epub version: {version}")
        assert relative_toc is not None
        relative_toc_path = relative_toc.get("href")
        assert relative_toc_path is not None
        toc_path = self.root_dirpath + relative_toc_path
        with self.file.open(toc_path) as toc_file:
            toc = ET.parse(toc_file).getroot()
        # `contents` (package-relative), not self.contents (archive paths)
        self.toc_entries = Epub._get_tocs(toc, version, contents)

    def get_raw_text(self, content_path: Union[str, ET.Element]) -> str:
        """
        Return the UTF-8 decoded text of the archive member `content_path`.

        Reading is retried because of sporadic failures such as
        zlib.error: Error -3 while decompressing data: invalid distance too far back
        (seems like caused by multiprocessing). The retries are bounded,
        and a missing member is reported immediately, so a permanent error
        surfaces as an exception instead of looping forever.
        """
        assert isinstance(self.file, zipfile.ZipFile)
        assert isinstance(content_path, str)

        max_tries = 100
        tries = 0
        while True:
            try:
                with self.file.open(content_path) as member:
                    content = member.read()
                break
            except KeyError:
                # no such member in the archive: retrying cannot succeed
                raise
            except Exception:
                tries += 1
                if tries >= max_tries:
                    raise

        return content.decode("utf-8")

    def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]:
        """Return `impath` together with the raw bytes of that archive member."""
        assert isinstance(self.file, zipfile.ZipFile)
        return impath, self.file.read(impath)

    def cleanup(self) -> None:
        pass
class Mobi(Epub):
    """
    Ebook backend for MOBI files.

    initialize() unpacks the book with unpackBook() into a temporary
    directory (`self.file`); content is then read from the "mobi7"
    subdirectory. cleanup() removes the temporary directory.
    """

    def __init__(self, filemobi: str):
        self.path = os.path.abspath(filemobi)
        self.file = tempfile.mkdtemp(prefix="epy-")

        # populate these attribute
        # by calling self.initialize()
        self.root_filepath: str
        self.root_dirpath: str

    def get_meta(self) -> BookMetadata:
        """Parse the unpacked content.opf and return its metadata."""
        # Hand the path to the XML parser instead of a text-mode file object:
        # the parser then reads bytes and honours the encoding declared in
        # the document rather than the locale's default encoding.
        content_opf = ET.parse(os.path.join(self.root_dirpath, "content.opf"))
        return Epub._get_metadata(content_opf)

    def initialize(self) -> None:
        """Unpack the book, then load the reading order and table of contents."""
        assert isinstance(self.file, str)

        # silence whatever the unpacker prints to stdout
        with contextlib.redirect_stdout(None):
            unpackBook(self.path, self.file, epubver="A", use_hd=True)
            # TODO: add cleanup here

        self.root_dirpath = os.path.join(self.file, "mobi7")
        self.toc_path = os.path.join(self.root_dirpath, "toc.ncx")
        # the unpacked toc.ncx is read with the NCX (EPUB 2) rules
        version = "2.0"

        # parsed by path (binary) so the XML encoding declaration is honoured
        content_opf = ET.parse(os.path.join(self.root_dirpath, "content.opf"))

        contents = Epub._get_contents(content_opf)
        self.contents = tuple(os.path.join(self.root_dirpath, content) for content in contents)

        toc = ET.parse(self.toc_path).getroot()
        # `contents` (package-relative), not self.contents (absolute paths)
        self.toc_entries = Epub._get_tocs(toc, version, contents)

    def get_raw_text(self, content_path: Union[str, ET.Element]) -> str:
        """Return the UTF-8 text of the unpacked file at `content_path`."""
        assert isinstance(content_path, str)
        with open(content_path, encoding="utf8") as f:
            content = f.read()
        return content

    def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]:
        """Return `impath` with the bytes of that image, resolved against the unpack root."""
        # TODO: test on windows
        # if impath "Images/asdf.png" is problematic
        image_abspath = os.path.join(self.root_dirpath, impath)
        image_abspath = os.path.normpath(image_abspath)  # handle crossplatform path
        with open(image_abspath, "rb") as f:
            src = f.read()
        return impath, src

    def cleanup(self) -> None:
        """Remove the temporary unpack directory."""
        assert isinstance(self.file, str)
        shutil.rmtree(self.file)
        return
class Azw(Epub):
    """
    Ebook backend for AZW files: the book is unpacked with unpackBook()
    into a temporary directory and the resulting "mobi8/<name>.epub" is
    then read through the regular Epub code path.
    """

    def __init__(self, fileepub):
        self.path = os.path.abspath(fileepub)
        self.tmpdir = tempfile.mkdtemp(prefix="epy-")
        # the epub is named after the source file, minus its extension
        stem = os.path.splitext(os.path.basename(self.path))[0]
        self.tmpepub = os.path.join(self.tmpdir, "mobi8", stem + ".epub")

    def initialize(self):
        """Unpack the book, open the generated epub and initialize as an Epub."""
        # silence whatever the unpacker prints to stdout
        with contextlib.redirect_stdout(None):
            unpackBook(self.path, self.tmpdir, epubver="A", use_hd=True)
            self.file = zipfile.ZipFile(self.tmpepub, "r")
        Epub.initialize(self)

    def cleanup(self) -> None:
        """Remove the temporary unpack directory."""
        shutil.rmtree(self.tmpdir)
class FictionBook(Ebook):
    """
    Ebook backend for FictionBook 2 (FB2) XML files.

    Call initialize() after construction: it parses the file and fills
    `root`, `contents` (the children of the body elements) and
    `toc_entries`.
    """

    NAMESPACE = {"FB2": "http://www.gribuser.ru/xml/fictionbook/2.0"}

    def __init__(self, filefb: str):
        self.path = os.path.abspath(filefb)
        self.file = filefb

        # populate these attribute
        # by calling self.initialize()
        self.root: ET.Element

    def get_meta(self) -> BookMetadata:
        """
        Return title, author, date and identifier taken from the first
        matching elements of the document.

        The author is "<first-name> <last-name>", reduced to whichever
        part is present when the other is missing or empty.
        """
        title_elem = self.root.find(".//FB2:book-title", FictionBook.NAMESPACE)
        first_name_elem = self.root.find(".//FB2:first-name", FictionBook.NAMESPACE)
        last_name_elem = self.root.find(".//FB2:last-name", FictionBook.NAMESPACE)
        date_elem = self.root.find(".//FB2:date", FictionBook.NAMESPACE)
        identifier_elem = self.root.find(".//FB2:id", FictionBook.NAMESPACE)

        author = first_name_elem.text if first_name_elem is not None else None
        if last_name_elem is not None:
            last_name = last_name_elem.text
            if author and last_name:
                author = f"{author} {last_name}"
            elif not author:
                author = last_name
            # else: the last name is empty (its text is None or ""), so the
            # first name is kept as is rather than turned into "First None"
            # or given a trailing space

        return BookMetadata(
            title=title_elem.text if title_elem is not None else None,
            creator=author,
            date=date_elem.text if date_elem is not None else None,
            identifier=identifier_elem.text if identifier_elem is not None else None,
        )

    def initialize(self) -> None:
        """Parse the file and derive contents and table of contents from it."""
        cont = ET.parse(self.file)
        self.root = cont.getroot()

        self.contents = tuple(self.root.findall("FB2:body/*", FictionBook.NAMESPACE))

        # TODO
        # one entry per content element that has a direct <title> child
        toc_entries: List[TocEntry] = []
        for n, i in enumerate(self.contents):
            title = i.find("FB2:title", FictionBook.NAMESPACE)
            if title is not None:
                toc_entries.append(
                    TocEntry(label="".join(title.itertext()), content_index=n, section=None)
                )
        self.toc_entries = tuple(toc_entries)

    def get_raw_text(self, node: Union[str, ET.Element]) -> str:
        """Serialize `node` as HTML text with the FB2 namespace prefix removed."""
        assert isinstance(node, ET.Element)
        # make the FB2 namespace the default one so tags are not prefixed
        ET.register_namespace("", "http://www.gribuser.ru/xml/fictionbook/2.0")
        return ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:", "")

    def get_img_bytestr(self, imgid: str) -> Tuple[str, bytes]:
        """
        Return ("<id>.<subtype>", bytes) for the root child whose id is
        `imgid` (any "#" removed); its text is decoded as base64.
        """
        # TODO: test if image works
        imgid = imgid.replace("#", "")
        img_elem = self.root.find("*[@id='{}']".format(imgid))
        assert img_elem is not None
        imgtype = img_elem.get("content-type")
        img_elem_text = img_elem.text
        assert imgtype is not None
        assert img_elem_text is not None
        # "image/png" -> "png"
        return imgid + "." + imgtype.split("/")[1], base64.b64decode(img_elem_text)

    def cleanup(self) -> None:
        return
class URL(Ebook):
    """
    Ebook backend treating a single web page as a book with one content
    and an empty table of contents.
    """

    # headers sent with every request made by this class
    _header = {
        "User-Agent": f"epy/v{__version__}",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.8",
    }

    def __init__(self, url: str):
        self.path = url
        self.file = url
        # single placeholder content; get_raw_text() ignores its argument
        self.contents = ("_",)
        self.toc_entries = tuple()

    def get_meta(self) -> BookMetadata:
        """No metadata is extracted from a web page."""
        return BookMetadata()

    def initialize(self) -> None:
        """
        Download the page and store its text in `self.html`.

        The body is decoded with the charset declared in the response's
        Content-Type header, defaulting to UTF-8. When that charset is
        unknown or does not fit the data, the body is decoded as UTF-8
        with undecodable bytes replaced, so a mislabelled page still opens.

        Raises HTTPError / URLError from urlopen() unchanged.
        """
        with urlopen(Request(self.path, headers=URL._header)) as response:
            raw = response.read()
            charset = response.headers.get_content_charset() or "utf-8"

        try:
            self.html = raw.decode(charset)
        except (LookupError, UnicodeDecodeError):
            self.html = raw.decode("utf-8", errors="replace")

    def get_raw_text(self, _) -> str:
        """Return the downloaded page, whatever the argument."""
        return self.html

    def get_img_bytestr(self, src: str) -> Tuple[str, bytes]:
        """
        Download the image `src` (resolved against the page URL when it
        is not itself a URL) and return (file name, bytes).
        """
        image_url = src if is_url(src) else urljoin(self.path, src)
        # TODO: catch error on request
        with urlopen(Request(image_url, headers=URL._header)) as response:
            byte_str = response.read()
        # last path component of `src`, query string excluded
        return PurePosixPath(urlparse(src).path).name, byte_str

    def cleanup(self) -> None:
        return
# }}}
# HTML & Text Parser {{{
class HTMLtoLines(HTMLParser):
# Tag groups; each group gets its own handling in the handle_* methods.
para = {"p", "div"}  # closing one of these starts a new line
inde = {"q", "dt", "dd", "blockquote"}  # lines recorded in idinde
pref = {"pre"}  # whitespace is kept as is, lines recorded in idpref
bull = {"li"}  # lines recorded in idbull
hide = {"script", "style", "head"}  # text inside is dropped
ital = {"i", "em"}  # recorded in italic_marks
bold = {"b", "strong"}  # recorded in bold_marks
# hide = {"script", "style", "head", ", "sub}
# sup_lookup = "⁰¹²³⁴⁵⁶⁷⁸⁹"
# sub_lookup = "₀₁₂₃₄₅₆₇₈₉"

attr_bold = curses.A_BOLD
# A_ITALIC is not present in every curses build: fall back to underline,
# then to no attribute at all
try:
    attr_italic = curses.A_ITALIC
except AttributeError:
    try:
        attr_italic = curses.A_UNDERLINE
    except AttributeError:
        attr_italic = curses.A_NORMAL
@staticmethod
def _mark_to_spans(text: Sequence[str], marks: Sequence[TextMark]) -> List[TextSpan]:
    """
    Break every valid mark into one span per line of `text` it touches.

    Invalid (unterminated or reversed) marks are ignored.
    Duplicate spans are kept.
    """
    result: List[TextSpan] = []
    for mark in marks:
        if not mark.is_valid():
            continue

        first, last = mark.start, mark.end
        # mypy issue, should be handled by mark.is_valid()
        assert last is not None

        if first.row == last.row:
            result.append(TextSpan(start=first, n_letters=last.col - first.col))
            continue

        # head: from the start column up to the end of the first line
        result.append(TextSpan(start=first, n_letters=len(text[first.row]) - first.col))
        # body: every line strictly in between is covered entirely
        result.extend(
            TextSpan(start=CharPos(row=row, col=0), n_letters=len(text[row]))
            for row in range(first.row + 1, last.row)
        )
        # tail: from the beginning of the last line up to the end column
        result.append(TextSpan(start=CharPos(row=last.row, col=0), n_letters=last.col))

    return result  # list(set(result))
@staticmethod
def _adjust_wrapped_spans(
    wrapped_lines: Sequence[str],
    span: TextSpan,
    *,
    line_adjustment: int = 0,
    left_adjustment: int = 0,
) -> List[TextSpan]:
    """
    Adjust text span to wrapped lines.
    Not perfect, but should be good enough considering
    the limitation on commandline interface.

    wrapped_lines: the lines obtained by wrapping one original line
    span: span expressed in columns of that original (unwrapped) line;
          only span.start.col and span.n_letters are read
    line_adjustment: row of wrapped_lines[0] in the output coordinates
    left_adjustment: added to the column of every produced span

    Returns at most one span per wrapped line.
    """
    # current_row = span.start.row + line_adjustment
    current_row = line_adjustment
    start_col = span.start.col
    end_col = start_col + span.n_letters
    prev = 0  # chars length before current line
    spans: List[TextSpan] = []
    for n, line in enumerate(wrapped_lines):
        # + 1 compensates textwrap.wrap(*args, replace_whitespace=True, drop_whitespace=True)
        line_len = len(line) + 1
        current = prev + line_len  # chars length before next line
        # [prev, current) is the column interval of the original line
        # that this wrapped line covers

        # -:unmarked *:marked
        # |------*****--------|
        # span starts and ends inside this line
        if start_col in range(prev, current) and end_col in range(prev, current):
            spans.append(
                TextSpan(
                    start=CharPos(row=current_row + n, col=start_col - prev + left_adjustment),
                    n_letters=span.n_letters,
                )
            )
        # |----------*********|
        # span starts in this line and continues on the next one
        elif start_col in range(prev, current):
            spans.append(
                TextSpan(
                    start=CharPos(row=current_row + n, col=start_col - prev + left_adjustment),
                    n_letters=current - start_col - 1,  # -1: dropped whitespace
                )
            )
        # |********-----------|
        # span started on an earlier line and ends in this one
        elif end_col in range(prev, current):
            spans.append(
                TextSpan(
                    start=CharPos(row=current_row + n, col=0 + left_adjustment),
                    n_letters=end_col - prev + 1,  # +1: dropped whitespace
                )
            )
        # |*******************|
        # span covers this whole line
        elif prev in range(start_col, end_col) and current in range(start_col, end_col):
            spans.append(
                TextSpan(
                    start=CharPos(row=current_row + n, col=0 + left_adjustment),
                    n_letters=line_len - 1,  # -1: dropped whitespace
                )
            )
        # this line begins after the span's end: nothing left to emit
        elif prev > end_col:
            break
        prev = current
    return spans
@staticmethod
def _group_spans_by_row(blocks: Sequence[TextSpan]) -> Mapping[int, List[TextSpan]]:
    """
    Bucket the spans by the row they start on.

    Returns {row: [span, ...]}; spans of one row keep their input order.
    """
    by_row: Dict[int, List[TextSpan]] = {}
    for span in blocks:
        by_row.setdefault(span.start.row, []).append(span)
    return by_row
def __init__(self, sects: Optional[Set[str]] = None):
    """
    sects: ids of the sections whose line position should be recorded in
           `sectsindex` while parsing. Omitted or None is the same as
           {""}, which the tag handlers treat as "no section tracking".
    """
    HTMLParser.__init__(self)
    # lines gathered so far; the last item is the line being written
    self.text = [""]
    # state flags: which kind of element the parser is currently inside
    self.ishead = False
    self.isinde = False
    self.isbull = False
    self.ispref = False
    self.ishidden = False
    # indices (into self.text) of the lines of each kind
    self.idhead = set()
    self.idinde = set()
    self.idbull = set()
    self.idpref = set()
    self.idimgs = set()
    # A set literal as default value would be a single object shared by
    # every instance; build the default per instance instead.
    self.sects = {""} if sects is None else sects
    # {line index: section id}
    self.sectsindex = {}
    self.italic_marks: List[TextMark] = []
    self.bold_marks: List[TextMark] = []
    # {line index: image source}
    self.imgs: Dict[int, str] = dict()
def handle_starttag(self, tag, attrs):
    """
    React to an opening tag: raise the matching state flag, open a
    sup/sub group, register an image line or start an italic/bold mark.
    Independently of the tag, remember the current line for every
    tracked section id found among the attributes.
    """
    if re.match("h[1-6]", tag) is not None:
        self.ishead = True
    elif tag in self.inde:
        self.isinde = True
    elif tag in self.pref:
        self.ispref = True
    elif tag in self.bull:
        self.isbull = True
    elif tag in self.hide:
        self.ishidden = True
    elif tag == "sup":
        self.text[-1] += "^{"
    elif tag == "sub":
        self.text[-1] += "_{"
    # NOTE: "img" and "image"
    # In HTML, both are startendtag (no need endtag)
    # but in XHTML both need endtag
    elif tag in {"img", "image"}:
        for attr_name, attr_value in attrs:
            # <img> carries its source in "src", <image> in "...href"
            if tag == "img":
                is_source = attr_name == "src"
            else:
                is_source = attr_name.endswith("href")
            if is_source:
                image_row = len(self.text)
                self.idimgs.add(image_row)
                self.imgs[image_row] = unquote(attr_value)
                self.text.append("[IMAGE]")
    # formatting
    elif tag in self.ital:
        # open a new mark only when the previous one has been closed
        if not self.italic_marks or self.italic_marks[-1].is_valid():
            cursor = CharPos(row=len(self.text) - 1, col=len(self.text[-1]))
            self.italic_marks.append(TextMark(start=cursor))
    elif tag in self.bold:
        if not self.bold_marks or self.bold_marks[-1].is_valid():
            cursor = CharPos(row=len(self.text) - 1, col=len(self.text[-1]))
            self.bold_marks.append(TextMark(start=cursor))

    if self.sects != {""}:
        for attr_name, attr_value in attrs:
            if attr_name == "id" and attr_value in self.sects:
                # self.text[-1] += " (#" + attr_value + ") "
                # self.sectsindex.append([len(self.text), attr_value])
                self.sectsindex[len(self.text) - 1] = attr_value
def handle_startendtag(self, tag, attrs):
    """
    React to a self-closing tag: <br/> starts a new line, <img/> and
    <image/> add an image line followed by an empty one. Independently of
    the tag, remember the current line for every tracked section id
    found among the attributes.
    """
    if tag == "br":
        self.text += [""]
    elif tag in {"img", "image"}:
        for attr_name, attr_value in attrs:
            #  if (tag == "img" and attr_name == "src")\
            #     or (tag == "image" and attr_name == "xlink:href"):
            if tag == "img":
                is_source = attr_name == "src"
            else:
                is_source = attr_name.endswith("href")
            if is_source:
                image_row = len(self.text)
                self.idimgs.add(image_row)
                self.imgs[image_row] = unquote(attr_value)
                self.text.append("[IMAGE]")
                self.text.append("")

    # sometimes attribute "id" is inside "startendtag"
    # especially html from mobi module (kindleunpack fork)
    if self.sects != {""}:
        for attr_name, attr_value in attrs:
            if attr_name == "id" and attr_value in self.sects:
                # self.text[-1] += " (#" + attr_value + ") "
                self.sectsindex[len(self.text) - 1] = attr_value
def handle_endtag(self, tag):
    """
    React to a closing tag: end the current line where the element calls
    for it, lower the matching state flag, close a sup/sub group or set
    the end position of the latest italic/bold mark.

    A closing italic/bold tag that arrives before any opening one is
    ignored: there is no mark to terminate.
    """
    if re.match("h[1-6]", tag) is not None:
        # a heading is followed by an empty line
        self.text.append("")
        self.text.append("")
        self.ishead = False
    elif tag in self.para:
        self.text.append("")
    elif tag in self.hide:
        self.ishidden = False
    elif tag in self.inde:
        if self.text[-1] != "":
            self.text.append("")
        self.isinde = False
    elif tag in self.pref:
        if self.text[-1] != "":
            self.text.append("")
        self.ispref = False
    elif tag in self.bull:
        if self.text[-1] != "":
            self.text.append("")
        self.isbull = False
    elif tag in {"sub", "sup"}:
        self.text[-1] += "}"
    elif tag in {"img", "image"}:
        self.text.append("")
    # formatting
    elif tag in self.ital:
        # guard against a stray closing tag: indexing [-1] on an empty list
        # would raise IndexError and abort the parsing of the whole document
        if self.italic_marks:
            char_pos = CharPos(row=len(self.text) - 1, col=len(self.text[-1]))
            last_mark = self.italic_marks[-1]
            # marks are frozen: swap in a copy carrying the end position
            self.italic_marks[-1] = dataclasses.replace(last_mark, end=char_pos)
    elif tag in self.bold:
        if self.bold_marks:
            char_pos = CharPos(row=len(self.text) - 1, col=len(self.text[-1]))
            last_mark = self.bold_marks[-1]
            self.bold_marks[-1] = dataclasses.replace(last_mark, end=char_pos)
def handle_data(self, raw):
    """
    Append a chunk of text to the current line and record which kind of
    line it is. Empty chunks and text inside hidden elements are dropped.
    """
    if not raw or self.ishidden:
        return

    # no leading whitespace at the beginning of a line
    chunk = raw.lstrip() if self.text[-1] == "" else raw
    # outside of preformatted text, whitespace runs collapse to one space
    if not self.ispref:
        chunk = re.sub(r"\s+", " ", chunk)
    self.text[-1] += unescape(chunk)

    current_row = len(self.text) - 1
    # a line is filed under one kind only, in this order of precedence
    if self.ishead:
        self.idhead.add(current_row)
    elif self.isbull:
        self.idbull.add(current_row)
    elif self.isinde:
        self.idinde.add(current_row)
    elif self.ispref:
        self.idpref.add(current_row)
def get_structured_text(
    self, textwidth: Optional[int] = 0, starting_line: int = 0
) -> Union[Tuple[str, ...], TextStructure]:
    """
    Lay out the parsed paragraphs for display.

    :param textwidth: maximum width of a rendered line; when falsy the
        parsed paragraphs are returned unwrapped
    :param starting_line: offset added to every row recorded in the result
        (image rows, section rows and formatting rows)
    :return: Tuple[str, ...] if `textwidth` is falsy else TextStructure
    """
    if not textwidth:
        return tuple(self.text)

    # Indented and bullet paragraphs are wrapped 3 columns narrower and their
    # style spans are shifted right by 3 columns (see left_adjustment below),
    # so the prefix put in front of each wrapped line has to be 3 chars wide.
    indent_width = 3
    indent = " " * indent_width
    bullet = " - "

    text: List[str] = []
    images: Dict[int, str] = dict()  # {line_num: path/in/zip}
    sect: Dict[str, int] = dict()  # {section_id: line_num}
    formatting: List[InlineStyle] = []

    italic_spans: List[TextSpan] = HTMLtoLines._mark_to_spans(self.text, self.italic_marks)
    bold_spans: List[TextSpan] = HTMLtoLines._mark_to_spans(self.text, self.bold_marks)
    italic_groups = HTMLtoLines._group_spans_by_row(italic_spans)
    bold_groups = HTMLtoLines._group_spans_by_row(bold_spans)

    # rows whose wrapped lines carry the 3-column prefix
    prefixed_rows = self.idbull | self.idinde

    for n, line in enumerate(self.text):
        startline = len(text)

        if n in self.sectsindex:
            sect[self.sectsindex[n]] = starting_line + len(text)

        if n in self.idhead:
            text += [line.center(textwidth)] + [""]
            formatting += [
                InlineStyle(
                    row=starting_line + row, col=0, n_letters=len(text[row]), attr=self.attr_bold
                )
                for row in range(startline, len(text))
            ]
        elif n in self.idinde:
            wrapped = textwrap.wrap(line, textwidth - indent_width)
            text += [indent + part for part in wrapped] + [""]
        elif n in self.idbull:
            wrapped = textwrap.wrap(line, textwidth - indent_width)
            # Pick the marker by position, not by comparing text: a later line
            # that happens to equal the first one must not get a second bullet.
            text += [
                (bullet if k == 0 else indent) + part for k, part in enumerate(wrapped)
            ] + [""]
        elif n in self.idpref:
            wraptmp: List[str] = []
            for tmp_line in line.splitlines():
                wraptmp += textwrap.wrap(tmp_line, textwidth - 6)
            # NOTE(review): the wrap width reserves 6 columns but the prefix is
            # a single space -- confirm the intended margin.
            text += [" " + part for part in wraptmp] + [""]
        elif n in self.idimgs:
            images[starting_line + len(text)] = self.imgs[n]
            text += [line.center(textwidth)]
            formatting += [
                InlineStyle(
                    row=starting_line + len(text) - 1,
                    col=0,
                    n_letters=len(text[-1]),
                    attr=self.attr_bold,
                )
            ]
            text += [""]
        else:
            text += textwrap.wrap(line, textwidth) + [""]

        endline = len(text)
        left_adjustment = indent_width if n in prefixed_rows else 0

        # italic styles are emitted before bold ones for every paragraph
        for groups, attr in ((italic_groups, self.attr_italic), (bold_groups, self.attr_bold)):
            for spans in groups.get(n, []):
                adjusted = HTMLtoLines._adjust_wrapped_spans(
                    text[startline:endline],
                    spans,
                    line_adjustment=startline,
                    left_adjustment=left_adjustment,
                )
                for span in adjusted:
                    formatting.append(
                        InlineStyle(
                            row=starting_line + span.start.row,
                            col=span.start.col,
                            n_letters=span.n_letters,
                            attr=attr,
                        )
                    )

    # chapter suffix
    text += ["***".center(textwidth)]

    return TextStructure(
        text_lines=tuple(text),
        image_maps=images,
        section_rows=sect,
        formatting=tuple(formatting),
    )
# }}}
# App Configuration {{{
class AppData:
    """Base class that locates (and creates) the application data directory."""

    @property
    def prefix(self) -> Optional[str]:
        """Return None if there exists no homedir | userdir"""
        # UNIX filesystem
        home = os.getenv("HOME")
        # WIN filesystem
        profile = os.getenv("USERPROFILE")

        if home:
            config_root = os.path.join(home, ".config")
            if os.path.isdir(config_root):
                data_dir = os.path.join(config_root, "epy")
            else:
                data_dir = os.path.join(home, ".epy")
        elif profile:
            data_dir = os.path.join(profile, ".epy")
        else:
            return None

        # the directory is created on first use
        os.makedirs(data_dir, exist_ok=True)
        return data_dir
class Config(AppData):
    """
    User configuration: settings and keymaps, read from (or, when the file
    does not exist yet, written to) `configuration.json` in the application
    data directory.
    """

    def __init__(self):
        setting_dict = dataclasses.asdict(Settings())
        keymap_dict = dataclasses.asdict(CfgDefaultKeymaps())
        keymap_builtin_dict = dataclasses.asdict(CfgBuiltinKeymaps())

        if os.path.isfile(self.filepath):
            with open(self.filepath) as f:
                cfg_user = json.load(f)
            # A configuration file may lack a whole section; treat a missing
            # section as "no overrides" instead of failing with KeyError.
            setting_dict = Config.update_dict(setting_dict, cfg_user.get("Setting", {}))
            keymap_dict = Config.update_dict(keymap_dict, cfg_user.get("Keymap", {}))
        else:
            self.save({"Setting": setting_dict, "Keymap": keymap_dict})

        keymap_dict_tuple = {k: tuple(v) for k, v in keymap_dict.items()}
        keymap_updated = {
            k: tuple(Key(i) for i in v)
            for k, v in Config.update_keys_tuple(keymap_dict_tuple, keymap_builtin_dict).items()
        }

        if sys.platform == "win32":
            setting_dict["PageScrollAnimation"] = False

        self.setting = Settings(**setting_dict)
        self.keymap = Keymap(**keymap_updated)
        # to build help menu text
        self.keymap_user_dict = keymap_dict

    @property
    def filepath(self) -> str:
        """Path of the configuration file, or os.devnull without a data directory."""
        prefix = self.prefix
        return os.path.join(prefix, "configuration.json") if prefix else os.devnull

    def save(self, cfg_dict):
        """Write `cfg_dict` as indented JSON to the configuration file."""
        with open(self.filepath, "w") as file:
            json.dump(cfg_dict, file, indent=2)

    @staticmethod
    def update_dict(
        old_dict: Mapping[str, Union[str, int, bool]],
        new_dict: Mapping[str, Union[str, int, bool]],
        place_new=False,
    ) -> Mapping[str, Union[str, int, bool]]:
        """Returns a copy of `old_dict` after updating it with `new_dict`

        Keys of `new_dict` that are unknown to `old_dict` are ignored
        unless `place_new` is true.
        """
        result = {**old_dict}
        for k, v in new_dict.items():
            if k in result or place_new:
                result[k] = v
        return result

    @staticmethod
    def update_keys_tuple(
        old_keys: Mapping[str, Tuple[str, ...]],
        new_keys: Mapping[str, Tuple[str, ...]],
        place_new: bool = False,
    ) -> Mapping[str, Tuple[str, ...]]:
        """Returns a copy of `old_keys` after updating it with `new_keys`
        by appending the tuple value and removes duplicate

        Duplicates are dropped while keeping first-seen order, so the
        resulting tuples are the same on every run.
        """
        result = {**old_keys}
        for k, v in new_keys.items():
            if k in result:
                result[k] = tuple(dict.fromkeys(result[k] + v))
            elif place_new:
                result[k] = tuple(dict.fromkeys(v))
        return result
class State(AppData):
    """
    Use sqlite3 instead of JSON (in older version)
    to shift the weight from memory to process

    Every method opens its own short-lived connection to `filepath`. The
    connection is wrapped in `contextlib.closing`, so it is closed on every
    exit path and a failing `sqlite3.connect` surfaces its own error instead
    of an UnboundLocalError raised from a `finally` clause.
    """

    def __init__(self):
        # create the schema the first time the database file is needed
        if not os.path.isfile(self.filepath):
            self.init_db()

    @property
    def filepath(self) -> str:
        """Path of the sqlite database, or os.devnull without a data directory."""
        prefix = self.prefix
        return os.path.join(prefix, "states.db") if prefix else os.devnull

    def get_from_history(self) -> List[LibraryItem]:
        """Return all library rows, most recently read first."""
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            cur = conn.cursor()
            cur.execute(
                """
                SELECT last_read, filepath, title, author, reading_progress
                FROM library ORDER BY last_read DESC
                """
            )
            return [
                LibraryItem(
                    last_read=datetime.fromisoformat(row[0]),
                    filepath=row[1],
                    title=row[2],
                    author=row[3],
                    reading_progress=row[4],
                )
                for row in cur.fetchall()
            ]

    def delete_from_library(self, filepath: str) -> None:
        """Delete the reading state of `filepath`.

        Foreign keys are switched on for this connection so that the
        ON DELETE CASCADE clauses of the `library` and `bookmarks` tables apply.
        """
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.execute("PRAGMA foreign_keys = ON")
            conn.execute("DELETE FROM reading_states WHERE filepath=?", (filepath,))
            conn.commit()

    def get_last_read(self) -> Optional[str]:
        """Return the path of the most recently read item, or None."""
        library = self.get_from_history()
        return library[0].filepath if library else None

    def update_library(self, ebook: Ebook, reading_progress: Optional[float]) -> None:
        """Insert or replace the library row of `ebook`."""
        metadata = ebook.get_meta()
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO library (filepath, title, author, reading_progress)
                VALUES (?, ?, ?, ?)
                """,
                (ebook.path, metadata.title, metadata.creator, reading_progress),
            )
            conn.commit()

    def get_last_reading_state(self, ebook: Ebook) -> ReadingState:
        """Return the stored reading state of `ebook`, or a default one."""
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute("SELECT * FROM reading_states WHERE filepath=?", (ebook.path,))
            result = cur.fetchone()
            if result:
                result = dict(result)
                # `filepath` is a table column but not a ReadingState field
                del result["filepath"]
                return ReadingState(**result, section=None)
            return ReadingState(content_index=0, textwidth=80, row=0, rel_pctg=None, section=None)

    def set_last_reading_state(self, ebook: Ebook, reading_state: ReadingState) -> None:
        """Insert or replace the reading state row of `ebook`."""
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO reading_states
                VALUES (:filepath, :content_index, :textwidth, :row, :rel_pctg)
                """,
                {"filepath": ebook.path, **dataclasses.asdict(reading_state)},
            )
            conn.commit()

    def insert_bookmark(self, ebook: Ebook, name: str, reading_state: ReadingState) -> None:
        """Store `reading_state` as a bookmark called `name` for `ebook`.

        The row id is the first 10 hex digits of the SHA-1 of path + name.
        """
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.execute(
                """
                INSERT INTO bookmarks
                VALUES (:id, :filepath, :name, :content_index, :textwidth, :row, :rel_pctg)
                """,
                {
                    "id": hashlib.sha1(f"{ebook.path}{name}".encode()).hexdigest()[:10],
                    "filepath": ebook.path,
                    "name": name,
                    **dataclasses.asdict(reading_state),
                },
            )
            conn.commit()

    def delete_bookmark(self, ebook: Ebook, name: str) -> None:
        """Delete the bookmark called `name` of `ebook`."""
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.execute("DELETE FROM bookmarks WHERE filepath=? AND name=?", (ebook.path, name))
            conn.commit()

    def get_bookmarks(self, ebook: Ebook) -> List[Tuple[str, ReadingState]]:
        """Return (name, reading state) pairs for every bookmark of `ebook`."""
        state_fields = ("content_index", "textwidth", "row", "rel_pctg")
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute("SELECT * FROM bookmarks WHERE filepath=?", (ebook.path,))
            bookmarks: List[Tuple[str, ReadingState]] = []
            for result in cur.fetchall():
                row = dict(result)
                # keep only the columns that are ReadingState fields
                state_kwargs = {k: v for k, v in row.items() if k in state_fields}
                bookmarks.append((row["name"], ReadingState(**state_kwargs)))
            return bookmarks

    def init_db(self) -> None:
        """Create the `reading_states`, `library` and `bookmarks` tables."""
        with contextlib.closing(sqlite3.connect(self.filepath)) as conn:
            conn.executescript(
                """
                CREATE TABLE reading_states (
                    filepath TEXT PRIMARY KEY,
                    content_index INTEGER,
                    textwidth INTEGER,
                    row INTEGER,
                    rel_pctg REAL
                );

                CREATE TABLE library (
                    last_read DATETIME DEFAULT (datetime('now','localtime')),
                    filepath TEXT PRIMARY KEY,
                    title TEXT,
                    author TEXT,
                    reading_progress REAL,
                    FOREIGN KEY (filepath) REFERENCES reading_states(filepath)
                    ON DELETE CASCADE
                );

                CREATE TABLE bookmarks (
                    id TEXT PRIMARY KEY,
                    filepath TEXT,
                    name TEXT,
                    content_index INTEGER,
                    textwidth INTEGER,
                    row INTEGER,
                    rel_pctg REAL,
                    FOREIGN KEY (filepath) REFERENCES reading_states(filepath)
                    ON DELETE CASCADE
                );
                """
            )
            conn.commit()
# }}}
# Text Board {{{
class InfiniBoard:
    """
    Thin layer over a curses screen for showing arbitrarily long text.

    Nothing is pre-rendered: every call only draws the rows that fit on the
    screen, starting at the requested row. Drawing never calls .clear() or
    .refresh() on the screen, to optimize performance.
    """

    def __init__(
        self,
        screen,
        text: Tuple[str, ...],
        textwidth: int = 80,
        default_style: Tuple[InlineStyle, ...] = tuple(),
        spread: int = 1,
    ):
        self.screen = screen
        self.screen_rows, self.screen_cols = self.screen.getmaxyx()
        self.textwidth = textwidth
        self.text = text
        self.total_lines = len(text)
        self.default_style: Tuple[InlineStyle, ...] = default_style
        self.temporary_style: Tuple[InlineStyle, ...] = ()
        self.spread = spread
        if self.spread == 2:
            # two pages side by side: fixed left padding plus a second column
            left_padding = DoubleSpreadPadding.LEFT.value
            self.x = left_padding
            self.x_alt = left_padding + self.textwidth + DoubleSpreadPadding.MIDDLE.value
        else:
            # single page: horizontally centered
            self.x = ((self.screen_cols - self.textwidth) // 2) + 1

    def feed_temporary_style(self, styles: Optional[Tuple[InlineStyle, ...]] = None) -> None:
        """Reset styling if `styles` is None"""
        self.temporary_style = styles or ()

    def render_styles(
        self, row: int, styles: Tuple[InlineStyle, ...] = (), bottom_padding: int = 0
    ) -> None:
        """Apply `styles` to the part of the text visible from `row`."""
        page_rows = self.screen_rows - bottom_padding
        first_page = range(row, row + page_rows)
        second_page = range(row + page_rows, row + 2 * page_rows)
        for style in styles:
            if style.row in first_page:
                self.chgat(
                    row,
                    style.row,
                    style.col,
                    style.n_letters,
                    self.screen.getbkgd() | style.attr,
                )
            if self.spread == 2 and style.row in second_page:
                # shift the style onto the second column, one page up
                self.chgat(
                    row,
                    style.row - page_rows,
                    -self.x + self.x_alt + style.col,
                    style.n_letters,
                    self.screen.getbkgd() | style.attr,
                )

    def getch(self) -> Union[NoUpdate, Key]:
        """Read one key from the screen; NoUpdate when getch() yields -1."""
        code = self.screen.getch()
        return NoUpdate() if code == -1 else Key(code)

    def getbkgd(self):
        """Return the background attribute of the underlying screen."""
        return self.screen.getbkgd()

    def chgat(self, row: int, y: int, x: int, n: int, attr: int) -> None:
        """Set `attr` on `n` cells at text position (y, x), with `row` on top."""
        self.screen.chgat(y - row, self.x + x, n, attr)

    def write(self, row: int, bottom_padding: int = 0) -> None:
        """Draw the page(s) starting at text row `row`, then the styles."""
        page_rows = self.screen_rows - bottom_padding
        for screen_row in range(min(page_rows, self.total_lines - row)):
            self.screen.addstr(screen_row, self.x, self.text[row + screen_row])

            second_index = row + page_rows + screen_row
            if self.spread == 2 and second_index < self.total_lines:
                second_line = self.text[second_index]
                # TODO: clean this up
                if re.search("\\[IMG:[0-9]+\\]", second_line):
                    self.screen.addstr(
                        screen_row,
                        self.x_alt,
                        second_line.center(self.textwidth),
                        curses.A_BOLD,
                    )
                else:
                    self.screen.addstr(screen_row, self.x_alt, second_line)

        for styles in (self.default_style, self.temporary_style):
            self.render_styles(row, styles, bottom_padding)
        # self.screen.refresh()

    def write_n(
        self,
        row: int,
        n: int = 1,
        direction: Direction = Direction.FORWARD,
        bottom_padding: int = 0,
    ) -> None:
        """Draw an `n`-column-wide strip of the page(s) starting at `row`.

        FORWARD draws the first `n` characters of each line against the right
        edge of the text column; any other direction draws the last `n`
        characters at the left edge.
        """
        assert n > 0
        page_rows = self.screen_rows - bottom_padding
        going_forward = direction == Direction.FORWARD
        for screen_row in range(min(page_rows, self.total_lines - row)):
            first_line = self.text[row + screen_row]
            second_index = row + screen_row + page_rows
            draw_second = self.spread == 2 and second_index < self.total_lines

            if going_forward:
                # lines are padded with spaces up to textwidth as a workaround
                # to prevent curses trace because not calling screen.clear()
                self.screen.addnstr(
                    screen_row,
                    self.x + self.textwidth - n,
                    first_line.ljust(self.textwidth),
                    n,
                )
                if draw_second:
                    self.screen.addnstr(
                        screen_row,
                        self.x_alt + self.textwidth - n,
                        self.text[second_index].ljust(self.textwidth),
                        n,
                    )
                continue

            first_tail = first_line[self.textwidth - n :]
            if first_tail:
                self.screen.addnstr(screen_row, self.x, first_tail, n)
            if draw_second:
                self.screen.addnstr(
                    screen_row,
                    self.x_alt,
                    self.text[second_index][self.textwidth - n :],
                    n,
                )
# }}}
# Helpers & Utils {{{
def coerce_to_int(string: str) -> Optional[int]:
    """Return `string` converted with int(), or None if int() raises ValueError."""
    try:
        number: Optional[int] = int(string)
    except ValueError:
        number = None
    return number
def cleanup_library(state: State) -> None:
    """Cleanup non-existent file from library"""
    for entry in state.get_from_history():
        path = entry.filepath
        # entries that are existing files or URLs are kept
        if os.path.isfile(path) or is_url(path):
            continue
        state.delete_from_library(path)
def get_nth_file_from_library(state: State, n) -> Optional[LibraryItem]:
    """
    Return the `n`-th item of the reading history.

    :param state: State object to read the history from
    :param n: 1-based position in the reading history
    :return: the matching LibraryItem, or None if there is no such position
    """
    # Positions start at 1. Without this guard 0 and negative numbers would
    # index from the end of the list and silently return an unrelated item.
    if n < 1:
        return None
    library_items = state.get_from_history()
    try:
        return library_items[n - 1]
    except IndexError:
        return None
def get_matching_library_item(
    state: State, pattern: str, threshold: float = 0.5
) -> Optional[LibraryItem]:
    """
    Find the history item whose "title - author" best matches `pattern`.

    The score of an item is the total size of the blocks SequenceMatcher
    finds in common (case-insensitively) divided by the pattern length.

    :param state: State object to read the history from
    :param pattern: text to look for
    :param threshold: minimum score the best item must reach
    :return: the best scoring LibraryItem, or None when the history or the
        pattern is empty or the best score is below `threshold`
    """
    # an empty pattern cannot match anything (and would divide by zero below)
    if not pattern:
        return None

    library_items = state.get_from_history()
    if not library_items:
        return None

    needle = pattern.lower()
    matches: List[Tuple[LibraryItem, float]] = []  # [(library_item, match_value), ...]
    for item in library_items:
        tomatch = f"{item.title} - {item.author}"  # item.filepath
        matched_size = sum(
            block.size for block in SM(None, tomatch.lower(), needle).get_matching_blocks()
        )
        matches.append((item, matched_size / float(len(pattern))))

    # max() keeps the first of equally scored items, like a stable sort would
    first_match_item, first_match_value = max(matches, key=lambda match: match[1])
    if first_match_item and first_match_value >= threshold:
        return first_match_item
    return None
def print_reading_history(state: State) -> None:
    """Print the numbered reading history, one line per item."""
    term_cols, _ = shutil.get_terminal_size()
    history = state.get_from_history()
    if not history:
        print("No Reading History.")
        return

    print("Reading History:")
    number_width = len(str(len(history) + 1))
    # columns left for the item text
    item_cols = term_cols - number_width - 2
    for number, entry in enumerate(history, start=1):
        label = str(number).rjust(number_width)
        shortened = truncate(str(entry), "...", item_cols, item_cols - 3)
        print("{} {}".format(label, shortened))
def is_url(string: str) -> bool:
    """Tell whether `string` parses as a URL that has both a scheme and a netloc."""
    try:
        parsed = urlparse(string)
    except ValueError:
        return False
    return bool(parsed.scheme and parsed.netloc)
def construct_speaker(
    preferred: Optional[str] = None, args: Optional[List[str]] = None
) -> Optional[SpeakerBaseModel]:
    """
    Instantiate the first available speaker from SPEAKERS.

    :param preferred: `cmd` of the speaker to try first
    :param args: arguments handed to the speaker; defaults to an empty list
    :return: a speaker instance, or None if no speaker is available
    """
    # A fresh list per call: a mutable default would be one shared object
    # handed to every speaker constructed without explicit arguments.
    if args is None:
        args = []
    speakers = (
        # stable sort: the preferred speaker moves first, the rest keep order
        sorted(SPEAKERS, key=lambda x: int(x.cmd == preferred), reverse=True)
        if preferred
        else SPEAKERS
    )
    speaker = next((speaker for speaker in speakers if speaker.available), None)
    return speaker(args) if speaker else None
def parse_html(
    html_src: str,
    *,
    textwidth: Optional[int] = None,
    section_ids: Optional[Set[str]] = None,
    starting_line: int = 0,
) -> Union[Tuple[str, ...], TextStructure]:
    """
    Feed an html string through HTMLtoLines and return its structured text

    :param html_src: html str to parse
    :param textwidth: textwidth to count max length of returned TextStructure
                      if None given, sequence of text as paragraph is returned
    :param section_ids: set of section ids to look for inside html tag attr
    :param starting_line: passed through to HTMLtoLines.get_structured_text
    :return: Tuple[str, ...] if textwidth not given else TextStructure
    """
    html_parser = HTMLtoLines(section_ids or set())
    html_parser.feed(html_src)
    html_parser.close()
    return html_parser.get_structured_text(textwidth, starting_line)
def dump_ebook_content(filepath: str) -> None:
    """Write every paragraph of the ebook at `filepath` to stdout as UTF-8."""
    book = get_ebook_obj(filepath)
    try:
        try:
            book.initialize()
        except Exception as e:
            sys.exit("ERROR: Badly-structured ebook.\n" + str(e))

        for content_id in book.contents:
            paragraphs = parse_html(book.get_raw_text(content_id))
            assert isinstance(paragraphs, tuple)
            # sys.stdout.reconfigure(encoding="utf-8")  # Python>=3.7
            for paragraph in paragraphs:
                # bytes go straight to the underlying binary stream
                sys.stdout.buffer.write((paragraph + "\n\n").encode("utf-8"))
    finally:
        book.cleanup()
def merge_text_structures(
    text_structure_first: TextStructure, text_structure_second: TextStructure
) -> TextStructure:
    """
    Concatenate two TextStructure objects into a new one.

    Lines and formatting are appended; for the image and section mappings
    an entry of the second structure wins over an equal key of the first.
    """
    first, second = text_structure_first, text_structure_second
    merged_lines = first.text_lines + second.text_lines
    merged_images = {**first.image_maps, **second.image_maps}
    merged_sections = {**first.section_rows, **second.section_rows}
    merged_formatting = first.formatting + second.formatting
    return TextStructure(
        text_lines=merged_lines,
        image_maps=merged_images,
        section_rows=merged_sections,
        formatting=merged_formatting,
    )
def construct_relative_reading_state(
    abs_reading_state: ReadingState, totlines_per_content: Sequence[int]
) -> ReadingState:
    """
    Translate a whole-book reading position into a per-content one.

    :param abs_reading_state: ReadingState absolute to whole book when Setting.Seamless==True
    :param totlines_per_content: sequence of total lines per book content
    :return: new ReadingState relative to per content of the book
    """
    all_contents_lines = sum(totlines_per_content)

    # walk through the contents until the one holding the absolute row
    index = 0
    content_lines = totlines_per_content[index]
    lines_up_to_content = content_lines
    while lines_up_to_content <= abs_reading_state.row:
        index += 1
        content_lines = totlines_per_content[index]
        lines_up_to_content += content_lines

    # number of lines in all the contents before the found one
    lines_before = lines_up_to_content - content_lines

    if abs_reading_state.rel_pctg:
        relative_pctg = abs_reading_state.rel_pctg - (lines_before / all_contents_lines)
    else:
        relative_pctg = None

    return ReadingState(
        content_index=index,
        textwidth=abs_reading_state.textwidth,
        row=abs_reading_state.row - lines_before,
        rel_pctg=relative_pctg,
        section=abs_reading_state.section,
    )
def get_ebook_obj(filepath: str) -> Ebook:
    """
    Pick the ebook class for `filepath`.

    URLs are checked first, then the lowercased file extension. The process
    exits with an error message for anything else.

    :param filepath: path or URL of the ebook
    :return: an (uninitialized) ebook object
    """
    file_ext = os.path.splitext(filepath)[1].lower()
    if is_url(filepath):
        return URL(filepath)
    elif file_ext in {".epub", ".epub3"}:
        return Epub(filepath)
    elif file_ext == ".fb2":
        return FictionBook(filepath)
    elif MOBI_SUPPORT and file_ext == ".mobi":
        return Mobi(filepath)
    elif MOBI_SUPPORT and file_ext in {".azw", ".azw3"}:
        return Azw(filepath)
    # Same extensions as the two branches above (".azw" included), so every
    # format that only lacks the optional dependency gets the install hint.
    elif not MOBI_SUPPORT and file_ext in {".mobi", ".azw", ".azw3"}:
        sys.exit(
            "ERROR: Format not supported. (Supported: epub, fb2). "
            "To get mobi and azw3 support, install epy via pip. "
        )
    else:
        sys.exit("ERROR: Format not supported. (Supported: epub, fb2)")
def tuple_subtract(tuple_one: Tuple[Any, ...], tuple_two: Tuple[Any, ...]) -> Tuple[Any, ...]:
    """
    Returns tuple with members in tuple_one
    but not in tuple_two

    Order and duplicates of `tuple_one` are kept.
    """
    return tuple(filter(lambda member: member not in tuple_two, tuple_one))
def pgup(current_row: int, window_height: int, counter: int = 1) -> int:
    """Return the row `counter` pages above `current_row`, never below 0."""
    return max(0, current_row - window_height * counter)
def pgdn(current_row: int, total_lines: int, window_height: int, counter: int = 1) -> int:
    """
    Return the row `counter` pages below `current_row`.

    The result never goes past the row at which the last page starts, and
    that row itself is never reported below 0.
    """
    target = current_row + (window_height * counter)
    last_page_start = total_lines - window_height
    if target <= last_page_start:
        return target
    return max(last_page_start, 0)
def pgend(total_lines: int, window_height: int) -> int:
    """Return the row at which the last page starts, or 0 if the text is shorter."""
    return max(total_lines - window_height, 0)
def truncate(teks: str, subtitution_text: str, maxlen: int, startsub: int = 0) -> str:
    """
    Truncate text

    A text longer than `maxlen` keeps its first `startsub` characters,
    followed by `subtitution_text` (cut short if it does not fit) and, if
    there is room left, the end of the text.

    eg.
    :param teks: 'This is long silly dummy text'
    :param subtitution_text: '...'
    :param maxlen: 12
    :param startsub: 3
    :return: 'Thi...y text'
    :raises ValueError: if `startsub` is bigger than `maxlen`
    """
    if startsub > maxlen:
        raise ValueError("Var startsub cannot be bigger than maxlen.")
    if len(teks) <= maxlen:
        return teks

    # room left once the untouched head is kept
    room = maxlen - startsub
    marker_len = len(subtitution_text)

    head = teks[:startsub]
    marker = subtitution_text[:room] if marker_len > room else subtitution_text
    tail = teks[startsub + marker_len - maxlen :] if marker_len < room else ""
    return head + marker + tail
def safe_curs_set(state: int) -> None:
    """
    Set the cursor visibility, ignoring failure of curses to do so.

    :param state: visibility passed to curses.curs_set()
    """
    try:
        curses.curs_set(state)
    except curses.error:
        # Best effort only. Catching curses.error instead of everything
        # keeps KeyboardInterrupt and SystemExit from being swallowed.
        return
def resolve_path(current_dir: str, relative_path: str) -> str:
    """
    Resolve path containing dots
    eg. '/foo/bar/book.html' + '../img.png' = '/foo/img.png'
    NOTE: '/' suffix is important to tell that current dir in 'bar'
    """
    # os.path.normpath() could do this too, but paths inside a zipfile
    # must be posix paths, so urljoin() is used instead
    resolved = urljoin(current_dir, relative_path)
    return resolved
def find_current_content_index(
    toc_entries: Tuple[TocEntry, ...], toc_secid: Mapping[str, int], index: int, y: int
) -> int:
    """
    Return the position of the last toc entry that is at or before the
    reading position, or 0 if there is none.

    :param toc_entries: table of contents entries
    :param toc_secid: mapping of section id to its row
    :param index: current content index
    :param y: current row
    """
    current = 0
    for position, entry in enumerate(toc_entries):
        # a section without a known row counts as starting at row 0
        if entry.content_index <= index and y >= toc_secid.get(entry.section, 0):  # type: ignore
            current = position
    return current
def count_letters(ebook: Ebook) -> LettersCount:
    """
    Count the non-whitespace characters of every content of `ebook`.

    :return: LettersCount holding the grand total and, per content, the
        number of letters in all the contents before it
    """
    letters_before: List[int] = []
    running_total = 0
    # assert isinstance(ebook.contents, tuple)
    for content_id in ebook.contents:
        paragraphs = parse_html(ebook.get_raw_text(content_id))
        assert isinstance(paragraphs, tuple)
        letters_before.append(running_total)
        running_total += sum(len(re.sub(r"\s", "", paragraph)) for paragraph in paragraphs)

    return LettersCount(all=running_total, cumulative=tuple(letters_before))
def count_letters_parallel(ebook: Ebook, child_conn) -> None:
    """Send the result of count_letters(ebook) through `child_conn`, then close it."""
    letters_count = count_letters(ebook)
    child_conn.send(letters_count)
    child_conn.close()
def choice_win(allowdel=False):
    """
    Conjure options window by wrapping a window function
    which has a return type of tuple in the form of
    (title, list_to_chose, initial_active_index, windows_key_to_toggle)
    and return tuple of (returned_key, chosen_index, chosen_index_to_delete)

    What the wrapper returns:
    - (None, index, None) when an entry is followed
    - (None, index_to_select_next, index) when entry `index` is to be deleted
    - (key, index, None) when another window key is pressed
    - (None, None, None) when the window is closed

    :param allowdel: show the delete hint and enable the delete keys
    """

    def inner_f(listgen):
        @wraps(listgen)
        def wrapper(self, *args, **kwargs):
            rows, cols = self.screen.getmaxyx()
            # the window leaves a 2 cell margin on every side of the screen
            hi, wi = rows - 4, cols - 4
            Y, X = 2, 2
            chwin = curses.newwin(hi, wi, Y, X)
            if self.is_color_supported:
                chwin.bkgd(self.screen.getbkgd())
            title, ch_list, index, key = listgen(self, *args, **kwargs)
            if len(title) > cols - 8:
                title = title[: cols - 8]
            chwin.box()
            chwin.keypad(True)
            chwin.addstr(1, 2, title)
            chwin.addstr(2, 2, "-" * len(title))
            if allowdel:
                chwin.addstr(3, 2, "HINT: Press 'd' to delete.")
            # no key pressed yet
            key_chwin = 0
            totlines = len(ch_list)
            chwin.refresh()
            # the entries live on a pad of which only `padhi` rows are shown
            pad = curses.newpad(totlines, wi - 2)
            if self.is_color_supported:
                pad.bkgd(self.screen.getbkgd())
            pad.keypad(True)
            padhi = rows - 5 - Y - 4 + 1 - (1 if allowdel else 0)
            # padhi = rows - 5 - Y - 4 + 1 - 1
            # y is the first pad row shown
            y = 0
            if index in range(padhi // 2, totlines - padhi // 2):
                y = index - padhi // 2 + 1
            # span[n] is the number of cells drawn for entry n
            span = []
            for n, i in enumerate(ch_list):
                # strs = " " + str(n+1).rjust(d) + " " + i[0]
                # remove newline from choice entries
                # mostly happens in FictionBook (.fb2) format
                strs = " " + i.replace("\n", " ")
                strs = strs[0 : wi - 3]
                pad.addstr(n, 0, strs)
                span.append(len(strs))
            # digits typed before a movement key, used as a repeat count
            countstring = ""
            while key_chwin not in self.keymap.Quit + key:
                if countstring == "":
                    count = 1
                else:
                    count = int(countstring)
                if key_chwin in tuple(Key(i) for i in range(48, 58)):  # i.e., k is a numeral
                    countstring = countstring + key_chwin.char
                else:
                    if key_chwin in self.keymap.ScrollUp + self.keymap.PageUp:
                        index -= count
                        if index < 0:
                            index = 0
                    elif key_chwin in self.keymap.ScrollDown or key_chwin in self.keymap.PageDown:
                        index += count
                        if index + 1 >= totlines:
                            index = totlines - 1
                    elif key_chwin in self.keymap.Follow:
                        chwin.clear()
                        chwin.refresh()
                        return None, index, None
                    elif key_chwin in self.keymap.BeginningOfCh:
                        index = 0
                    elif key_chwin in self.keymap.EndOfCh:
                        index = totlines - 1
                    elif key_chwin == Key("D") and allowdel:
                        # "D" deletes at once, "d" below asks for confirmation
                        return None, (0 if index == 0 else index - 1), index
                        # chwin.redrawwin()
                        # chwin.refresh()
                    elif key_chwin == Key("d") and allowdel:
                        resk, resp, _ = self.show_win_options(
                            "Delete '{}'?".format(ch_list[index]),
                            ["(Y)es", "(N)o"],
                            0,
                            (Key("n"),),
                        )
                        if resk is not None:
                            # a window key was pressed inside the confirmation
                            # window: handle it here on the next iteration
                            key_chwin = resk
                            continue
                        elif resp == 0:
                            return None, (0 if index == 0 else index - 1), index
                        chwin.redrawwin()
                        chwin.refresh()
                    elif key_chwin in {Key(i) for i in ["Y", "y", "N", "n"]} and ch_list == [
                        "(Y)es",
                        "(N)o",
                    ]:
                        # shortcut keys for the yes/no confirmation window
                        if key_chwin in {Key("Y"), Key("y")}:
                            return None, 0, None
                        else:
                            return None, 1, None
                    elif key_chwin in tuple_subtract(self._win_keys, key):
                        chwin.clear()
                        chwin.refresh()
                        return key_chwin, index, None
                    countstring = ""

                # scroll the pad until the active entry is inside the shown rows
                while index not in range(y, y + padhi):
                    if index < y:
                        y -= 1
                    else:
                        y += 1

                # redraw the marker and highlight of every entry
                for n in range(totlines):
                    att = curses.A_REVERSE if index == n else curses.A_NORMAL
                    pre = ">>" if index == n else " "
                    pad.addstr(n, 0, pre)
                    pad.chgat(n, 0, span[n], pad.getbkgd() | att)

                pad.refresh(y, 0, Y + 4 + (1 if allowdel else 0), X + 4, rows - 5, cols - 6)
                # pad.refresh(y, 0, Y+5, X+4, rows - 5, cols - 6)
                key_chwin = Key(chwin.getch())
                if key_chwin == Key(curses.KEY_MOUSE):
                    # getmouse() returns (id, x, y, z, bstate)
                    mouse_event = curses.getmouse()
                    if mouse_event[4] == curses.BUTTON4_PRESSED:
                        key_chwin = self.keymap.ScrollUp[0]
                    # NOTE(review): 2097152 is 1 << 21, presumably the
                    # wheel-down button state -- confirm
                    elif mouse_event[4] == 2097152:
                        key_chwin = self.keymap.ScrollDown[0]
                    elif mouse_event[4] == curses.BUTTON1_DOUBLE_CLICKED:
                        # NOTE(review): clicks are mapped with a fixed row
                        # offset of 6 while the pad is drawn one row lower
                        # when `allowdel` is set -- confirm
                        if (
                            mouse_event[2] >= 6
                            and mouse_event[2] < rows - 4
                            and mouse_event[2] < 6 + totlines
                        ):
                            index = mouse_event[2] - 6 + y
                        key_chwin = self.keymap.Follow[0]
                    elif (
                        mouse_event[4] == curses.BUTTON1_CLICKED
                        and mouse_event[2] >= 6
                        and mouse_event[2] < rows - 4
                        and mouse_event[2] < 6 + totlines
                    ):
                        # clicking the already active entry follows it
                        if index == mouse_event[2] - 6 + y:
                            key_chwin = self.keymap.Follow[0]
                            continue
                        index = mouse_event[2] - 6 + y
                    elif mouse_event[4] == curses.BUTTON3_CLICKED:
                        key_chwin = self.keymap.Quit[0]

            chwin.clear()
            chwin.refresh()
            return None, None, None

        return wrapper

    return inner_f
def text_win(textfunc):
    """
    Decorator turning a method that returns (title, text, keys_to_toggle)
    into a boxed, scrollable text window.

    The wrapper returns the pressed key when it is one of the other window
    keys, otherwise NoUpdate once the window is closed.
    """

    @wraps(textfunc)
    def wrapper(self, *args, **kwargs) -> Union[NoUpdate, Key]:
        rows, cols = self.screen.getmaxyx()
        # the window leaves a 2 cell margin on every side of the screen
        top, left = 2, 2
        win_rows, win_cols = rows - 4, cols - 4
        textw = curses.newwin(win_rows, win_cols, top, left)
        if self.is_color_supported:
            textw.bkgd(self.screen.getbkgd())

        def close_window() -> None:
            textw.clear()
            textw.refresh()

        title, raw_texts, key = textfunc(self, *args, **kwargs)
        title = title[: cols - 8] if len(title) > cols - 8 else title

        texts = [
            piece
            for paragraph in raw_texts.splitlines()
            for piece in textwrap.wrap(paragraph, win_cols - 6, drop_whitespace=False)
        ]

        textw.box()
        textw.keypad(True)
        textw.addstr(1, 2, title)
        textw.addstr(2, 2, "-" * len(title))
        key_textw: Union[NoUpdate, Key] = NoUpdate()

        # the wrapped text lives on a pad that is scrolled inside the window
        totlines = len(texts)
        pad = curses.newpad(totlines, win_cols - 2)
        if self.is_color_supported:
            pad.bkgd(self.screen.getbkgd())
        pad.keypad(True)
        for line_number, line in enumerate(texts):
            pad.addstr(line_number, 0, line)

        y = 0
        textw.refresh()
        pad.refresh(y, 0, top + 4, left + 4, rows - 5, cols - 6)
        padhi = rows - 8 - top

        while key_textw not in self.keymap.Quit + key:
            if key_textw in self.keymap.ScrollUp and y > 0:
                y -= 1
            elif key_textw in self.keymap.ScrollDown and y < totlines - win_rows + 6:
                y += 1
            elif key_textw in self.keymap.PageUp:
                y = pgup(y, padhi)
            elif key_textw in self.keymap.PageDown:
                y = pgdn(y, totlines, padhi)
            elif key_textw in self.keymap.BeginningOfCh:
                y = 0
            elif key_textw in self.keymap.EndOfCh:
                y = pgend(totlines, padhi)
            elif key_textw in tuple_subtract(self._win_keys, key):
                # another window was requested: hand its key to the caller
                close_window()
                return key_textw
            pad.refresh(y, 0, 6, 5, rows - 5, cols - 5)
            key_textw = Key(textw.getch())

        close_window()
        return NoUpdate()

    return wrapper
# }}}
# Main Reading Interface {{{
class Reader:
def __init__(self, screen, ebook: Ebook, config: Config, state: State):
    """
    Set up the screen, initialize the ebook and reset the reader state.

    :param screen: curses screen to draw on
    :param ebook: ebook to read; `initialize()` is called on it here
    :param config: provides `setting`, `keymap` and `keymap_user_dict`
    :param state: State object, stored as `self.state`

    Exits the process when the ebook fails to initialize (re-raises the
    error instead when DEBUG is set).
    """
    self.setting = config.setting
    self.keymap = config.keymap
    # to build help menu text
    self.keymap_user_dict = config.keymap_user_dict

    self.seamless = self.setting.SeamlessBetweenChapters

    # keys that will make
    # windows exit and return the said key
    self._win_keys = (
        # curses.KEY_RESIZE is a must
        (Key(curses.KEY_RESIZE),)
        + self.keymap.TableOfContents
        + self.keymap.Metadata
        + self.keymap.Help
    )

    # screen initialization
    self.screen = screen
    self.screen.keypad(True)
    safe_curs_set(0)
    if self.setting.MouseSupport:
        curses.mousemask(-1)
    # curses.mouseinterval(0)
    self.screen.clear()

    # screen color
    self.is_color_supported: bool = False
    try:
        curses.use_default_colors()
        curses.init_pair(1, self.setting.DefaultColorFG, self.setting.DefaultColorBG)
        curses.init_pair(2, self.setting.DarkColorFG, self.setting.DarkColorBG)
        curses.init_pair(3, self.setting.LightColorFG, self.setting.LightColorBG)
        self.screen.bkgd(curses.color_pair(1))
        self.is_color_supported = True
    except Exception:
        # Colors are best effort: any failure just disables them. Not a bare
        # except, so KeyboardInterrupt and SystemExit still propagate.
        self.is_color_supported = False

    # show loader and start heavy resources processes
    self.show_loader(subtext="initalizing ebook")

    # main ebook object
    self.ebook = ebook
    try:
        self.ebook.initialize()
    except (KeyboardInterrupt, Exception) as e:
        self.ebook.cleanup()
        if DEBUG:
            raise e
        else:
            sys.exit("ERROR: Badly-structured ebook.\n" + str(e))

    # state
    self.state = state

    # page scroll animation
    self.page_animation: Optional[Direction] = None

    # show reading progress
    self.show_reading_progress: bool = self.setting.ShowProgressIndicator
    self.reading_progress: Optional[float] = None  # calculate after count_letters()

    # search storage
    self.search_data: Optional[SearchData] = None

    # double spread
    self.spread = 2 if self.setting.StartWithDoubleSpread else 1

    # jumps marker container
    self.jump_list: Dict[str, ReadingState] = dict()

    # TTS speaker utils
    self._tts_speaker: Optional[SpeakerBaseModel] = construct_speaker(
        self.setting.PreferredTTSEngine, self.setting.TTSEngineArgs
    )
    self.tts_support: bool = bool(self._tts_speaker)
    self.is_speaking: bool = False

    # multi process & progress percentage
    # letters are only counted in a separate process with more than one CPU
    self._multiprocess_support: bool = multiprocessing.cpu_count() != 1
    self._process_counting_letter: Optional[multiprocessing.Process] = None
    self.letters_count: Optional[LettersCount] = None
def run_counting_letters(self):
    """
    Start counting the letters of the ebook.

    With multiprocess support the count runs in a child process that sends
    its result through a pipe. If starting it fails (or there is no
    multiprocess support) the letters are counted right here instead.
    """
    if self._multiprocess_support:
        try:
            parent_end, child_end = multiprocessing.Pipe()
            self._proc_parent, self._proc_child = parent_end, child_end
            self._process_counting_letter = multiprocessing.Process(
                name="epy-subprocess-counting-letters",
                target=count_letters_parallel,
                args=(self.ebook, child_end),
            )
            # forking will raise
            # zlib.error: Error -3 while decompressing data: invalid distance too far back
            self._process_counting_letter.start()
        except Exception as e:
            if DEBUG:
                raise e
            self._multiprocess_support = False

    if self._multiprocess_support:
        return
    self.letters_count = count_letters(self.ebook)
def try_assign_letters_count(self, *, force_wait=False) -> None:
    """
    Collect the letters count from the counting process once it has
    exited with code 0, then dispose of the process.

    :param force_wait: wait for a still running process to finish first
    """
    worker = self._process_counting_letter
    if not isinstance(worker, multiprocessing.Process):
        return

    if force_wait and worker.is_alive():
        worker.join()

    # still running, or exited with a non-zero code: nothing to collect
    if worker.exitcode != 0:
        return

    self.letters_count = self._proc_parent.recv()
    self._proc_parent.close()
    worker.terminate()
    worker.close()
    self._process_counting_letter = None
def calculate_reading_progress(
    self, letters_per_content: List[int], reading_state: ReadingState
) -> None:
    """
    Update `self.reading_progress`; does nothing while the letters count
    is not available.

    :param letters_per_content: numbers summed up to the last visible row
        -- presumably letters per line of the current content, confirm
    :param reading_state: current reading position
    """
    counts = self.letters_count
    if not counts:
        return
    letters_before_content = counts.cumulative[reading_state.content_index]
    # index right after the last row that is taken into account
    end_row = reading_state.row + (self.screen_rows * self.spread) - 1
    letters_read = letters_before_content + sum(letters_per_content[:end_row])
    self.reading_progress = letters_read / counts.all
@property
def screen_rows(self) -> int:
    """Current number of rows of the screen."""
    rows, _ = self.screen.getmaxyx()
    return rows
@property
def screen_cols(self) -> int:
    """Current number of columns of the screen."""
    _, cols = self.screen.getmaxyx()
    return cols
@property
def ext_dict_app(self) -> Optional[str]:
    """
    Command line of the external dictionary program, or None if none is found.

    The configured `DictionaryClient` is used when its executable is on
    PATH. Otherwise the first available preset ("sdcv", "dict") is chosen;
    the "sdcv" preset gets " -n" appended.

    The result is also stored in `self._ext_dict_app`.
    """
    self._ext_dict_app: Optional[str] = None
    dict_app_preset_list = ["sdcv", "dict"]

    configured = self.setting.DictionaryClient
    # split() returns [] for an empty / whitespace-only setting; guard it so
    # a blank value falls back to the presets instead of raising IndexError
    configured_argv = configured.split()

    if configured_argv and shutil.which(configured_argv[0]):
        self._ext_dict_app = configured
    else:
        for candidate in dict_app_preset_list:
            if shutil.which(candidate) is not None:
                self._ext_dict_app = candidate
                break
        if self._ext_dict_app in {"sdcv"}:
            self._ext_dict_app += " -n"

    return self._ext_dict_app
@property
def image_viewer(self) -> Optional[str]:
    """
    Command line of the external image viewer, or None if none is found.

    Resolution order: the configured `DefaultViewer` when its executable is
    on PATH, then "start" on win32, "open" on darwin, and finally the first
    available entry of VIEWER_PRESET_LIST (the "gio" preset gets " open"
    appended).

    The result is also stored in `self._image_viewer`.
    """
    self._image_viewer: Optional[str] = None

    configured = self.setting.DefaultViewer
    # split() returns [] for an empty / whitespace-only setting; guard it so
    # a blank value falls back to the defaults instead of raising IndexError
    configured_argv = configured.split()

    if configured_argv and shutil.which(configured_argv[0]) is not None:
        self._image_viewer = configured
    elif sys.platform == "win32":
        self._image_viewer = "start"
    elif sys.platform == "darwin":
        self._image_viewer = "open"
    else:
        for candidate in VIEWER_PRESET_LIST:
            if shutil.which(candidate) is not None:
                self._image_viewer = candidate
                break
        if self._image_viewer in {"gio"}:
            self._image_viewer += " open"

    return self._image_viewer
def open_image(self, pad, name, bstr):
    """
    Write an image to a temporary file, open it with the external viewer and
    wait for a key press.

    :param pad: object providing getch(); used to wait for the next key
    :param name: image file name; only its extension is used
    :param bstr: raw image bytes
    :return: the key returned by pad.getch()

    The temporary file is removed afterwards, even on error.
    """
    import shlex

    # `name` originates from inside the ebook, i.e. it is untrusted, and the
    # command below is run through a shell: keep only harmless characters in
    # the extension so it cannot smuggle shell syntax into the command line.
    sfx = re.sub(r"[^A-Za-z0-9.]", "", os.path.splitext(name)[1])
    fd, path = tempfile.mkstemp(suffix=sfx)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(bstr)
        # POSIX shells need the path quoted (the temp dir may contain spaces);
        # on win32 the path is left untouched since `start` would interpret
        # a quoted first argument as a window title.
        shell_path = path if sys.platform == "win32" else shlex.quote(path)
        subprocess.call(
            self.image_viewer + " " + shell_path,
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        k = pad.getch()
    finally:
        os.remove(path)
    return k
def show_loader(self, *, loader_str: str = "\u231B", subtext: Optional[str] = None):
    """
    Clear the screen and draw a centered loading indicator.

    :param loader_str: text drawn on the middle row
    :param subtext: optional text drawn on the row below
    """
    screen = self.screen
    screen.clear()
    height, width = screen.getmaxyx()
    center_row = (height - 1) // 2
    screen.addstr(center_row, 0, loader_str.center(width))
    if subtext:
        screen.addstr(center_row + 1, 0, subtext.center(width))
    screen.refresh()
@choice_win(True)
def show_win_options(self, title, options, active_index, key_set):
    """
    Describe a generic choice window.

    The returned tuple (title, options, initially active index, keys) is
    handed to the `choice_win` decorator, which is defined elsewhere.
    """
    window_spec = (title, options, active_index, key_set)
    return window_spec
@text_win
def show_win_error(self, title, msg, key):
    """
    Describe a text window showing an error message.

    The returned tuple (title, message, keys) is handed to the `text_win`
    decorator, which is defined elsewhere.
    """
    window_spec = (title, msg, key)
    return window_spec
@choice_win()
def toc(self, toc_entries: Tuple[TocEntry, ...], index: int):
    """
    Describe the table-of-contents choice window.

    :param toc_entries: entries whose labels are listed
    :param index: index of the initially active entry
    """
    labels = [entry.label for entry in toc_entries]
    return "Table of Contents", labels, index, self.keymap.TableOfContents
@text_win
def show_win_metadata(self):
    """
    Describe the text window showing file and book metadata.

    The file size is included only when the ebook path is an existing file.
    Empty metadata fields are skipped; HTML tags are stripped and HTML
    entities unescaped in the remaining values.
    """
    book_path = self.ebook.path
    if os.path.isfile(book_path):
        size_mb = round(os.path.getsize(book_path) / 1024 ** 2, 2)
        mdata = "[File Info]\nPATH: {}\nSIZE: {} MB\n \n[Book Info]\n".format(
            book_path, size_mb
        )
    else:
        mdata = "[File Info]\nPATH: {}\n \n[Book Info]\n".format(book_path)

    book_metadata = self.ebook.get_meta()
    for meta_field in dataclasses.fields(book_metadata):
        raw_value = getattr(book_metadata, meta_field.name)
        if not raw_value:
            continue
        cleaned = unescape(re.sub("<[^>]*>", "", raw_value))
        mdata += f"{meta_field.name.title()}: {cleaned}\n"

    return "Metadata", mdata, self.keymap.Metadata
@text_win
def show_win_help(self):
    """
    Describe the text window listing the key bindings.

    Each line holds the right-aligned keys followed by the action name,
    whose CamelCase words are separated by spaces.
    """
    bindings = self.keymap_user_dict
    key_col_width = max(len(keys) for keys in bindings.values()) + 2

    pieces = ["Key Bindings:\n"]
    for action, keys in bindings.items():
        action_words = " ".join(re.findall("[A-Z][^A-Z]*", action))
        pieces.append("{} {}\n".format(keys.rjust(key_col_width), action_words))

    return "Help", "".join(pieces), self.keymap.Help
@text_win
def define_word(self, word):
    """
    Look up `word` with the external dictionary program.

    A small "Loading..." window is shown while the program runs.

    :param word: text to look up; it is passed to the dictionary program as
                 a single, shell-quoted argument so that apostrophes, spaces
                 or shell metacharacters cannot break or alter the command
    :return: (title, text, keys) for the `text_win` decorator: the
             definition when the program wrote nothing to stderr, otherwise
             its error output
    """
    import shlex

    rows, cols = self.screen.getmaxyx()
    hi, wi = 5, 16
    Y, X = (rows - hi) // 2, (cols - wi) // 2

    dict_app = self.ext_dict_app
    if sys.platform == "win32":
        quoted_word = subprocess.list2cmdline([word])
    else:
        quoted_word = shlex.quote(word)

    p = subprocess.Popen(
        "{} {}".format(dict_app, quoted_word),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
    )

    dictwin = curses.newwin(hi, wi, Y, X)
    dictwin.box()
    dictwin.addstr((hi - 1) // 2, (wi - 10) // 2, "Loading...")
    dictwin.refresh()

    out, err = p.communicate()

    dictwin.clear()
    dictwin.refresh()

    if err == b"":
        return "Definition: " + word.upper(), out.decode(), self.keymap.DefineWord
    else:
        return "Error: " + dict_app, err.decode(), self.keymap.DefineWord
def show_win_choices_bookmarks(self):
    """
    Show the bookmark chooser until the user picks an entry or leaves.

    Deleting a bookmark inside the window removes it from the state and
    shows the window again with the refreshed list.

    :return: (key, index). When there are no bookmarks (left), the first
             ShowBookmarks key and None are returned.
    """
    selected = 0
    while True:
        names = [entry[0] for entry in self.state.get_bookmarks(self.ebook)]
        if not names:
            return self.keymap.ShowBookmarks[0], None

        key, selected, delete_index = self.show_win_options(
            "Bookmarks", names, selected, self.keymap.ShowBookmarks
        )
        if delete_index is None:
            return key, selected

        self.state.delete_bookmark(self.ebook, names[delete_index])
def show_win_library(self):
    """
    Show the library chooser until the user picks an entry or leaves.

    Deleting an item inside the window removes it from the library and
    shows the window again with the refreshed list.

    :return: (key, index). When the reading history is empty, the first
             Library key and None are returned.
    """
    while True:
        history = self.state.get_from_history()
        if not history:
            return self.keymap.Library[0], None

        labels = [str(entry) for entry in history]
        key, chosen, delete_index = self.show_win_options(
            "Library", labels, 0, self.keymap.Library
        )
        if delete_index is None:
            return key, chosen

        self.state.delete_from_library(history[delete_index].filepath)
def input_prompt(self, prompt: str) -> Union[NoUpdate, Key, str]:
    """
    Read a line of text from the user on the bottom row of the screen.

    :param prompt: prompt text
    :return: NoUpdate if cancelled or interrupted
             Key if curses.KEY_RESIZE triggered
             str for successful input

    Echo mode and cursor visibility are always restored and the prompt
    window is cleared before returning, including when an unexpected
    exception propagates.
    """
    # prevent pad hole when prompting for input while
    # other window is active
    # pad.refresh(y, 0, 0, x, rows-2, x+width)
    rows, cols = self.screen.getmaxyx()
    stat = curses.newwin(1, cols, rows - 1, 0)
    if self.is_color_supported:
        stat.bkgd(self.screen.getbkgd())
    stat.keypad(True)
    curses.echo(True)
    safe_curs_set(2)

    init_text = ""

    try:
        stat.addstr(0, 0, prompt, curses.A_REVERSE)
        stat.addstr(0, len(prompt), init_text)
        stat.refresh()

        while True:
            # NOTE: getch() only handles ascii
            # to handle wide char like: é, use get_wch()
            # get_wch() returns str for text input but int for function
            # or special keys; Key() wraps both
            ipt = Key(stat.get_wch())

            if ipt == Key(27):  # escape: cancel
                return NoUpdate()
            elif ipt == Key(10):  # line feed: accept
                return init_text
            elif ipt in (Key(8), Key(127), Key(curses.KEY_BACKSPACE)):
                init_text = init_text[:-1]
            elif ipt == Key(curses.KEY_RESIZE):
                return Key(curses.KEY_RESIZE)
            # elif len(init_text) <= maxlen:
            else:
                init_text += ipt.char

            stat.clear()
            stat.addstr(0, 0, prompt, curses.A_REVERSE)
            # when the text does not fit, show only its tail after "..."
            stat.addstr(
                0,
                len(prompt),
                init_text
                if len(prompt + init_text) < cols
                else "..." + init_text[len(prompt) - cols + 4 :],
            )
            stat.refresh()
    except KeyboardInterrupt:
        return NoUpdate()
    finally:
        # single cleanup path for every way out of the prompt
        stat.clear()
        stat.refresh()
        curses.echo(False)
        safe_curs_set(0)
def searching(
    self, board: InfiniBoard, src: Sequence[str], reading_state: ReadingState, tot
) -> Union[NoUpdate, ReadingState, Key]:
    """
    Interactive regex search inside the currently loaded text.

    :param board: board used to draw the text and to read keys
    :param src: text lines to search in
    :param reading_state: current position
    :param tot: number of contents; used to decide whether there is a
                next/previous content to continue the search in
    :return: what `input_prompt` returned when no pattern was entered,
             what `show_win_error` returned on an invalid regex,
             a ReadingState pointing at the next/previous content when the
             search has to continue there,
             the (possibly scrolled) ReadingState when the search is quit,
             Key(curses.KEY_RESIZE) on a resize event

    `self.search_data` holds the pattern and direction across calls; it is
    reset to None when the search is quit or the pattern is invalid.
    """
    # reusable loop indices
    i: Any
    j: Any

    rows, cols = self.screen.getmaxyx()
    # unnecessary
    # if self.spread == 2:
    #     reading_state = dataclasses.replace(reading_state, textwidth=(cols - 7) // 2)

    # NOTE(review): x is only referenced by commented-out code below
    x = (cols - reading_state.textwidth) // 2
    if self.spread == 1:
        x = (cols - reading_state.textwidth) // 2
    else:
        x = 2

    # no active search yet: ask for the pattern
    if not self.search_data:
        candidate_text = self.input_prompt(" Regex:")
        # if isinstance(candidate_text, str) and candidate_text != "":
        if isinstance(candidate_text, str) and candidate_text:
            self.search_data = SearchData(value=candidate_text)
        else:
            assert isinstance(candidate_text, NoUpdate) or isinstance(candidate_text, Key)
            return candidate_text

    found = []
    try:
        pattern = re.compile(self.search_data.value, re.IGNORECASE)
    except re.error as reerrmsg:
        self.search_data = None
        tmpk = self.show_win_error("!Regex Error", str(reerrmsg), tuple())
        return tmpk

    # each match is stored as [line index, start column, match length]
    for n, i in enumerate(src):
        for j in pattern.finditer(i):
            found.append([n, j.span()[0], j.span()[1] - j.span()[0]])

    if not found:
        # nothing here: continue in the neighbouring content, if any
        if (
            self.search_data.direction == Direction.FORWARD
            and reading_state.content_index + 1 < tot
        ):
            return ReadingState(
                content_index=reading_state.content_index + 1,
                textwidth=reading_state.textwidth,
                row=0,
            )
        elif (
            self.search_data.direction == Direction.BACKWARD and reading_state.content_index > 0
        ):
            return ReadingState(
                content_index=reading_state.content_index - 1,
                textwidth=reading_state.textwidth,
                row=0,
            )
        else:
            # no more content in the search direction: show "Finished" and wait for a key
            s: Union[NoUpdate, Key] = NoUpdate()
            while True:
                if s in self.keymap.Quit:
                    self.search_data = None
                    self.screen.clear()
                    self.screen.refresh()
                    return reading_state
                # TODO: maybe >= 0?
                elif s == Key("n") and reading_state.content_index == 0:
                    self.search_data = dataclasses.replace(
                        self.search_data, direction=Direction.FORWARD
                    )
                    return ReadingState(
                        content_index=reading_state.content_index + 1,
                        textwidth=reading_state.textwidth,
                        row=0,
                    )
                elif s == Key("N") and reading_state.content_index + 1 == tot:
                    self.search_data = dataclasses.replace(
                        self.search_data, direction=Direction.BACKWARD
                    )
                    return ReadingState(
                        content_index=reading_state.content_index - 1,
                        textwidth=reading_state.textwidth,
                        row=0,
                    )

                self.screen.clear()
                self.screen.addstr(
                    rows - 1,
                    0,
                    " Finished searching: " + self.search_data.value[: cols - 22] + " ",
                    curses.A_REVERSE,
                )
                board.write(reading_state.row, 1)
                self.screen.refresh()
                s = board.getch()

    # index of the active match: the last one by default (backward search),
    # or the first one at/after the current row when searching forward
    sidx = len(found) - 1
    if self.search_data.direction == Direction.FORWARD:
        if reading_state.row > found[-1][0]:
            return ReadingState(
                content_index=reading_state.content_index + 1,
                textwidth=reading_state.textwidth,
                row=0,
            )
        for n, i in enumerate(found):
            if i[0] >= reading_state.row:
                sidx = n
                break

    s = NoUpdate()
    msg = (
        " Searching: "
        + self.search_data.value
        + " --- Res {}/{} Ch {}/{} ".format(
            sidx + 1, len(found), reading_state.content_index + 1, tot
        )
    )
    while True:
        if s in self.keymap.Quit:
            self.search_data = None
            # for i in found:
            #     pad.chgat(i[0], i[1], i[2], pad.getbkgd())
            board.feed_temporary_style()
            # pad.format()
            # self.screen.clear()
            # self.screen.refresh()
            return reading_state
        elif s == Key("n"):
            # next match, or next content when already on the last match
            self.search_data = dataclasses.replace(
                self.search_data, direction=Direction.FORWARD
            )
            if sidx == len(found) - 1:
                if reading_state.content_index + 1 < tot:
                    return ReadingState(
                        content_index=reading_state.content_index + 1,
                        textwidth=reading_state.textwidth,
                        row=0,
                    )
                else:
                    s = NoUpdate()
                    msg = " Finished searching: " + self.search_data.value + " "
                    continue
            else:
                sidx += 1
                msg = (
                    " Searching: "
                    + self.search_data.value
                    + " --- Res {}/{} Ch {}/{} ".format(
                        sidx + 1, len(found), reading_state.content_index + 1, tot
                    )
                )
        elif s == Key("N"):
            # previous match, or previous content when already on the first match
            self.search_data = dataclasses.replace(
                self.search_data, direction=Direction.BACKWARD
            )
            if sidx == 0:
                if reading_state.content_index > 0:
                    return ReadingState(
                        content_index=reading_state.content_index - 1,
                        textwidth=reading_state.textwidth,
                        row=0,
                    )
                else:
                    s = NoUpdate()
                    msg = " Finished searching: " + self.search_data.value + " "
                    continue
            else:
                sidx -= 1
                msg = (
                    " Searching: "
                    + self.search_data.value
                    + " --- Res {}/{} Ch {}/{} ".format(
                        sidx + 1, len(found), reading_state.content_index + 1, tot
                    )
                )
        elif s == Key(curses.KEY_RESIZE):
            return Key(curses.KEY_RESIZE)

        # if reading_state.row + rows - 1 > pad.chunks[pad.find_chunkidx(reading_state.row)]:
        #     reading_state = dataclasses.replace(
        #         reading_state, row=pad.chunks[pad.find_chunkidx(reading_state.row)] + 1
        #     )

        # move page by page until the active match is on the visible page
        while found[sidx][0] not in list(
            range(reading_state.row, reading_state.row + (rows - 1) * self.spread)
        ):
            if found[sidx][0] > reading_state.row:
                reading_state = dataclasses.replace(
                    reading_state, row=reading_state.row + ((rows - 1) * self.spread)
                )
            else:
                reading_state = dataclasses.replace(
                    reading_state, row=reading_state.row - ((rows - 1) * self.spread)
                )
                if reading_state.row < 0:
                    reading_state = dataclasses.replace(reading_state, row=0)

        # formats = [InlineStyle(row=i[0], col=i[1], n_letters=i[2], attr=curses.A_REVERSE) for i in found]
        # pad.feed_style(formats)
        # only the active match is drawn in reverse video
        styles: List[InlineStyle] = []
        for n, i in enumerate(found):
            attr = curses.A_REVERSE if n == sidx else curses.A_NORMAL
            # pad.chgat(i[0], i[1], i[2], pad.getbkgd() | attr)
            styles.append(
                InlineStyle(row=i[0], col=i[1], n_letters=i[2], attr=board.getbkgd() | attr)
            )
        board.feed_temporary_style(tuple(styles))

        self.screen.clear()
        self.screen.addstr(rows - 1, 0, msg, curses.A_REVERSE)
        self.screen.refresh()
        # pad.refresh(reading_state.row, 0, 0, x, rows - 2, x + reading_state.textwidth)
        board.write(reading_state.row, 1)
        s = board.getch()
def speaking(self, text):
    """
    Speak `text` with the TTS engine while polling the keyboard and mouse.

    :param text: text handed to the speaker
    :return: the first PageDown key when speaking finished on its own,
             NoUpdate() when a Quit key stopped it (this also clears
             `self.is_speaking`), otherwise the key that interrupted it

    The screen read timeout is restored and the speaker cleaned up in every
    case.
    """
    self.is_speaking = True
    self.screen.addstr(self.screen_rows - 1, 0, " Speaking! ", curses.A_REVERSE)
    self.screen.refresh()
    # short timeout so getch() does not block while polling the speaker
    self.screen.timeout(1)
    speaker = self._tts_speaker
    try:
        speaker.speak(text)

        while True:
            if speaker.is_done():
                k = self.keymap.PageDown[0]
                break

            pressed = self.screen.getch()
            k = Key(pressed) if pressed != -1 else NoUpdate()

            if k == Key(curses.KEY_MOUSE):
                mouse_event = curses.getmouse()
                button_state = mouse_event[4]
                if button_state == curses.BUTTON2_CLICKED:
                    k = self.keymap.Quit[0]
                elif button_state == curses.BUTTON1_CLICKED:
                    # left half of the screen pages up, right half pages down
                    if mouse_event[1] < self.screen_cols // 2:
                        k = self.keymap.PageUp[0]
                    else:
                        k = self.keymap.PageDown[0]
                elif button_state == curses.BUTTON4_PRESSED:
                    k = self.keymap.ScrollUp[0]
                elif button_state == 2097152:
                    k = self.keymap.ScrollDown[0]

            interrupting_keys = (
                self.keymap.Quit
                + self.keymap.PageUp
                + self.keymap.PageDown
                + self.keymap.ScrollUp
                + self.keymap.ScrollDown
                + (curses.KEY_RESIZE,)
            )
            if k in interrupting_keys:
                speaker.stop()
                break
    finally:
        self.screen.timeout(-1)
        speaker.cleanup()

    if k in self.keymap.Quit:
        self.is_speaking = False
        k = NoUpdate()
    return k
def savestate(self, reading_state: ReadingState) -> None:
    """
    Persist the reading position and the reading progress of the ebook.

    In seamless mode the reading state is first converted back to a
    position relative to its content.
    """
    state_to_store = (
        self.convert_absolute_reading_state_to_relative(reading_state)
        if self.seamless
        else reading_state
    )
    self.state.set_last_reading_state(self.ebook, state_to_store)
    self.state.update_library(self.ebook, self.reading_progress)
def cleanup(self) -> None:
    """
    Release the resources of the ebook and stop the letter-counting child
    process if it is still running.
    """
    self.ebook.cleanup()

    worker = self._process_counting_letter
    if not isinstance(worker, multiprocessing.Process):
        return
    if not worker.is_alive():
        return

    worker.terminate()
    # weird python multiprocessing issue, need to call .join() before .close()
    # ValueError: Cannot close a process while it is still running.
    # You should first call join() or terminate().
    worker.join()
    worker.close()
def convert_absolute_reading_state_to_relative(self, reading_state) -> ReadingState:
    """
    Convert a seamless-mode (whole book) reading state into one relative to
    its content, using the line counts recorded per content.

    :raises RuntimeError: when the reader is not in seamless mode
    """
    if self.seamless:
        return construct_relative_reading_state(reading_state, self.totlines_per_content)
    raise RuntimeError(
        "Reader.convert_absolute_reading_state_to_relative() only implemented when Seamless=True"
    )
def convert_relative_reading_state_to_absolute(
    self, reading_state: ReadingState
) -> ReadingState:
    """
    Convert a reading state relative to its content into a seamless-mode
    one: content_index becomes 0 and the row is offset by the line counts
    of all preceding contents.

    rel_pctg is recomputed against the total line count when it was set
    (truthy), otherwise it becomes None.

    :raises RuntimeError: when the reader is not in seamless mode
    """
    if not self.seamless:
        raise RuntimeError(
            "Reader.convert_relative_reading_state_to_absolute() only implemented when Seamless=True"
        )

    lines_per_content = self.totlines_per_content
    preceding_lines = sum(lines_per_content[: reading_state.content_index])
    absolute_row = reading_state.row + preceding_lines

    if reading_state.rel_pctg:
        absolute_pctg = absolute_row / sum(lines_per_content)
    else:
        absolute_pctg = None

    return dataclasses.replace(
        reading_state, content_index=0, row=absolute_row, rel_pctg=absolute_pctg
    )
def get_all_book_contents(
    self, reading_state: ReadingState
) -> Tuple[TextStructure, Tuple[TocEntry, ...], Union[Tuple[str, ...], Tuple[ET.Element, ...]]]:
    """
    Parse every content of the book and merge them into one text structure
    (seamless mode).

    :param reading_state: only its textwidth is used, for parsing
    :return: (merged text structure, toc entries all pointing at
             content_index 0, 1-tuple holding the first content)
    :raises RuntimeError: when the reader is not in seamless mode

    Side effect: `self.totlines_per_content` is rebuilt with the number of
    text lines of each content.
    """
    if not self.seamless:
        raise RuntimeError("Reader.get_all_book_contents() only implemented when Seamless=True")

    contents = self.ebook.contents
    toc_entries = self.ebook.toc_entries

    merged: TextStructure = TextStructure(
        text_lines=(), image_maps={}, section_rows={}, formatting=()
    )
    flattened_toc: List[TocEntry] = []
    generated_section_rows: Dict[str, int] = {}

    # self.totlines_per_content only defined when Seamless=True
    self.totlines_per_content: Tuple[int, ...] = ()

    for n, content in enumerate(contents):
        self.show_loader(subtext=f"loading contents ({n+1}/{len(contents)})")
        # first line of this content inside the merged text
        starting_line = sum(self.totlines_per_content)
        assert isinstance(content, (str, ET.Element))
        parsed = parse_html(
            self.ebook.get_raw_text(content),
            textwidth=reading_state.textwidth,
            section_ids={entry.section for entry in toc_entries},  # type: ignore
            starting_line=starting_line,
        )
        assert isinstance(parsed, TextStructure)
        self.totlines_per_content += (len(parsed.text_lines),)

        for entry in toc_entries:
            if entry.content_index != n:
                continue
            if entry.section:
                flattened_toc.append(dataclasses.replace(entry, content_index=0))
                continue
            # entry without a section: give it a generated section id that
            # maps to the first line of its content
            generated_id = str(uuid.uuid4())
            flattened_toc.append(
                TocEntry(label=entry.label, content_index=0, section=generated_id)
            )
            generated_section_rows[generated_id] = starting_line

        merged = merge_text_structures(merged, parsed)

    merged = dataclasses.replace(
        merged, section_rows={**merged.section_rows, **generated_section_rows}
    )

    return merged, tuple(flattened_toc), (self.ebook.contents[0],)
def get_current_book_content(
    self, reading_state: ReadingState
) -> Tuple[TextStructure, Tuple[TocEntry, ...], Union[Tuple[str, ...], Tuple[ET.Element, ...]]]:
    """
    Parse only the content selected by `reading_state.content_index`.

    :return: (text structure of that content, toc entries of the ebook,
             contents of the ebook)
    """
    ebook = self.ebook
    contents = ebook.contents
    toc_entries = ebook.toc_entries

    raw_text = ebook.get_raw_text(contents[reading_state.content_index])
    text_structure = parse_html(  # type: ignore
        raw_text,
        textwidth=reading_state.textwidth,
        section_ids={entry.section for entry in toc_entries},  # type: ignore
    )
    return text_structure, toc_entries, contents
def read(self, reading_state: ReadingState) -> Union[ReadingState, Ebook]:
    """
    Render the text for `reading_state` and run the key-handling loop.

    The text is the whole book in seamless mode, otherwise only the content
    selected by `reading_state.content_index`.

    :param reading_state: position and text width to start from
    :return: a new ReadingState whenever the text has to be loaded or laid
             out again (content change, width change, spread toggle, color
             switch, jump, resize, ...), or the object returned by
             `get_ebook_obj` when a book is picked from the library

    The process exits through sys.exit() after saving the state when a Quit
    key is pressed or on KeyboardInterrupt.
    """
    # reusable loop indices
    i: Any

    # resume an active search right away, otherwise start with no key
    k = self.keymap.RegexSearch[0] if self.search_data else NoUpdate()
    rows, cols = self.screen.getmaxyx()

    # minimum screen width for two columns of 22 cells plus paddings
    mincols_doublespr = (
        DoubleSpreadPadding.LEFT.value
        + 22
        + DoubleSpreadPadding.MIDDLE.value
        + 22
        + DoubleSpreadPadding.RIGHT.value
    )
    if cols < mincols_doublespr:
        self.spread = 1
    if self.spread == 2:
        reading_state = dataclasses.replace(
            reading_state,
            textwidth=(
                cols
                - sum(
                    [
                        DoubleSpreadPadding.LEFT.value,
                        DoubleSpreadPadding.MIDDLE.value,
                        DoubleSpreadPadding.RIGHT.value,
                    ]
                )
            )
            // 2,
        )
    x = (cols - reading_state.textwidth) // 2
    if self.spread == 2:
        x = DoubleSpreadPadding.LEFT.value

    self.show_loader(subtext="loading contents")
    # get text structure, toc entries and contents of the book
    if self.seamless:
        text_structure, toc_entries, contents = self.get_all_book_contents(reading_state)
        # adjustment
        reading_state = self.convert_relative_reading_state_to_absolute(reading_state)
    else:
        text_structure, toc_entries, contents = self.get_current_book_content(reading_state)

    totlines = len(text_structure.text_lines)

    # normalize the starting row
    # NOTE(review): the modulo below (and the row / totlines divisions further
    # down) would raise ZeroDivisionError for a content without any text
    # line - confirm parse_html never returns an empty text_lines
    if reading_state.row < 0 and totlines <= rows * self.spread:
        reading_state = dataclasses.replace(reading_state, row=0)
    elif reading_state.rel_pctg is not None:
        reading_state = dataclasses.replace(
            reading_state, row=round(reading_state.rel_pctg * totlines)
        )
    else:
        reading_state = dataclasses.replace(reading_state, row=reading_state.row % totlines)

    board = InfiniBoard(
        screen=self.screen,
        text=text_structure.text_lines,
        textwidth=reading_state.textwidth,
        default_style=text_structure.formatting,
        spread=self.spread,
    )

    # number of non-whitespace characters of each text line
    letters_per_content: List[int] = []
    for i in text_structure.text_lines:
        letters_per_content.append(len(re.sub(r"\s", "", i)))

    self.screen.clear()
    self.screen.refresh()
    # try-except clause if there is issue
    # with curses resize event
    board.write(reading_state.row)

    # if reading_state.section is not None
    # then override reading_state.row to follow the section
    if reading_state.section:
        reading_state = dataclasses.replace(
            reading_state, row=text_structure.section_rows.get(reading_state.section, 0)
        )

    checkpoint_row: Optional[int] = None
    countstring = ""

    try:
        while True:
            # numeric prefix typed before a command; defaults to 1
            if countstring == "":
                count = 1
            else:
                count = int(countstring)
            if k in tuple(Key(i) for i in range(48, 58)):  # i.e., k is a numeral
                countstring = countstring + k.char
            else:
                if k in self.keymap.Quit:
                    # escape with a pending count only discards the count
                    if k == Key(27) and countstring != "":
                        countstring = ""
                    else:
                        self.try_assign_letters_count(force_wait=True)
                        self.calculate_reading_progress(letters_per_content, reading_state)

                        self.savestate(
                            dataclasses.replace(
                                reading_state, rel_pctg=reading_state.row / totlines
                            )
                        )
                        sys.exit()

                elif k in self.keymap.TTSToggle and self.tts_support:
                    # collect the text of the visible page; blank lines become pauses
                    tospeak = ""
                    for i in text_structure.text_lines[
                        reading_state.row : reading_state.row + (rows * self.spread)
                    ]:
                        if re.match(r"^\s*$", i) is not None:
                            tospeak += "\n. \n"
                        else:
                            tospeak += i + " "
                    k = self.speaking(tospeak)
                    # stop speaking after the last page of the last content
                    if (
                        totlines - reading_state.row <= rows
                        and reading_state.content_index == len(contents) - 1
                    ):
                        self.is_speaking = False
                    continue

                elif k in self.keymap.DoubleSpreadToggle:
                    if cols < mincols_doublespr:
                        k = self.show_win_error(
                            "Screen is too small",
                            "Min: {} cols x {} rows".format(mincols_doublespr, 12),
                            (Key("D"),),
                        )
                    self.spread = (self.spread % 2) + 1
                    return ReadingState(
                        content_index=reading_state.content_index,
                        textwidth=reading_state.textwidth,
                        row=reading_state.row,
                        rel_pctg=reading_state.row / totlines,
                    )

                elif k in self.keymap.ScrollUp:
                    # line scrolling is replaced by paging in double spread
                    if self.spread == 2:
                        k = self.keymap.PageUp[0]
                        continue
                    if count > 1:
                        checkpoint_row = reading_state.row - 1
                    if reading_state.row >= count:
                        reading_state = dataclasses.replace(
                            reading_state, row=reading_state.row - count
                        )
                    elif reading_state.row == 0 and reading_state.content_index != 0:
                        self.page_animation = Direction.BACKWARD
                        # return -1, width, -rows, None, ""
                        return ReadingState(
                            content_index=reading_state.content_index - 1,
                            textwidth=reading_state.textwidth,
                            row=-rows,
                        )
                    else:
                        reading_state = dataclasses.replace(reading_state, row=0)

                elif k in self.keymap.PageUp:
                    if reading_state.row == 0 and reading_state.content_index != 0:
                        self.page_animation = Direction.BACKWARD
                        # parse the previous content to find the row of its last page
                        text_structure_content_before = parse_html(
                            self.ebook.get_raw_text(contents[reading_state.content_index - 1]),
                            textwidth=reading_state.textwidth,
                        )
                        assert isinstance(text_structure_content_before, TextStructure)
                        return ReadingState(
                            content_index=reading_state.content_index - 1,
                            textwidth=reading_state.textwidth,
                            row=rows
                            * self.spread
                            * (
                                len(text_structure_content_before.text_lines)
                                // (rows * self.spread)
                            ),
                        )
                    else:
                        if reading_state.row >= rows * self.spread * count:
                            self.page_animation = Direction.BACKWARD
                            reading_state = dataclasses.replace(
                                reading_state,
                                row=reading_state.row - (rows * self.spread * count),
                            )
                        else:
                            reading_state = dataclasses.replace(reading_state, row=0)

                elif k in self.keymap.ScrollDown:
                    # line scrolling is replaced by paging in double spread
                    if self.spread == 2:
                        k = self.keymap.PageDown[0]
                        continue
                    if count > 1:
                        checkpoint_row = reading_state.row + rows - 1
                    if reading_state.row + count <= totlines - rows:
                        reading_state = dataclasses.replace(
                            reading_state, row=reading_state.row + count
                        )
                    elif (
                        reading_state.row >= totlines - rows
                        and reading_state.content_index != len(contents) - 1
                    ):
                        self.page_animation = Direction.FORWARD
                        return ReadingState(
                            content_index=reading_state.content_index + 1,
                            textwidth=reading_state.textwidth,
                            row=0,
                        )

                elif k in self.keymap.PageDown:
                    if totlines - reading_state.row > rows * self.spread:
                        self.page_animation = Direction.FORWARD
                        reading_state = dataclasses.replace(
                            reading_state, row=reading_state.row + (rows * self.spread)
                        )
                    elif reading_state.content_index != len(contents) - 1:
                        self.page_animation = Direction.FORWARD
                        return ReadingState(
                            content_index=reading_state.content_index + 1,
                            textwidth=reading_state.textwidth,
                            row=0,
                        )

                # elif k in K["HalfScreenUp"] | K["HalfScreenDown"]:
                #     countstring = str(rows // 2)
                #     k = list(K["ScrollUp" if k in K["HalfScreenUp"] else "ScrollDown"])[0]
                #     continue

                elif k in self.keymap.NextChapter:
                    ntoc = find_current_content_index(
                        toc_entries,
                        text_structure.section_rows,
                        reading_state.content_index,
                        reading_state.row,
                    )
                    if ntoc < len(toc_entries) - 1:
                        # next entry in the same content: jump to its section row
                        if reading_state.content_index == toc_entries[ntoc + 1].content_index:
                            try:
                                reading_state = dataclasses.replace(
                                    reading_state,
                                    row=text_structure.section_rows[
                                        toc_entries[ntoc + 1].section  # type: ignore
                                    ],
                                )
                            except KeyError:
                                pass
                        else:
                            return ReadingState(
                                content_index=toc_entries[ntoc + 1].content_index,
                                textwidth=reading_state.textwidth,
                                row=0,
                                section=toc_entries[ntoc + 1].section,
                            )

                elif k in self.keymap.PrevChapter:
                    ntoc = find_current_content_index(
                        toc_entries,
                        text_structure.section_rows,
                        reading_state.content_index,
                        reading_state.row,
                    )
                    if ntoc > 0:
                        if reading_state.content_index == toc_entries[ntoc - 1].content_index:
                            reading_state = dataclasses.replace(
                                reading_state,
                                row=text_structure.section_rows.get(
                                    toc_entries[ntoc - 1].section, 0  # type: ignore
                                ),
                            )
                        else:
                            return ReadingState(
                                content_index=toc_entries[ntoc - 1].content_index,
                                textwidth=reading_state.textwidth,
                                row=0,
                                section=toc_entries[ntoc - 1].section,
                            )

                elif k in self.keymap.BeginningOfCh:
                    ntoc = find_current_content_index(
                        toc_entries,
                        text_structure.section_rows,
                        reading_state.content_index,
                        reading_state.row,
                    )
                    try:
                        reading_state = dataclasses.replace(
                            reading_state,
                            row=text_structure.section_rows[toc_entries[ntoc].section],  # type: ignore
                        )
                    except (KeyError, IndexError):
                        reading_state = dataclasses.replace(reading_state, row=0)

                elif k in self.keymap.EndOfCh:
                    ntoc = find_current_content_index(
                        toc_entries,
                        text_structure.section_rows,
                        reading_state.content_index,
                        reading_state.row,
                    )
                    try:
                        # one screen above the start of the next section, if possible
                        if (
                            text_structure.section_rows[toc_entries[ntoc + 1].section] - rows  # type: ignore
                            >= 0
                        ):
                            reading_state = dataclasses.replace(
                                reading_state,
                                row=text_structure.section_rows[toc_entries[ntoc + 1].section]  # type: ignore
                                - rows,
                            )
                        else:
                            reading_state = dataclasses.replace(
                                reading_state,
                                row=text_structure.section_rows[toc_entries[ntoc].section],  # type: ignore
                            )
                    except (KeyError, IndexError):
                        reading_state = dataclasses.replace(
                            reading_state, row=pgend(totlines, rows)
                        )

                elif k in self.keymap.TableOfContents:
                    if not toc_entries:
                        k = self.show_win_error(
                            "Table of Contents",
                            "N/A: TableOfContents is unavailable for this book.",
                            self.keymap.TableOfContents,
                        )
                        continue
                    ntoc = find_current_content_index(
                        toc_entries,
                        text_structure.section_rows,
                        reading_state.content_index,
                        reading_state.row,
                    )
                    rettock, fllwd, _ = self.toc(toc_entries, ntoc)
                    if rettock is not None:  # and rettock in WINKEYS:
                        k = rettock
                        continue
                    elif fllwd is not None:
                        if reading_state.content_index == toc_entries[fllwd].content_index:
                            try:
                                reading_state = dataclasses.replace(
                                    reading_state,
                                    row=text_structure.section_rows[toc_entries[fllwd].section],
                                )
                            except KeyError:
                                reading_state = dataclasses.replace(reading_state, row=0)
                        else:
                            return ReadingState(
                                content_index=toc_entries[fllwd].content_index,
                                textwidth=reading_state.textwidth,
                                row=0,
                                section=toc_entries[fllwd].section,
                            )

                elif k in self.keymap.Metadata:
                    k = self.show_win_metadata()
                    if k in self._win_keys:
                        continue

                elif k in self.keymap.Help:
                    k = self.show_win_help()
                    if k in self._win_keys:
                        continue

                elif (
                    k in self.keymap.Enlarge
                    and (reading_state.textwidth + count) < cols - 4
                    and self.spread == 1
                ):
                    return dataclasses.replace(
                        reading_state,
                        textwidth=reading_state.textwidth + count,
                        rel_pctg=reading_state.row / totlines,
                    )

                elif (
                    k in self.keymap.Shrink
                    and reading_state.textwidth >= 22
                    and self.spread == 1
                ):
                    return dataclasses.replace(
                        reading_state,
                        textwidth=reading_state.textwidth - count,
                        rel_pctg=reading_state.row / totlines,
                    )

                elif k in self.keymap.SetWidth and self.spread == 1:
                    if countstring == "":
                        # if called without a count, toggle between 80 cols and full width
                        if reading_state.textwidth != 80 and cols - 4 >= 80:
                            return ReadingState(
                                content_index=reading_state.content_index,
                                textwidth=80,
                                row=reading_state.row,
                                rel_pctg=reading_state.row / totlines,
                            )
                        else:
                            return ReadingState(
                                content_index=reading_state.content_index,
                                textwidth=cols - 4,
                                row=reading_state.row,
                                rel_pctg=reading_state.row / totlines,
                            )
                    else:
                        reading_state = dataclasses.replace(reading_state, textwidth=count)
                    # clamp the requested width to [20, cols - 4]
                    if reading_state.textwidth < 20:
                        reading_state = dataclasses.replace(reading_state, textwidth=20)
                    elif reading_state.textwidth >= cols - 4:
                        reading_state = dataclasses.replace(reading_state, textwidth=cols - 4)

                    return ReadingState(
                        content_index=reading_state.content_index,
                        textwidth=reading_state.textwidth,
                        row=reading_state.row,
                        rel_pctg=reading_state.row / totlines,
                    )

                elif k in self.keymap.RegexSearch:
                    ret_object = self.searching(
                        board,
                        text_structure.text_lines,
                        reading_state,
                        len(contents),
                    )
                    if isinstance(ret_object, Key) or isinstance(ret_object, NoUpdate):
                        k = ret_object
                        # k = ret_object.value
                        continue
                    # search still active: it continues in another content
                    elif isinstance(ret_object, ReadingState) and self.search_data:
                        return ret_object
                    # else:
                    elif isinstance(ret_object, ReadingState):
                        # y = ret_object
                        reading_state = ret_object

                elif k in self.keymap.OpenImage and self.image_viewer:
                    # rows of the visible page that hold an image
                    imgs_in_screen = list(
                        set(
                            range(reading_state.row, reading_state.row + rows * self.spread + 1)
                        )
                        & set(text_structure.image_maps.keys())
                    )
                    if not imgs_in_screen:
                        k = NoUpdate()
                        continue

                    imgs_in_screen.sort()
                    image_path: Optional[str] = None
                    if len(imgs_in_screen) == 1:
                        image_path = text_structure.image_maps[imgs_in_screen[0]]
                    elif len(imgs_in_screen) > 1:
                        # several images: let the user move the cursor between them
                        imgs_rel_to_row = [i - reading_state.row for i in imgs_in_screen]
                        p: Union[NoUpdate, Key] = NoUpdate()
                        i = 0
                        while p not in self.keymap.Quit and p not in self.keymap.Follow:
                            self.screen.move(
                                imgs_rel_to_row[i] % rows,
                                (
                                    x
                                    if imgs_rel_to_row[i] // rows == 0
                                    else cols
                                    - DoubleSpreadPadding.RIGHT.value
                                    - reading_state.textwidth
                                )
                                + reading_state.textwidth // 2,
                            )
                            self.screen.refresh()
                            safe_curs_set(2)
                            p = board.getch()
                            if p in self.keymap.ScrollDown:
                                i += 1
                            elif p in self.keymap.ScrollUp:
                                i -= 1
                            i = i % len(imgs_rel_to_row)

                        safe_curs_set(0)
                        if p in self.keymap.Follow:
                            image_path = text_structure.image_maps[imgs_in_screen[i]]

                    if image_path:
                        try:
                            # if self.ebook.__class__.__name__ in {"Epub", "Mobi", "Azw"}:
                            if isinstance(self.ebook, (Epub, Mobi, Azw)):
                                # self.seamless adjustment
                                if self.seamless:
                                    current_content_index = (
                                        self.convert_absolute_reading_state_to_relative(
                                            reading_state
                                        ).content_index
                                    )
                                else:
                                    current_content_index = reading_state.content_index
                                # for n, content in enumerate(self.ebook.contents):
                                #     content_path = content
                                #     if reading_state.row < sum(totlines_per_content[:n]):
                                #         break

                                content_path = self.ebook.contents[current_content_index]
                                assert isinstance(content_path, str)
                                image_path = resolve_path(content_path, image_path)
                            imgnm, imgbstr = self.ebook.get_img_bytestr(image_path)
                            k = self.open_image(board, imgnm, imgbstr)
                            continue
                        except Exception as e:
                            self.show_win_error("Error Opening Image", str(e), tuple())
                            if DEBUG:
                                raise e

                elif (
                    k in self.keymap.SwitchColor
                    and self.is_color_supported
                    and countstring in {"", "0", "1", "2"}
                ):
                    if countstring == "":
                        # no count: cycle to the next color pair
                        count_color = curses.pair_number(self.screen.getbkgd())
                        if count_color not in {2, 3}:
                            count_color = 1
                        count_color = count_color % 3
                    else:
                        count_color = count
                    self.screen.bkgd(curses.color_pair(count_color + 1))
                    # pad.format()
                    return ReadingState(
                        content_index=reading_state.content_index,
                        textwidth=reading_state.textwidth,
                        row=reading_state.row,
                    )

                elif k in self.keymap.AddBookmark:
                    bmname = self.input_prompt(" Add bookmark:")
                    if isinstance(bmname, str) and bmname:
                        try:
                            self.state.insert_bookmark(
                                self.ebook,
                                bmname,
                                dataclasses.replace(
                                    reading_state, rel_pctg=reading_state.row / totlines
                                ),
                            )
                        except sqlite3.IntegrityError:
                            k = self.show_win_error(
                                "Error: Add Bookmarks",
                                f"Bookmark with name '{bmname}' already exists.",
                                (Key("B"),),
                            )
                            continue
                    else:
                        k = bmname
                        continue

                elif k in self.keymap.ShowBookmarks:
                    bookmarks = self.state.get_bookmarks(self.ebook)
                    if not bookmarks:
                        k = self.show_win_error(
                            "Bookmarks",
                            "N/A: Bookmarks are not found in this book.",
                            self.keymap.ShowBookmarks,
                        )
                        continue
                    else:
                        retk, idxchoice = self.show_win_choices_bookmarks()
                        if retk is not None:
                            k = retk
                            continue
                        elif idxchoice is not None:
                            bookmark_to_jump = self.state.get_bookmarks(self.ebook)[idxchoice][
                                1
                            ]
                            if (
                                bookmark_to_jump.content_index == reading_state.content_index
                                and bookmark_to_jump.textwidth == reading_state.textwidth
                            ):
                                reading_state = bookmark_to_jump
                            else:
                                return ReadingState(
                                    content_index=bookmark_to_jump.content_index,
                                    textwidth=reading_state.textwidth,
                                    row=bookmark_to_jump.row,
                                    rel_pctg=bookmark_to_jump.rel_pctg,
                                )

                elif k in self.keymap.DefineWord and self.ext_dict_app:
                    word = self.input_prompt(" Define:")
                    if isinstance(word, str) and word:
                        defin = self.define_word(word)
                        if defin in self._win_keys:
                            k = defin
                            continue
                    else:
                        k = word
                        continue

                elif k in self.keymap.MarkPosition:
                    # the next key must be a digit naming the mark
                    jumnum = board.getch()
                    if isinstance(jumnum, Key) and jumnum in tuple(
                        Key(i) for i in range(48, 58)
                    ):
                        self.jump_list[jumnum.char] = reading_state
                    else:
                        k = NoUpdate()
                        continue

                elif k in self.keymap.JumpToPosition:
                    jumnum = board.getch()
                    if (
                        isinstance(jumnum, Key)
                        and jumnum in tuple(Key(i) for i in range(48, 58))
                        and jumnum.char in self.jump_list
                    ):
                        marked_reading_state = self.jump_list[jumnum.char]
                        return dataclasses.replace(
                            marked_reading_state,
                            textwidth=reading_state.textwidth,
                            rel_pctg=None
                            if marked_reading_state.textwidth == reading_state.textwidth
                            else marked_reading_state.rel_pctg,
                            section="",
                        )
                    else:
                        k = NoUpdate()
                        continue

                elif k in self.keymap.ShowHideProgress:
                    self.show_reading_progress = not self.show_reading_progress

                elif k in self.keymap.Library:
                    # save the current position before leaving for another book
                    self.try_assign_letters_count(force_wait=True)
                    self.calculate_reading_progress(letters_per_content, reading_state)

                    self.savestate(
                        dataclasses.replace(
                            reading_state, rel_pctg=reading_state.row / totlines
                        )
                    )
                    library_items = self.state.get_from_history()
                    if not library_items:
                        k = self.show_win_error(
                            "Library",
                            "N/A: No reading history.",
                            self.keymap.Library,
                        )
                        continue
                    else:
                        retk, choice_index = self.show_win_library()
                        if retk is not None:
                            k = retk
                            continue
                        elif choice_index is not None:
                            return get_ebook_obj(library_items[choice_index].filepath)

                elif k == Key(curses.KEY_RESIZE):
                    self.savestate(
                        dataclasses.replace(
                            reading_state, rel_pctg=reading_state.row / totlines
                        )
                    )
                    # stated in pypi windows-curses page:
                    # to call resize_term right after KEY_RESIZE
                    if sys.platform == "win32":
                        curses.resize_term(rows, cols)
                        rows, cols = self.screen.getmaxyx()
                    else:
                        rows, cols = self.screen.getmaxyx()
                        curses.resize_term(rows, cols)
                    if cols < 22 or rows < 12:
                        sys.exit("ERROR: Screen was too small (min 22cols x 12rows).")
                    if cols <= reading_state.textwidth + 4:
                        return ReadingState(
                            content_index=reading_state.content_index,
                            textwidth=cols - 4,
                            row=reading_state.row,
                            rel_pctg=reading_state.row / totlines,
                        )
                    else:
                        return ReadingState(
                            content_index=reading_state.content_index,
                            textwidth=reading_state.textwidth,
                            row=reading_state.row,
                        )

                # any non-digit key consumes the pending count
                countstring = ""

            # underline the row the view was at before a counted scroll
            if checkpoint_row:
                board.feed_temporary_style(
                    (
                        InlineStyle(
                            row=checkpoint_row,
                            col=0,
                            n_letters=reading_state.textwidth,
                            attr=curses.A_UNDERLINE,
                        ),
                    )
                )

            try:
                if self.setting.PageScrollAnimation and self.page_animation:
                    self.screen.clear()
                    for i in range(1, reading_state.textwidth + 1):
                        curses.napms(1)
                        # self.screen.clear()
                        board.write_n(reading_state.row, i, self.page_animation)
                        self.screen.refresh()
                    self.page_animation = None

                self.screen.clear()
                self.screen.addstr(0, 0, countstring)
                board.write(reading_state.row)

                # check if letters counting process is done
                self.try_assign_letters_count()

                # reading progress
                self.calculate_reading_progress(letters_per_content, reading_state)

                # display reading progress
                if (
                    self.reading_progress
                    and self.show_reading_progress
                    and (cols - reading_state.textwidth - 2) // 2 > 3
                ):
                    reading_progress_str = "{}%".format(int(self.reading_progress * 100))
                    self.screen.addstr(
                        0, cols - len(reading_progress_str), reading_progress_str
                    )

                self.screen.refresh()
            except curses.error:
                pass

            # while speaking, keep feeding the TTS toggle instead of reading a key
            if self.is_speaking:
                k = self.keymap.TTSToggle[0]
                continue

            k = board.getch()
            # translate mouse events into the equivalent keys
            if k == Key(curses.KEY_MOUSE):
                mouse_event = curses.getmouse()
                if mouse_event[4] == curses.BUTTON1_CLICKED:
                    if mouse_event[1] < cols // 2:
                        k = self.keymap.PageUp[0]
                    else:
                        k = self.keymap.PageDown[0]
                elif mouse_event[4] == curses.BUTTON3_CLICKED:
                    k = self.keymap.TableOfContents[0]
                elif mouse_event[4] == curses.BUTTON4_PRESSED:
                    k = self.keymap.ScrollUp[0]
                # NOTE(review): 2097152 is presumably the wheel-down button state - confirm
                elif mouse_event[4] == 2097152:
                    k = self.keymap.ScrollDown[0]
                elif mouse_event[4] == curses.BUTTON4_PRESSED + curses.BUTTON_CTRL:
                    k = self.keymap.Enlarge[0]
                elif mouse_event[4] == 2097152 + curses.BUTTON_CTRL:
                    k = self.keymap.Shrink[0]
                elif mouse_event[4] == curses.BUTTON2_CLICKED:
                    k = self.keymap.TTSToggle[0]

            # the checkpoint underline only lasts until the next key
            if checkpoint_row:
                board.feed_temporary_style()
                checkpoint_row = None

    except KeyboardInterrupt:
        self.savestate(
            dataclasses.replace(reading_state, rel_pctg=reading_state.row / totlines)
        )
        sys.exit()
# }}}
# Reading Init {{{
def preread(stdscr, filepath: str):
    """
    Open the ebook at `filepath` and drive a Reader on the curses screen `stdscr`.

    The last stored reading state of the ebook is used as the starting point,
    then `Reader.read` is called repeatedly, each time with the reading state
    produced by the previous call. As soon as `Reader.read` hands back an
    `Ebook` instead of a reading state, that ebook's path is returned.

    `reader.cleanup()` always runs on the way out, whether by return or by
    exception.
    """
    global STDSCR
    if DEBUG:
        # debug() reads this module-level handle to reach the curses screen
        STDSCR = stdscr

    book = get_ebook_obj(filepath)
    state = State()
    reader = Reader(screen=stdscr, ebook=book, config=Config(), state=state)

    try:
        reader.run_counting_letters()

        current = state.get_last_reading_state(reader.ebook)
        if reader.screen_cols <= current.textwidth + 4:
            # stored text width does not fit: shrink it to screen width minus 4
            overrides = {"textwidth": reader.screen_cols - 4}
        else:
            overrides = {"rel_pctg": None}
        current = dataclasses.replace(current, **overrides)

        while True:
            outcome = reader.read(current)
            if isinstance(outcome, Ebook):
                return outcome.path

            # anything that is not an Ebook is the next reading state
            current = outcome
            if reader.seamless:
                current = reader.convert_absolute_reading_state_to_relative(current)
    finally:
        reader.cleanup()
# }}}
# Commandline {{{
def parse_cli_args() -> Tuple[str, bool]:
    """
    Parse the command line and resolve which ebook should be opened.

    The positional argument(s) are resolved as follows:
      * none given: the last read ebook reported by the reading state;
      * exactly one given: a reading-history number if it coerces to an int,
        otherwise a URL, otherwise the path of an existing file;
      * anything else (several words, or a single word that is none of the
        above): the words joined by spaces, matched against the library.

    Returns:
        Tuple of (ebook path or URL, dump flag from ``-d/--dump``).

    Exits the process instead of returning when ``-r/--history`` is given
    (after printing the reading history) or when no ebook can be resolved.
    Resolution failures are reported on stderr with a non-zero exit status.
    """
    prog = "epy"
    positional_arg_help_str = "[PATH | # | PATTERN | URL]"
    args_parser = argparse.ArgumentParser(
        prog=prog,
        usage=f"%(prog)s [-h] [-r] [-d] [-v] {positional_arg_help_str}",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Read ebook in terminal",
        epilog=textwrap.dedent(
            f"""\
            examples:
            {prog} /path/to/ebook read /path/to/ebook file
            {prog} 3 read #3 file from reading history
            {prog} count monte read file matching 'count monte'
            from reading history
            """
        ),
    )
    args_parser.add_argument("-r", "--history", action="store_true", help="print reading history")
    args_parser.add_argument("-d", "--dump", action="store_true", help="dump the content of ebook")
    args_parser.add_argument(
        "-v",
        "--version",
        action="version",
        version=f"v{__version__}",
        help="print version and exit",
    )
    args_parser.add_argument(
        "ebook",
        action="store",
        nargs="*",
        metavar=positional_arg_help_str,
        help="ebook path, history number, pattern or URL",
    )
    args = args_parser.parse_args()

    state = State()
    cleanup_library(state)

    if args.history:
        print_reading_history(state)
        sys.exit()

    if not args.ebook:
        last_read = state.get_last_read()
        if last_read:
            return last_read, args.dump
        sys.exit("ERROR: Found no last read ebook file.")

    if len(args.ebook) == 1:
        candidate = args.ebook[0]
        nth = coerce_to_int(candidate)
        if nth is not None:
            library_item = get_nth_file_from_library(state, nth)
            if library_item:
                return library_item.filepath, args.dump
            # Send the diagnostic to stderr like the other failure paths
            # (sys.exit with a message prints there); the history listing
            # that follows is regular output and stays on stdout.
            print(f"ERROR: #{nth} file not found.", file=sys.stderr)
            print_reading_history(state)
            sys.exit(1)
        if is_url(candidate) or os.path.isfile(candidate):
            return candidate, args.dump

    # Several words, or a single word that is neither a history number, a URL
    # nor an existing file: treat the whole input as a search pattern.
    pattern = " ".join(args.ebook)
    match = get_matching_library_item(state, pattern)
    if match:
        return match.filepath, args.dump
    sys.exit("ERROR: Found no matching ebook from history.")
def main():
    """
    Program entry point.

    When the dump flag from the command line is set, exit with whatever
    `dump_ebook_content` returns for the resolved file. Otherwise run
    `preread` under `curses.wrapper` over and over, feeding each returned
    path back in as the next file to open.
    """
    target, want_dump = parse_cli_args()
    if want_dump:
        sys.exit(dump_ebook_content(target))

    # No break here: the loop only ends when the wrapped call raises
    # (SystemExit included).
    while True:
        target = curses.wrapper(preread, target)
# }}}
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()