diff options
author | Matěj Cepl <mcepl@redhat.com> | 2014-01-09 00:37:42 +0100 |
---|---|---|
committer | Matěj Cepl <mcepl@cepl.eu> | 2016-04-16 12:15:16 +0200 |
commit | 4dfe96db384a98c654f31063736a2e0c84a9ea69 (patch) | |
tree | c45cdf3c876f6850d29dbd99d44d24f1cc4715dd /gg_scraper.py | |
parent | 2b9e1003e81754528fb18a7d835ed9c3c800d743 (diff) | |
download | gg_scraper-4dfe96db384a98c654f31063736a2e0c84a9ea69.tar.gz |
Add concurrent processing of Group.collect_group()
Diffstat (limited to 'gg_scraper.py')
-rwxr-xr-x | gg_scraper.py | 40 |
1 files changed, 29 insertions, 11 deletions
```diff
diff --git a/gg_scraper.py b/gg_scraper.py
index bd44b18..4c7fef7 100755
--- a/gg_scraper.py
+++ b/gg_scraper.py
@@ -43,13 +43,18 @@ try:
 except ImportError:
     from urllib2 import (HTTPError, HTTPHandler, HTTPRedirectHandler,
                          build_opener)
-#from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
+try:
+    from queue import Queue
+except ImportError:
+    from Queue import Queue
 from bs4 import BeautifulSoup
 import logging
 logging.basicConfig(format='%(levelname)s:%(funcName)s:%(message)s',
                     level=logging.DEBUG)
 
 ADDR_SEC_LABEL = 'addresses'
+MAX_THREADS = 42
 MANGLED_ADDR_RE = re.compile(
     r'([a-zA-Z0-9_.+-]+(\.)+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)',
     re.IGNORECASE)
@@ -248,16 +253,29 @@ class Group(Page):
     def collect_group(self):
         self.topics = self.get_topics()
         len_topics = len(self.topics)
-        for top in self.topics:
-            #print('[%d/%d] downloading "%s"' % (self.topics.index(top),
-            # len_topics, top.name))
-            print('[%d/%d] downloading' % (self.topics.index(top), len_topics))
-            arts = top.get_articles()
-            top.articles = arts
-            for a in arts:
-                msg = a.collect_message()
-                if msg is not None:
-                    a.raw_message = msg
+        jobs = []
+        with ThreadPoolExecutor(MAX_THREADS) as executor:
+            for top in self.topics:
+                #print('[%d/%d] downloading "%s"' % (self.topics.index(top),
+                # len_topics, top.name))
+                print('[%d/%d] downloading' % (self.topics.index(top), len_topics))
+                job = executor.submit(top.get_articles)
+                jobs.append(job)
+
+            for job in as_completed(jobs):
+                arts = job.result()
+                top.articles = arts
+                msg_jobs = {}
+
+                for a_job in arts:
+                    m_job = executor.submit(a_job.collect_message)
+                    msg_jobs[m_job] = a_job
+
+                for m_job in as_completed(msg_jobs):
+                    a_job = msg_jobs[m_job]
+                    msg = m_job.result()
+                    if msg is not None:
+                        a_job.raw_message = msg
 
     def all_messages(self):
         '''Iterate over all messages in the group'''
```