1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
#!/usr/bin/python3
from datetime import datetime
import logging
import sys
from os import mkdir
from os.path import basename, exists, normpath, splitext
from pprint import pprint
from shutil import which
from subprocess import DEVNULL, Popen, run, STDOUT, PIPE
from typing import Dict, List
from urllib.parse import urlparse
from xml.etree import ElementTree as ET
# XML namespace URIs, keyed by the short prefixes that the ElementTree
# queries in this script pass along with their path expressions.
ns = dict(
    dflt='http://disqus.com',
    dsq='http://disqus.com/disqus-internals',
    xsi='http://www.w3.org/2001/XMLSchema-instance',
)

# Configure the root logger: DEBUG and above, rendered as
# "LEVEL:function:message".
logging.basicConfig(
    level=logging.DEBUG,
    format='%(levelname)s:%(funcName)s:%(message)s',
)
log = logging.getLogger()
def init(filename: str) -> ET.Element:
    """Check that pandoc is available, then parse the XML file.

    Args:
        filename: Path of the XML file to load.

    Returns:
        The root element of the parsed document.

    Raises:
        SystemExit: With status 1, after printing ``Requires pandoc`` to
            stderr, when no ``pandoc`` executable is found on ``PATH``.

    Errors raised by ``ET.parse`` (unreadable file, malformed XML) are not
    caught here and propagate to the caller.
    """
    # Look the executable up instead of launching ``pandoc --version``:
    # this is the same lookup used when the converter command line is
    # built, it avoids spawning a process whose output was never used,
    # and it needs no throw-away local to hold the result.
    if which('pandoc') is None:
        print('Requires pandoc', file=sys.stderr)
        sys.exit(1)
    return ET.parse(filename).getroot()
def collect_threads(root: ET.Element) -> Dict[int, Dict[str, str|List]]:
    """Index the ``thread`` children of *root* by their ``dsq:id``.

    Args:
        root: Element whose direct ``dflt:thread`` children are read.

    Returns:
        A dict mapping each thread id to a record with three keys:

        * ``link``  - stripped text of the thread's ``dflt:link`` child,
          or ``''`` when that child is missing or empty;
        * ``blurb`` - final path component of the link URL with its
          extension removed;
        * ``msgs``  - a new empty list.

        A thread without a ``dsq:id`` attribute is stored under id 0.
        When an id occurs more than once the last occurrence wins.
    """
    # The qualified attribute name does not change between iterations.
    id_attr = f"{{{ns['dsq']}}}id"
    threads: Dict[int, Dict[str, str|List]] = {}
    for thread in root.findall("dflt:thread", ns):
        thread_id = int(thread.get(id_attr, 0))
        # findtext() is given '' as its default, so the link is always a
        # str and the blurb can be derived without a None check.
        thread_link = thread.findtext("dflt:link", '', ns).strip()
        url_path = urlparse(thread_link).path
        threads[thread_id] = {
            'link': thread_link,
            'blurb': splitext(basename(normpath(url_path)))[0].strip(),
            'msgs': [],
        }
    return threads
def _html_to_rst(html, pandoc):
    """Pipe *html* through ``pandoc -f html -t rst`` and return its stdout."""
    with Popen([pandoc, '-f', 'html', '-t', 'rst'],
               stderr=DEVNULL, stdout=PIPE, stdin=PIPE,
               encoding='utf-8') as proc:
        rst = proc.communicate(html)[0]
    if proc.returncode:
        # The conversion stays best-effort (whatever pandoc printed is
        # still used), but a failure is no longer completely silent.
        log.warning('pandoc exited with status %s', proc.returncode)
    return rst


def collect_posts(root: ET.Element, threads: dict) -> dict:
    """Render the ``post`` children of *root* as reST, grouped per thread.

    Posts whose ``dflt:isSpam`` or ``dflt:isDeleted`` text is ``true`` are
    ignored.  Every remaining post prints one ``.`` to stderr as a progress
    mark.  The text of its ``dflt:message`` child is converted from HTML to
    reST with pandoc and prefixed with a small field list::

        :commentID#: <dsq:id of the post>         (omitted when 0)
        :parentID#: <dsq:id of dflt:parent>       (omitted when 0 / absent)
        :date: <dflt:createdAt as DD.MM.YYYY>

    A missing or empty ``dflt:createdAt`` is rendered as 01.01.1970.

    Args:
        root: Element whose direct ``dflt:post`` children are read.
        threads: Mapping of thread id to a record that has a ``'blurb'``
            key (the shape ``collect_threads`` returns).

    Returns:
        ``{blurb: {'msg': text}}`` where *text* is the concatenation, in
        document order, of all rendered posts of that thread, each followed
        by a newline.  A thread whose posts all lack a ``dflt:message``
        child still gets an entry, with an empty ``msg``.

    Posts that have no ``dflt:thread`` child, or that point at a thread id
    not present in *threads*, are skipped with a warning instead of
    aborting the whole run.  A ``createdAt`` value that does not match
    ``%Y-%m-%dT%H:%M:%SZ`` raises ``ValueError``.
    """
    id_attr = f"{{{ns['dsq']}}}id"
    # Resolve the converter once rather than for every post.  Falling back
    # to the bare name makes a missing binary surface as FileNotFoundError
    # from Popen instead of a TypeError caused by a None argv entry.
    pandoc = which('pandoc') or 'pandoc'
    # blurb -> list of rendered posts; joined once at the end so that the
    # per-thread text is not rebuilt on every append.
    chunks = {}

    print('', file=sys.stderr, flush=True)
    for com_post in root.findall("dflt:post", ns):
        if (com_post.findtext('dflt:isSpam', '', ns) == 'true'
                or com_post.findtext('dflt:isDeleted', '', ns) == 'true'):
            continue
        print('.', end='', file=sys.stderr, flush=True)

        comment_id = int(com_post.get(id_attr, 0))

        thread = com_post.find('dflt:thread', ns)
        record = None
        if thread is not None:
            record = threads.get(int(thread.get(id_attr, 0)))
        if not record or 'blurb' not in record:
            log.warning('comment %s has no known thread, skipped',
                        comment_id)
            continue
        parts = chunks.setdefault(record['blurb'], [])

        parent = com_post.find('dflt:parent', ns)
        parent_id = int(parent.get(id_attr, 0)) if parent is not None else 0

        created_str = com_post.findtext('dflt:createdAt', '', ns).strip()
        if created_str:
            created_at = datetime.strptime(created_str, "%Y-%m-%dT%H:%M:%SZ")
        else:
            # Fixed epoch date; fromtimestamp(0) would depend on the local
            # time zone and can yield 31.12.1969.
            created_at = datetime(1970, 1, 1)

        message = com_post.find('dflt:message', ns)
        if message is None:
            continue

        header = []
        if comment_id:
            header.append(f":commentID#: {comment_id}\n")
        if parent_id:
            header.append(f":parentID#: {parent_id}\n")
        header.append(f":date: {created_at:%d.%m.%Y}\n\n")
        parts.append(''.join(header) + _html_to_rst(message.text, pandoc)
                     + '\n')

    print('', file=sys.stderr)
    return {blurb: {'msg': ''.join(parts)} for blurb, parts in chunks.items()}
if __name__=='__main__':
    # Read comments.xml from the current directory and write one
    # comments/<blurb>.rst file per thread that has posts.
    root = init("comments.xml")
    threads = collect_threads(root)
    posts = collect_posts(root, threads)
    # Create the output directory and tolerate it already being there,
    # instead of checking first and racing with whoever creates it.
    try:
        mkdir('comments')
    except FileExistsError:
        pass
    for blurb, entry in posts.items():
        # pandoc's output was decoded as UTF-8, so write it back as UTF-8
        # rather than in whatever encoding the current locale implies.
        with open(f"comments/{blurb}.rst", "w", encoding="utf-8") as out_rst:
            print(entry['msg'].strip(), file=out_rst)
|