diff options
Diffstat (limited to 'src/epy_reader/utils.py')
-rw-r--r-- | src/epy_reader/utils.py | 377 |
1 files changed, 377 insertions, 0 deletions
diff --git a/src/epy_reader/utils.py b/src/epy_reader/utils.py new file mode 100644 index 0000000..5bba7f6 --- /dev/null +++ b/src/epy_reader/utils.py @@ -0,0 +1,377 @@ +import curses +import os +import re +import sys +import textwrap +from functools import wraps +from typing import List, Mapping, Optional, Sequence, Tuple, Union + +from epy_reader.ebooks import URL, Azw, Ebook, Epub, FictionBook, Mobi +from epy_reader.lib import is_url, tuple_subtract +from epy_reader.models import Key, LettersCount, NoUpdate, ReadingState, TextStructure, TocEntry +from epy_reader.parser import parse_html +from epy_reader.speakers import SpeakerBaseModel, SpeakerMimic, SpeakerPico + + +def get_ebook_obj(filepath: str) -> Ebook: + file_ext = os.path.splitext(filepath)[1].lower() + if is_url(filepath): + return URL(filepath) + elif file_ext in {".epub", ".epub3"}: + return Epub(filepath) + elif file_ext == ".fb2": + return FictionBook(filepath) + elif file_ext == ".mobi": + return Mobi(filepath) + elif file_ext in {".azw", ".azw3"}: + return Azw(filepath) + else: + sys.exit("ERROR: Format not supported. (Supported: epub, fb2)") + + +def safe_curs_set(state: int) -> None: + try: + curses.curs_set(state) + except: + return + + +def find_current_content_index( + toc_entries: Tuple[TocEntry, ...], toc_secid: Mapping[str, int], index: int, y: int +) -> int: + ntoc = 0 + for n, toc_entry in enumerate(toc_entries): + if toc_entry.content_index <= index: + if y >= toc_secid.get(toc_entry.section, 0): # type: ignore + ntoc = n + return ntoc + + +def pgup(current_row: int, window_height: int, counter: int = 1) -> int: + if current_row >= (window_height) * counter: + return current_row - (window_height) * counter + else: + return 0 + + +def pgdn(current_row: int, total_lines: int, window_height: int, counter: int = 1) -> int: + if current_row + (window_height * counter) <= total_lines - window_height: + return current_row + (window_height * counter) + else: + current_row = total_lines - window_height + if current_row < 0: + return 0 + return current_row + + +def pgend(total_lines: int, window_height: int) -> int: + if total_lines - window_height >= 0: + return total_lines - window_height + else: + return 0 + + +def choice_win(allowdel=False): + """ + Conjure options window by wrapping a window function + which has a return type of tuple in the form of + (title, list_to_chose, initial_active_index, windows_key_to_toggle) + and return tuple of (returned_key, chosen_index, chosen_index_to_delete) + """ + + def inner_f(listgen): + @wraps(listgen) + def wrapper(self, *args, **kwargs): + rows, cols = self.screen.getmaxyx() + hi, wi = rows - 4, cols - 4 + Y, X = 2, 2 + chwin = curses.newwin(hi, wi, Y, X) + if self.is_color_supported: + chwin.bkgd(self.screen.getbkgd()) + + title, ch_list, index, key = listgen(self, *args, **kwargs) + + if len(title) > cols - 8: + title = title[: cols - 8] + + chwin.box() + chwin.keypad(True) + chwin.addstr(1, 2, title) + chwin.addstr(2, 2, "-" * len(title)) + if allowdel: + chwin.addstr(3, 2, "HINT: Press 'd' to delete.") + key_chwin = 0 + + totlines = len(ch_list) + chwin.refresh() + pad = curses.newpad(totlines, wi - 2) + if self.is_color_supported: + pad.bkgd(self.screen.getbkgd()) + + pad.keypad(True) + + padhi = rows - 5 - Y - 4 + 1 - (1 if allowdel else 0) + # padhi = rows - 5 - Y - 4 + 1 - 1 + y = 0 + if index in range(padhi // 2, totlines - padhi // 2): + y = index - padhi // 2 + 1 + span = [] + + for n, i in enumerate(ch_list): + # strs = " " + str(n+1).rjust(d) + " " + i[0] + # remove newline from choice entries + # mostly happens in FictionBook (.fb2) format + strs = " " + i.replace("\n", " ") + strs = strs[0 : wi - 3] + pad.addstr(n, 0, strs) + span.append(len(strs)) + + countstring = "" + while key_chwin not in self.keymap.Quit + key: + if countstring == "": + count = 1 + else: + count = int(countstring) + if key_chwin in tuple(Key(i) for i in range(48, 58)): # i.e., k is a numeral + countstring = countstring + key_chwin.char + else: + if key_chwin in self.keymap.ScrollUp + self.keymap.PageUp: + index -= count + if index < 0: + index = 0 + elif key_chwin in self.keymap.ScrollDown or key_chwin in self.keymap.PageDown: + index += count + if index + 1 >= totlines: + index = totlines - 1 + elif key_chwin in self.keymap.Follow: + chwin.clear() + chwin.refresh() + return None, index, None + elif key_chwin in self.keymap.BeginningOfCh: + index = 0 + elif key_chwin in self.keymap.EndOfCh: + index = totlines - 1 + elif key_chwin == Key("D") and allowdel: + return None, (0 if index == 0 else index - 1), index + # chwin.redrawwin() + # chwin.refresh() + elif key_chwin == Key("d") and allowdel: + resk, resp, _ = self.show_win_options( + "Delete '{}'?".format(ch_list[index]), + ["(Y)es", "(N)o"], + 0, + (Key("n"),), + ) + if resk is not None: + key_chwin = resk + continue + elif resp == 0: + return None, (0 if index == 0 else index - 1), index + chwin.redrawwin() + chwin.refresh() + elif key_chwin in {Key(i) for i in ["Y", "y", "N", "n"]} and ch_list == [ + "(Y)es", + "(N)o", + ]: + if key_chwin in {Key("Y"), Key("y")}: + return None, 0, None + else: + return None, 1, None + elif key_chwin in tuple_subtract(self._win_keys, key): + chwin.clear() + chwin.refresh() + return key_chwin, index, None + countstring = "" + + while index not in range(y, y + padhi): + if index < y: + y -= 1 + else: + y += 1 + + for n in range(totlines): + att = curses.A_REVERSE if index == n else curses.A_NORMAL + pre = ">>" if index == n else " " + pad.addstr(n, 0, pre) + pad.chgat(n, 0, span[n], pad.getbkgd() | att) + + pad.refresh(y, 0, Y + 4 + (1 if allowdel else 0), X + 4, rows - 5, cols - 6) + # pad.refresh(y, 0, Y+5, X+4, rows - 5, cols - 6) + key_chwin = Key(chwin.getch()) + if key_chwin == Key(curses.KEY_MOUSE): + mouse_event = curses.getmouse() + if mouse_event[4] == curses.BUTTON4_PRESSED: + key_chwin = self.keymap.ScrollUp[0] + elif mouse_event[4] == 2097152: + key_chwin = self.keymap.ScrollDown[0] + elif mouse_event[4] == curses.BUTTON1_DOUBLE_CLICKED: + if ( + mouse_event[2] >= 6 + and mouse_event[2] < rows - 4 + and mouse_event[2] < 6 + totlines + ): + index = mouse_event[2] - 6 + y + key_chwin = self.keymap.Follow[0] + elif ( + mouse_event[4] == curses.BUTTON1_CLICKED + and mouse_event[2] >= 6 + and mouse_event[2] < rows - 4 + and mouse_event[2] < 6 + totlines + ): + if index == mouse_event[2] - 6 + y: + key_chwin = self.keymap.Follow[0] + continue + index = mouse_event[2] - 6 + y + elif mouse_event[4] == curses.BUTTON3_CLICKED: + key_chwin = self.keymap.Quit[0] + + chwin.clear() + chwin.refresh() + return None, None, None + + return wrapper + + return inner_f + + +def text_win(textfunc): + @wraps(textfunc) + def wrapper(self, *args, **kwargs) -> Union[NoUpdate, Key]: + rows, cols = self.screen.getmaxyx() + hi, wi = rows - 4, cols - 4 + Y, X = 2, 2 + textw = curses.newwin(hi, wi, Y, X) + if self.is_color_supported: + textw.bkgd(self.screen.getbkgd()) + + title, raw_texts, key = textfunc(self, *args, **kwargs) + + if len(title) > cols - 8: + title = title[: cols - 8] + + texts = [] + for i in raw_texts.splitlines(): + texts += textwrap.wrap(i, wi - 6, drop_whitespace=False) + + textw.box() + textw.keypad(True) + textw.addstr(1, 2, title) + textw.addstr(2, 2, "-" * len(title)) + key_textw: Union[NoUpdate, Key] = NoUpdate() + + totlines = len(texts) + + pad = curses.newpad(totlines, wi - 2) + if self.is_color_supported: + pad.bkgd(self.screen.getbkgd()) + + pad.keypad(True) + for n, i in enumerate(texts): + pad.addstr(n, 0, i) + y = 0 + textw.refresh() + pad.refresh(y, 0, Y + 4, X + 4, rows - 5, cols - 6) + padhi = rows - 8 - Y + + while key_textw not in self.keymap.Quit + key: + if key_textw in self.keymap.ScrollUp and y > 0: + y -= 1 + elif key_textw in self.keymap.ScrollDown and y < totlines - hi + 6: + y += 1 + elif key_textw in self.keymap.PageUp: + y = pgup(y, padhi) + elif key_textw in self.keymap.PageDown: + y = pgdn(y, totlines, padhi) + elif key_textw in self.keymap.BeginningOfCh: + y = 0 + elif key_textw in self.keymap.EndOfCh: + y = pgend(totlines, padhi) + elif key_textw in tuple_subtract(self._win_keys, key): + textw.clear() + textw.refresh() + return key_textw + pad.refresh(y, 0, 6, 5, rows - 5, cols - 5) + key_textw = Key(textw.getch()) + + textw.clear() + textw.refresh() + return NoUpdate() + + return wrapper + + +def merge_text_structures( + text_structure_first: TextStructure, text_structure_second: TextStructure +) -> TextStructure: + return TextStructure( + text_lines=text_structure_first.text_lines + text_structure_second.text_lines, + image_maps={**text_structure_first.image_maps, **text_structure_second.image_maps}, + section_rows={**text_structure_first.section_rows, **text_structure_second.section_rows}, + formatting=text_structure_first.formatting + text_structure_second.formatting, + ) + + +def construct_relative_reading_state( + abs_reading_state: ReadingState, totlines_per_content: Sequence[int] +) -> ReadingState: + """ + :param abs_reading_state: ReadingState absolute to whole book when Setting.Seamless==True + :param totlines_per_content: sequence of total lines per book content + :return: new ReadingState relative to per content of the book + """ + index = 0 + cumulative_contents_lines = 0 + all_contents_lines = sum(totlines_per_content) + # for n, content_lines in enumerate(totlines_per_content): + # cumulative_contents_lines += content_lines + # if cumulative_contents_lines > abs_reading_state.row: + # return + while True: + content_lines = totlines_per_content[index] + cumulative_contents_lines += content_lines + if cumulative_contents_lines > abs_reading_state.row: + break + index += 1 + + return ReadingState( + content_index=index, + textwidth=abs_reading_state.textwidth, + row=abs_reading_state.row - cumulative_contents_lines + content_lines, + rel_pctg=abs_reading_state.rel_pctg + - ((cumulative_contents_lines - content_lines) / all_contents_lines) + if abs_reading_state.rel_pctg + else None, + section=abs_reading_state.section, + ) + + +def count_letters(ebook: Ebook) -> LettersCount: + per_content_counts: List[int] = [] + cumulative_counts: List[int] = [] + # assert isinstance(ebook.contents, tuple) + for i in ebook.contents: + content = ebook.get_raw_text(i) + src_lines = parse_html(content) + assert isinstance(src_lines, tuple) + cumulative_counts.append(sum(per_content_counts)) + per_content_counts.append(sum([len(re.sub(r"\s", "", j)) for j in src_lines])) + + return LettersCount(all=sum(per_content_counts), cumulative=tuple(cumulative_counts)) + + +def count_letters_parallel(ebook: Ebook, child_conn) -> None: + child_conn.send(count_letters(ebook)) + child_conn.close() + + +def construct_speaker( + preferred: Optional[str] = None, args: List[str] = [] +) -> Optional[SpeakerBaseModel]: + available_speakers = [SpeakerMimic, SpeakerPico] + sorted_speakers = ( + sorted(available_speakers, key=lambda x: int(x.cmd == preferred), reverse=True) + if preferred + else available_speakers + ) + speaker = next((speaker for speaker in sorted_speakers if speaker.available), None) + return speaker(args) if speaker else None |