aboutsummaryrefslogtreecommitdiffstats
path: root/src/epy_reader/ebooks
diff options
context:
space:
mode:
Diffstat (limited to 'src/epy_reader/ebooks')
-rw-r--r--src/epy_reader/ebooks/__init__.py15
-rw-r--r--src/epy_reader/ebooks/azw.py26
-rw-r--r--src/epy_reader/ebooks/base.py48
-rw-r--r--src/epy_reader/ebooks/epub.py202
-rw-r--r--src/epy_reader/ebooks/fictionbook.py76
-rw-r--r--src/epy_reader/ebooks/mobi.py69
-rw-r--r--src/epy_reader/ebooks/url.py49
7 files changed, 485 insertions, 0 deletions
diff --git a/src/epy_reader/ebooks/__init__.py b/src/epy_reader/ebooks/__init__.py
new file mode 100644
index 0000000..da5cfc0
--- /dev/null
+++ b/src/epy_reader/ebooks/__init__.py
@@ -0,0 +1,15 @@
+__all__ = [
+ "Ebook",
+ "Epub",
+ "FictionBook",
+ "Mobi",
+ "Azw",
+ "URL",
+]
+
+from epy_reader.ebooks.azw import Azw
+from epy_reader.ebooks.base import Ebook
+from epy_reader.ebooks.epub import Epub
+from epy_reader.ebooks.fictionbook import FictionBook
+from epy_reader.ebooks.mobi import Mobi
+from epy_reader.ebooks.url import URL
diff --git a/src/epy_reader/ebooks/azw.py b/src/epy_reader/ebooks/azw.py
new file mode 100644
index 0000000..139fcc5
--- /dev/null
+++ b/src/epy_reader/ebooks/azw.py
@@ -0,0 +1,26 @@
+import contextlib
+import os
+import shutil
+import tempfile
+import zipfile
+
+from epy_reader.ebooks.epub import Epub
+from epy_reader.tools import unpack_kindle_book
+
+
+class Azw(Epub):
+ def __init__(self, fileepub):
+ self.path = os.path.abspath(fileepub)
+ self.tmpdir = tempfile.mkdtemp(prefix="epy-")
+ basename, _ = os.path.splitext(os.path.basename(self.path))
+ self.tmpepub = os.path.join(self.tmpdir, "mobi8", basename + ".epub")
+
+ def initialize(self):
+ with contextlib.redirect_stdout(None):
+ unpack_kindle_book(self.path, self.tmpdir, epubver="A", use_hd=True)
+ self.file = zipfile.ZipFile(self.tmpepub, "r")
+ Epub.initialize(self)
+
+ def cleanup(self) -> None:
+ shutil.rmtree(self.tmpdir)
+ return
diff --git a/src/epy_reader/ebooks/base.py b/src/epy_reader/ebooks/base.py
new file mode 100644
index 0000000..0869db9
--- /dev/null
+++ b/src/epy_reader/ebooks/base.py
@@ -0,0 +1,48 @@
+import xml.etree.ElementTree as ET
+from typing import Tuple, Union
+
+from epy_reader.models import BookMetadata, TocEntry
+
+
+class Ebook:
+ def __init__(self, fileepub: str):
+ raise NotImplementedError("Ebook.__init__() not implemented")
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+ @path.setter
+ def path(self, value: str) -> None:
+ self._path = value
+
+ @property
+ def contents(self) -> Union[Tuple[str, ...], Tuple[ET.Element, ...]]:
+ return self._contents
+
+ @contents.setter
+ def contents(self, value: Union[Tuple[str, ...], Tuple[ET.Element, ...]]) -> None:
+ self._contents = value
+
+ @property
+ def toc_entries(self) -> Tuple[TocEntry, ...]:
+ return self._toc_entries
+
+ @toc_entries.setter
+ def toc_entries(self, value: Tuple[TocEntry, ...]) -> None:
+ self._toc_entries = value
+
+ def get_meta(self) -> BookMetadata:
+ raise NotImplementedError("Ebook.get_meta() not implemented")
+
+ def initialize(self) -> None:
+ raise NotImplementedError("Ebook.initialize() not implemented")
+
+ def get_raw_text(self, content: Union[str, ET.Element]) -> str:
+ raise NotImplementedError("Ebook.get_raw_text() not implemented")
+
+ def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]:
+ raise NotImplementedError("Ebook.get_img_bytestr() not implemented")
+
+ def cleanup(self) -> None:
+ raise NotImplementedError("Ebook.cleanup() not implemented")
diff --git a/src/epy_reader/ebooks/epub.py b/src/epy_reader/ebooks/epub.py
new file mode 100644
index 0000000..a8cf0fa
--- /dev/null
+++ b/src/epy_reader/ebooks/epub.py
@@ -0,0 +1,202 @@
+import dataclasses
+import os
+import xml.etree.ElementTree as ET
+import zipfile
+import zlib
+from typing import Dict, List, Optional, Sequence, Tuple, Union
+from urllib.parse import unquote, urljoin
+
+from epy_reader.ebooks.base import Ebook
+from epy_reader.models import BookMetadata, TocEntry
+
+
+# TODO: to be deprecated
+DEBUG = False
+
+
+class Epub(Ebook):
+ NAMESPACE = {
+ "DAISY": "http://www.daisy.org/z3986/2005/ncx/",
+ "OPF": "http://www.idpf.org/2007/opf",
+ "CONT": "urn:oasis:names:tc:opendocument:xmlns:container",
+ "XHTML": "http://www.w3.org/1999/xhtml",
+ "EPUB": "http://www.idpf.org/2007/ops",
+ # Dublin Core
+ "DC": "http://purl.org/dc/elements/1.1/",
+ }
+
+ def __init__(self, fileepub: str):
+ self.path: str = os.path.abspath(fileepub)
+ self.file: Union[zipfile.ZipFile, str] = zipfile.ZipFile(fileepub, "r")
+
+ # populate these attributes
+ # by calling self.initialize()
+ self.root_filepath: str
+ self.root_dirpath: str
+
+ def get_meta(self) -> BookMetadata:
+ assert isinstance(self.file, zipfile.ZipFile)
+ # why self.file.read(self.root_filepath) problematic
+ # content_opf = ET.fromstring(self.file.open(self.root_filepath).read())
+ content_opf = ET.parse(self.file.open(self.root_filepath))
+ return Epub._get_metadata(content_opf)
+
+ @staticmethod
+ def _get_metadata(content_opf: ET.ElementTree) -> BookMetadata:
+ metadata: Dict[str, Optional[str]] = {}
+ for field in dataclasses.fields(BookMetadata):
+ element = content_opf.find(f".//DC:{field.name}", Epub.NAMESPACE)
+ if element is not None:
+ metadata[field.name] = element.text
+
+ return BookMetadata(**metadata)
+
+ @staticmethod
+ def _get_contents(content_opf: ET.ElementTree) -> Tuple[str, ...]:
+ # cont = ET.parse(self.file.open(self.root_filepath)).getroot()
+ manifests: List[Tuple[str, str]] = []
+ for manifest_elem in content_opf.findall("OPF:manifest/*", Epub.NAMESPACE):
+ # EPUB3
+ # if manifest_elem.get("id") != "ncx" and manifest_elem.get("properties") != "nav":
+ if (
+ manifest_elem.get("media-type") != "application/x-dtbncx+xml"
+ and manifest_elem.get("properties") != "nav"
+ ):
+ manifest_id = manifest_elem.get("id")
+ assert manifest_id is not None
+ manifest_href = manifest_elem.get("href")
+ assert manifest_href is not None
+ manifests.append((manifest_id, manifest_href))
+
+ spines: List[str] = []
+ contents: List[str] = []
+ for spine_elem in content_opf.findall("OPF:spine/*", Epub.NAMESPACE):
+ idref = spine_elem.get("idref")
+ assert idref is not None
+ spines.append(idref)
+ for spine in spines:
+ for manifest in manifests:
+ if spine == manifest[0]:
+ # book_contents.append(root_dirpath + unquote(manifest[1]))
+ contents.append(unquote(manifest[1]))
+ manifests.remove(manifest)
+ # TODO: test is break necessary
+ break
+
+ return tuple(contents)
+
+ @staticmethod
+ def _get_tocs(toc: ET.Element, version: str, contents: Sequence[str]) -> Tuple[TocEntry, ...]:
+ try:
+ # EPUB3
+ if version in {"1.0", "2.0"}:
+ navPoints = toc.findall("DAISY:navMap//DAISY:navPoint", Epub.NAMESPACE)
+ elif version == "3.0":
+ navPoints = toc.findall(
+ "XHTML:body//XHTML:nav[@EPUB:type='toc']//XHTML:a", Epub.NAMESPACE
+ )
+
+ toc_entries: List[TocEntry] = []
+ for navPoint in navPoints:
+ if version in {"1.0", "2.0"}:
+ src_elem = navPoint.find("DAISY:content", Epub.NAMESPACE)
+ assert src_elem is not None
+ src = src_elem.get("src")
+
+ name_elem = navPoint.find("DAISY:navLabel/DAISY:text", Epub.NAMESPACE)
+ assert name_elem is not None
+ name = name_elem.text
+ elif version == "3.0":
+ src_elem = navPoint
+ assert src_elem is not None
+ src = src_elem.get("href")
+
+ name = "".join(list(navPoint.itertext()))
+
+ assert src is not None
+ src_id = src.split("#")
+
+ try:
+ idx = contents.index(unquote(src_id[0]))
+ except ValueError:
+ continue
+
+ # assert name is not None
+ # NOTE: skip empty label
+ if name is not None:
+ toc_entries.append(
+ TocEntry(
+ label=name,
+ content_index=idx,
+ section=src_id[1] if len(src_id) == 2 else None,
+ )
+ )
+ except AttributeError as e:
+ # TODO:
+ if DEBUG:
+ raise e
+
+ return tuple(toc_entries)
+
+ def initialize(self) -> None:
+ assert isinstance(self.file, zipfile.ZipFile)
+
+ container = ET.parse(self.file.open("META-INF/container.xml"))
+ rootfile_elem = container.find("CONT:rootfiles/CONT:rootfile", Epub.NAMESPACE)
+ assert rootfile_elem is not None
+ self.root_filepath = rootfile_elem.attrib["full-path"]
+ self.root_dirpath = (
+ os.path.dirname(self.root_filepath) + "/"
+ if os.path.dirname(self.root_filepath) != ""
+ else ""
+ )
+
+ content_opf = ET.parse(self.file.open(self.root_filepath))
+ version = content_opf.getroot().get("version")
+
+ contents = Epub._get_contents(content_opf)
+ self.contents = tuple(urljoin(self.root_dirpath, content) for content in contents)
+
+ if version in {"1.0", "2.0"}:
+ # "OPF:manifest/*[@id='ncx']"
+ relative_toc = content_opf.find(
+ "OPF:manifest/*[@media-type='application/x-dtbncx+xml']", Epub.NAMESPACE
+ )
+ elif version == "3.0":
+ relative_toc = content_opf.find("OPF:manifest/*[@properties='nav']", Epub.NAMESPACE)
+ else:
+ raise RuntimeError(f"Unsupported Epub version: {version}")
+ assert relative_toc is not None
+ relative_toc_path = relative_toc.get("href")
+ assert relative_toc_path is not None
+ toc_path = self.root_dirpath + relative_toc_path
+ toc = ET.parse(self.file.open(toc_path)).getroot()
+ self.toc_entries = Epub._get_tocs(toc, version, contents) # *self.contents (absolute path)
+
+ def get_raw_text(self, content_path: Union[str, ET.Element]) -> str:
+ assert isinstance(self.file, zipfile.ZipFile)
+ assert isinstance(content_path, str)
+
+ max_tries: Optional[int] = None # 1 if DEBUG else None
+
+ # use try-except block to catch
+ # zlib.error: Error -3 while decompressing data: invalid distance too far back
+ # seems like caused by multiprocessing
+ tries = 0
+ while True:
+ try:
+ content = self.file.open(content_path).read()
+ break
+ except zlib.error as e:
+ tries += 1
+ if max_tries is not None and tries >= max_tries:
+ raise e
+
+ return content.decode("utf-8")
+
+ def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]:
+ assert isinstance(self.file, zipfile.ZipFile)
+ return impath, self.file.read(impath)
+
+ def cleanup(self) -> None:
+ pass
diff --git a/src/epy_reader/ebooks/fictionbook.py b/src/epy_reader/ebooks/fictionbook.py
new file mode 100644
index 0000000..35611b2
--- /dev/null
+++ b/src/epy_reader/ebooks/fictionbook.py
@@ -0,0 +1,76 @@
+import base64
+import os
+import xml.etree.ElementTree as ET
+from typing import List, Tuple, Union
+
+from epy_reader.ebooks import Ebook
+from epy_reader.models import BookMetadata, TocEntry
+
+
+class FictionBook(Ebook):
+ NAMESPACE = {"FB2": "http://www.gribuser.ru/xml/fictionbook/2.0"}
+
+ def __init__(self, filefb: str):
+ self.path = os.path.abspath(filefb)
+ self.file = filefb
+
+ # populate these attribute
+ # by calling self.initialize()
+ self.root: ET.Element
+
+ def get_meta(self) -> BookMetadata:
+ title_elem = self.root.find(".//FB2:book-title", FictionBook.NAMESPACE)
+ first_name_elem = self.root.find(".//FB2:first-name", FictionBook.NAMESPACE)
+ last_name_elem = self.root.find(".//FB2:last-name", FictionBook.NAMESPACE)
+ date_elem = self.root.find(".//FB2:date", FictionBook.NAMESPACE)
+ identifier_elem = self.root.find(".//FB2:id", FictionBook.NAMESPACE)
+
+ author = first_name_elem.text if first_name_elem is not None else None
+ if last_name_elem is not None:
+ if author is not None and author != "":
+ author += f" {last_name_elem.text}"
+ else:
+ author = last_name_elem.text
+
+ return BookMetadata(
+ title=title_elem.text if title_elem is not None else None,
+ creator=author,
+ date=date_elem.text if date_elem is not None else None,
+ identifier=identifier_elem.text if identifier_elem is not None else None,
+ )
+
+ def initialize(self) -> None:
+ cont = ET.parse(self.file)
+ self.root = cont.getroot()
+
+ self.contents = tuple(self.root.findall("FB2:body/*", FictionBook.NAMESPACE))
+
+ # TODO
+ toc_entries: List[TocEntry] = []
+ for n, i in enumerate(self.contents):
+ title = i.find("FB2:title", FictionBook.NAMESPACE)
+ if title is not None:
+ toc_entries.append(
+ TocEntry(label="".join(title.itertext()), content_index=n, section=None)
+ )
+ self.toc_entries = tuple(toc_entries)
+
+ def get_raw_text(self, node: Union[str, ET.Element]) -> str:
+ assert isinstance(node, ET.Element)
+ ET.register_namespace("", "http://www.gribuser.ru/xml/fictionbook/2.0")
+ # sys.exit(ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:",""))
+ return ET.tostring(node, encoding="utf8", method="html").decode("utf-8").replace("ns1:", "")
+
+ def get_img_bytestr(self, imgid: str) -> Tuple[str, bytes]:
+ # TODO: test if image works
+ imgid = imgid.replace("#", "")
+ img_elem = self.root.find("*[@id='{}']".format(imgid))
+ assert img_elem is not None
+ imgtype = img_elem.get("content-type")
+ img_elem_text = img_elem.text
+ assert imgtype is not None
+ assert img_elem_text is not None
+ return imgid + "." + imgtype.split("/")[1], base64.b64decode(img_elem_text)
+
+ def cleanup(self) -> None:
+ return
diff --git a/src/epy_reader/ebooks/mobi.py b/src/epy_reader/ebooks/mobi.py
new file mode 100644
index 0000000..39f3be4
--- /dev/null
+++ b/src/epy_reader/ebooks/mobi.py
@@ -0,0 +1,69 @@
+import contextlib
+import os
+import shutil
+import tempfile
+import xml.etree.ElementTree as ET
+from typing import Tuple, Union
+
+from epy_reader.ebooks.epub import Epub
+from epy_reader.models import BookMetadata
+from epy_reader.tools import unpack_kindle_book
+
+
+class Mobi(Epub):
+ def __init__(self, filemobi: str):
+ self.path = os.path.abspath(filemobi)
+ self.file = tempfile.mkdtemp(prefix="epy-")
+
+ # populate these attribute
+ # by calling self.initialize()
+ self.root_filepath: str
+ self.root_dirpath: str
+
+ def get_meta(self) -> BookMetadata:
+ # why self.file.read(self.root_filepath) problematic
+ with open(os.path.join(self.root_dirpath, "content.opf")) as f:
+ content_opf = ET.parse(f) # .getroot()
+ return Epub._get_metadata(content_opf)
+
+ def initialize(self) -> None:
+ assert isinstance(self.file, str)
+
+ with contextlib.redirect_stdout(None):
+ unpack_kindle_book(self.path, self.file, epubver="A", use_hd=True)
+ # TODO: add cleanup here
+
+ self.root_dirpath = os.path.join(self.file, "mobi7")
+ self.toc_path = os.path.join(self.root_dirpath, "toc.ncx")
+ version = "2.0"
+
+ with open(os.path.join(self.root_dirpath, "content.opf")) as f:
+ content_opf = ET.parse(f) # .getroot()
+
+ contents = Epub._get_contents(content_opf)
+ self.contents = tuple(os.path.join(self.root_dirpath, content) for content in contents)
+
+ with open(self.toc_path) as f:
+ toc = ET.parse(f).getroot()
+ self.toc_entries = Epub._get_tocs(toc, version, contents) # *self.contents (absolute path)
+
+ def get_raw_text(self, content_path: Union[str, ET.Element]) -> str:
+ assert isinstance(content_path, str)
+ with open(content_path, encoding="utf8") as f:
+ content = f.read()
+ # return content.decode("utf-8")
+ return content
+
+ def get_img_bytestr(self, impath: str) -> Tuple[str, bytes]:
+ # TODO: test on windows
+ # if impath "Images/asdf.png" is problematic
+ image_abspath = os.path.join(self.root_dirpath, impath)
+ image_abspath = os.path.normpath(image_abspath) # handle crossplatform path
+ with open(image_abspath, "rb") as f:
+ src = f.read()
+ return impath, src
+
+ def cleanup(self) -> None:
+ assert isinstance(self.file, str)
+ shutil.rmtree(self.file)
+ return
diff --git a/src/epy_reader/ebooks/url.py b/src/epy_reader/ebooks/url.py
new file mode 100644
index 0000000..4356fa7
--- /dev/null
+++ b/src/epy_reader/ebooks/url.py
@@ -0,0 +1,49 @@
+from pathlib import PurePosixPath
+from typing import Tuple
+from urllib.error import HTTPError, URLError
+from urllib.parse import urljoin, urlparse
+from urllib.request import Request, urlopen
+
+from epy_reader import __version__
+from epy_reader.ebooks import Ebook
+from epy_reader.lib import is_url
+from epy_reader.models import BookMetadata
+
+
+class URL(Ebook):
+ _header = {
+ "User-Agent": f"epy/v{__version__}",
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+ "Accept-Language": "en-US,en;q=0.8",
+ }
+
+ def __init__(self, url: str):
+ self.path = url
+ self.file = url
+ self.contents = ("_",)
+ self.toc_entries = tuple()
+
+ def get_meta(self) -> BookMetadata:
+ return BookMetadata()
+
+ def initialize(self) -> None:
+ try:
+ with urlopen(Request(self.path, headers=URL._header)) as response:
+ self.html = response.read().decode()
+ except HTTPError as e:
+ raise e
+ except URLError as e:
+ raise e
+
+ def get_raw_text(self, _) -> str:
+ return self.html
+
+ def get_img_bytestr(self, src: str) -> Tuple[str, bytes]:
+ image_url = src if is_url(src) else urljoin(self.path, src)
+ # TODO: catch error on request
+ with urlopen(Request(image_url, headers=URL._header)) as response:
+ byte_str = response.read()
+ return PurePosixPath(urlparse(src).path).name, byte_str
+
+ def cleanup(self) -> None:
+ return