aboutsummaryrefslogtreecommitdiffstats
path: root/gg_scraper.py
diff options
context:
space:
mode:
authorMatěj Cepl <mcepl@redhat.com>2014-01-09 00:37:42 +0100
committerMatěj Cepl <mcepl@cepl.eu>2016-04-16 12:15:16 +0200
commit4dfe96db384a98c654f31063736a2e0c84a9ea69 (patch)
treec45cdf3c876f6850d29dbd99d44d24f1cc4715dd /gg_scraper.py
parent2b9e1003e81754528fb18a7d835ed9c3c800d743 (diff)
downloadgg_scraper-4dfe96db384a98c654f31063736a2e0c84a9ea69.tar.gz
Add concurrent processing of Group.collect_group()
Diffstat (limited to 'gg_scraper.py')
-rwxr-xr-xgg_scraper.py40
1 file changed, 29 insertions, 11 deletions
diff --git a/gg_scraper.py b/gg_scraper.py
index bd44b18..4c7fef7 100755
--- a/gg_scraper.py
+++ b/gg_scraper.py
@@ -43,13 +43,18 @@ try:
except ImportError:
from urllib2 import (HTTPError, HTTPHandler, HTTPRedirectHandler,
build_opener)
-#from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
from bs4 import BeautifulSoup
import logging
logging.basicConfig(format='%(levelname)s:%(funcName)s:%(message)s',
level=logging.DEBUG)
ADDR_SEC_LABEL = 'addresses'
+MAX_THREADS = 42
MANGLED_ADDR_RE = re.compile(
r'([a-zA-Z0-9_.+-]+(\.)+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)',
re.IGNORECASE)
@@ -248,16 +253,29 @@ class Group(Page):
def collect_group(self):
self.topics = self.get_topics()
len_topics = len(self.topics)
- for top in self.topics:
- #print('[%d/%d] downloading "%s"' % (self.topics.index(top),
- # len_topics, top.name))
- print('[%d/%d] downloading' % (self.topics.index(top), len_topics))
- arts = top.get_articles()
- top.articles = arts
- for a in arts:
- msg = a.collect_message()
- if msg is not None:
- a.raw_message = msg
+ jobs = []
+ with ThreadPoolExecutor(MAX_THREADS) as executor:
+ for top in self.topics:
+ #print('[%d/%d] downloading "%s"' % (self.topics.index(top),
+ # len_topics, top.name))
+ print('[%d/%d] downloading' % (self.topics.index(top), len_topics))
+ job = executor.submit(top.get_articles)
+ jobs.append(job)
+
+ for job in as_completed(jobs):
+ arts = job.result()
+ top.articles = arts
+ msg_jobs = {}
+
+ for a_job in arts:
+ m_job = executor.submit(a_job.collect_message)
+ msg_jobs[m_job] = a_job
+
+ for m_job in as_completed(msg_jobs):
+ a_job = msg_jobs[m_job]
+ msg = m_job.result()
+ if msg is not None:
+ a_job.raw_message = msg
def all_messages(self):
'''Iterate over all messages in the group'''