aboutsummaryrefslogblamecommitdiffstats
path: root/epubgrep.py
blob: 0b960a2e5b42ea3363f5bc937c559d3d3bf5423d (plain) (tree)
1
2
3
4
5
6
7

                      
                         
              

              
          





                                                   





                                                                                            
                         

                                                          




                                                                 

 
                                                                                                
                                                                            

                                                      

                  
 

                                

                                                        
                                   
                                                           
                                    
                             


                                   



                                                                 
                                             
                                                      
                                                


                                  



                                                                               



                                                                        

                                                        

                                                             


                                      


                                                        



                                                                           

                                                                             


                                                              




                                                                               
                                                                
                                 
                                                          

                 


                                                   
 
 
           

                                                                          



                                                             


                                                            
                                                    

                                                                     


                                                      
                              
                                  




                            


                                   






                                                                   
            





                                                    
                      



                          
#!/usr/bin/env python3
import argparse
import concurrent.futures
import logging
import os.path
import re
import sys
import zipfile

from typing import Any, Dict, List, Optional, Tuple

import epub_meta

# Configure the root logger at import time: every record is rendered as
# "LEVEL:function:message" and anything below INFO is dropped.
logging.basicConfig(format='%(levelname)s:%(funcName)s:%(message)s',
                    level=logging.INFO)
# Module-wide logger shared by the functions in this file.
log = logging.getLogger('epubgrep')


def get_chapter_title(mdata: List[Dict[str, Any]], fname: str) -> Optional[Tuple[str, int]]:
    """Look up the table-of-contents entry whose ``src`` equals *fname*.

    Args:
        mdata: Table-of-contents entries; each entry is read through its
            ``'title'``, ``'index'`` and ``'src'`` keys.  ``None`` is
            tolerated and means "no table of contents available".
        fname: Value compared for equality against each entry's ``'src'``.

    Returns:
        ``None`` when *mdata* is ``None``; ``('Unknown', 0)`` when no entry
        matches; otherwise ``(title, index)`` of the first matching entry,
        with blanks, tabs, dots and digits stripped from both ends of the
        title.
    """
    if mdata is None:
        return None

    matches = [(entry['title'], entry['index'])
               for entry in mdata
               if entry['src'] == fname]
    if not matches:
        return ('Unknown', 0)

    # Only the first matching entry is reported.
    raw_title, index = matches[0]
    return raw_title.strip(' \t.0123456789'), index


def _chapter_header(metadata, member_name: str) -> str:
    """Return the "<index>. <title>:" heading for an archive member.

    An empty string is returned when no metadata was loaded or when
    ``get_chapter_title`` has no table of contents to consult.
    """
    toc = metadata.toc if metadata is not None else None
    chap_info = get_chapter_title(toc, member_name)
    if chap_info is None:
        return ''
    return "{}. {}:\n\n".format(chap_info[1], chap_info[0])


def _highlight(match) -> str:
    """Wrap the text of a regex match in the red/bold ANSI escape codes."""
    text = match.group(0)
    # Zero-width matches (e.g. produced by 'a*') have nothing to colour;
    # wrapping them would scatter escape codes between every character.
    if not text:
        return text
    return "\033[31;1m" + text + "\033[31;0m"


def grep_book(filename: str, pattern: str, flags: int, counting: bool=False, color: bool=False):
    """Search every member of the EPub (zip) archive *filename* for *pattern*.

    Args:
        filename: Path of the archive; must be an existing regular file.
        pattern: Regular expression, compiled exactly as given so that the
            caller's group numbers, backreferences and inline flags keep
            their meaning.
        flags: ``re`` flags.  When ``re.M`` is set each archive member is
            searched as one string and only its first match is reported;
            otherwise members are searched line by line.
        counting: If true, return only a ``'NN:<filename>'`` summary, where
            NN is the number of matching lines (line mode) or matching
            members (multi-line mode).
        color: If true, matches are highlighted with ANSI escapes in
            line mode.

    Returns:
        The text to print for this book; an empty string if nothing matched.

    Raises:
        AssertionError: *filename* is not an existing file.
        zipfile.BadZipFile: *filename* is not a readable zip archive.
    """
    assert os.path.isfile(filename), "{} is not EPub file.".format(filename)
    sought_RE = re.compile(pattern, flags)
    multiline = bool(flags & re.M)

    try:
        metadata = epub_meta.get_epub_metadata(filename)
    except epub_meta.EPubException:
        # Metadata is only needed for chapter headings; keep searching
        # without them instead of failing later on an unbound name.
        log.exception('Failed to open {}'.format(filename))
        metadata = None

    count = 0
    chunks = []  # type: List[str]
    printed_booktitle = False

    # The context manager guarantees the archive handle is released.
    with zipfile.ZipFile(filename) as book:
        for zif in book.infolist():
            with book.open(zif) as inf:
                if multiline:
                    decoded_str = inf.read().decode(errors='replace')
                    res = sought_RE.search(decoded_str)
                    if not res:
                        continue
                    if counting:
                        count += 1
                        continue
                    if not printed_booktitle:
                        chunks.append('{}\n'.format(filename))
                        printed_booktitle = True
                    chunks.append(_chapter_header(metadata, zif.filename))
                    chunks.append('{}\n\n'.format(res.group(0)))
                else:
                    printed_title = False
                    for line in inf:
                        decoded_line = line.decode(errors='replace').strip()
                        if not sought_RE.search(decoded_line):
                            continue
                        if counting:
                            count += 1
                            continue
                        if not printed_booktitle:
                            chunks.append(filename + '\n')
                            printed_booktitle = True
                        if not printed_title:
                            # One chapter heading per archive member.
                            chunks.append(
                                _chapter_header(metadata, zif.filename))
                            printed_title = True
                        if color:
                            # Highlight every match on the line, not just
                            # literal copies of the first match's text.
                            decoded_line = sought_RE.sub(_highlight,
                                                         decoded_line)
                        chunks.append(decoded_line + '\n')

    out = ''.join(chunks)
    if count > 0:
        out += '{:02d}:{}'.format(count, filename)

    return out


def main():
    """Command-line entry point: grep one or more EPub files in parallel.

    Parses the command line, submits one ``grep_book`` call per file to a
    process pool and prints each non-empty result as soon as it is ready.
    A file that cannot be processed is reported through the logger and does
    not stop the remaining files; the process then exits with status 1.
    An invalid pattern is rejected up front via ``parser.error``.
    """
    parser = argparse.ArgumentParser(description='Grep through EPub book')
    parser.add_argument('pattern')
    parser.add_argument('files', nargs='+')
    parser.add_argument('-c', '--count',
                        action='store_true',
                        help="just counts of found patterns")
    parser.add_argument('-i', '--ignore-case',
                        action='store_true',
                        help="make search case insensitive")
    # NOTE: store_false -- colouring is ON by default, the option disables it.
    parser.add_argument('-o', '--color', '--colour',
                        action='store_false',
                        help="Do NOT mark found patterns with color")
    parser.add_argument('-m', '--multi-line',
                        action='store_true',
                        help="make search multi line")
    args = parser.parse_args()

    search_flags = 0
    if args.ignore_case:
        search_flags |= re.I

    if args.multi_line:
        search_flags |= re.M | re.S

    # Reject a broken pattern once, here, rather than in every worker.
    try:
        re.compile(args.pattern, search_flags)
    except re.error as ex:
        parser.error('invalid pattern {!r}: {}'.format(args.pattern, ex))

    failed = False
    with concurrent.futures.ProcessPoolExecutor() as executor:
        fut_to_fname = {executor.submit(grep_book,
                                        os.path.realpath(filename),
                                        args.pattern, search_flags,
                                        args.count, args.color):
                        filename for filename in args.files}
        # Consume results while the pool is still alive so output streams
        # as books finish instead of appearing only after all are done.
        try:
            for future in concurrent.futures.as_completed(fut_to_fname):
                try:
                    data = future.result().rstrip()
                except Exception as ex:
                    # Top-level boundary: report the offending file and
                    # carry on with the others.
                    failed = True
                    log.error('Failed to process %s: %s',
                              fut_to_fname[future], ex)
                    continue
                if data:
                    print(data)
        except (BrokenPipeError, KeyboardInterrupt):
            # Drop work that has not started yet so leaving the ``with``
            # block does not wait for the whole queue.
            for pending in fut_to_fname:
                pending.cancel()
            sys.exit()

    if failed:
        sys.exit(1)


# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()