From eae2c10f4c81e560629b0e55a048f6a3186092af Mon Sep 17 00:00:00 2001 From: Matěj Cepl Date: Thu, 9 Jan 2020 17:33:04 +0100 Subject: Some more work on reorganizing the code to actor model --- dlpcvp.py | 157 +++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 105 insertions(+), 52 deletions(-) diff --git a/dlpcvp.py b/dlpcvp.py index d9d684d..2ad1377 100755 --- a/dlpcvp.py +++ b/dlpcvp.py @@ -10,10 +10,12 @@ import sys import urllib.request import xml.etree.ElementTree as ET from distutils.version import LooseVersion +from enum import Enum, auto from typing import Iterable, List, Optional, Tuple, Union from urllib.error import URLError, HTTPError from urllib.request import Request, urlopen -import thespian + +from thespian.actors import Actor, ActorSystem # PyPI API documentation https://warehouse.readthedocs.io/api-reference/ PyPI_base = "https://pypi.org/pypi/{}/json" @@ -249,74 +251,87 @@ def get_version_from_pypi( return None -def package_version( - proj: str, pkgn: str, con: DBConnType = None -) -> Optional[LooseVersion]: - """ - Return the version of the given package in the given proj. - - Downloads SPEC file from OBS and parses it. - """ - # pkg, suse_name_etag, suse_spec_etag, pypi_etag - if con: - _, etag_fn, etag_spcf, _ = get_etags(con, pkgn) - else: - etag_fn, etag_spcf = None, None +class SPkg(Enum): + CURRENT = auto() + NOOBS = auto() - # Get listing of the package repository - req_spc_name = Request(url=OBS_base + f'/source/{proj}/{pkgn}?expand=1') - spc_fname = get_spec_name(req_spc_name, proj, pkgn, etag_fn) - if spc_fname is None: - return None +class DispatchCmd(Enum): + INIT = auto() + PKG = auto() - req_spec = Request( - url=OBS_base + f'/source/{proj}/{pkgn}/{spc_fname}?expand=1' - ) - if etag_spcf is not None: - req_spc_name.add_header('ETag', etag_spcf) +class SPECParser(Actor): + def package_version( + self, proj: str, pkgn: str, con: DBConnType = None + ) -> Optional[LooseVersion]: + """ + Return the version of the given package in the given proj. - try: - with opener.open(req_spec) as resp: - etag_spcf = str(resp.info()['ETag']) - etag_spcf = None if not etag_spcf else etag_spcf - spec_file_str = resp.read().decode() - - if (con is not None) and (etag_spcf or etag_fn): - update_etags(con, pkgn, etag_fn, etag_spcf, None) - return parse_spec(spec_file_str) - except HTTPError as ex: - if ex.getcode() == 404: - log.warning(f'Cannot parse SPEC file {spc_fname} for {pkgn}') + Downloads SPEC file from OBS and parses it. + """ + # pkg, suse_name_etag, suse_spec_etag, pypi_etag + if con: + _, etag_fn, etag_spcf, _ = get_etags(con, pkgn) else: - raise - return None + etag_fn, etag_spcf = None, None + + # Get listing of the package repository + req_spc_name = Request(url=OBS_base + + f'/source/{proj}/{pkgn}?expand=1') + spc_fname = get_spec_name(req_spc_name, proj, pkgn, etag_fn) + if spc_fname is None: + return None -class SUSEPkgVerifier(thespian.actors.Actor): - def receiveMessage(self, message, sender): + req_spec = Request( + url=OBS_base + f'/source/{proj}/{pkgn}/{spc_fname}?expand=1' + ) + + if etag_spcf is not None: + req_spc_name.add_header('ETag', etag_spcf) + + try: + with opener.open(req_spec) as resp: + etag_spcf = str(resp.info()['ETag']) + etag_spcf = None if not etag_spcf else etag_spcf + spec_file_str = resp.read().decode() + + if (con is not None) and (etag_spcf or etag_fn): + update_etags(con, pkgn, etag_fn, etag_spcf, None) + return parse_spec(spec_file_str) + except HTTPError as ex: + if ex.getcode() == 404: + log.warning(f'Cannot parse SPEC file {spc_fname} for {pkgn}') + else: + raise + return SPkg.NOOBS + + def receiveMessage(self, msg, sender): pass -def main(prj): - db_name = osp.splitext(osp.basename(osp.realpath(__file__)))[0] + ".db" - to_be_upgraded = [] # type: List[Tuple[str, LooseVersion, LooseVersion]] - missing_on_PyPI = [] # type: List[str] - with sqlite3.connect(db_name) as conn: - if not conn.execute(TB_EXISTS).fetchone(): - conn.execute(TB_CREATE) - conn.execute(TB_CREATE_IDX) +class PkgDispatcher(Actor): + def __init__(self, conn, project): + self.conn = None # database connection for ETags management + self.prj = None # currently processed project on OBS - for pkg in suse_packages(prj): + def receiveMessage(self, msg, sender): + cmd, val = msg + + if cmd is DispatchCmd.INIT: + self.conn = val.conn + self.prj = val.prj + elif cmd is DispatchCmd.PKG: + pkg = msg log.debug('pkg = %s', pkg) print(pkg[0], file=sys.stderr, end='', flush=True) if pkg.startswith('python-'): pypi_name = pkg[CUTCHARS:] try: - suse_ver = package_version(prj, pkg, conn) - if suse_ver is None: - raise RuntimeError('not in OBS') + suse_ver = self.package_version(self.prj, pkg, self.conn) + if suse_ver is SPkg.NOOBS: + return suse_ver if not is_develpackage(prj, pkg): continue pypi_ver = get_version_from_pypi(pypi_name, conn) @@ -341,6 +356,44 @@ def main(prj): else: missing_on_PyPI.append(pkg) + +def main(prj): + db_name = osp.splitext(osp.basename(osp.realpath(__file__)))[0] + ".db" + to_be_upgraded = [] # type: List[Tuple[str, LooseVersion, LooseVersion]] + missing_on_PyPI = [] # type: List[str] + actorsys = ActorSystem() + + with sqlite3.connect(db_name) as conn: + if not conn.execute(TB_EXISTS).fetchone(): + conn.execute(TB_CREATE) + conn.execute(TB_CREATE_IDX) + + act_dispatcher = actorsys.createActor(PkgDispatcher) # prj conn + actorsys.tell(act_dispatcher, (DispatchCmd.INIT, (conn, prj))) + + # https://thespianpy.com/doc/in_depth#outline-container-orgbc15e0e + # There are no futures in the Actors model, and there shouldn't + # be ones: + # Actors thus operate on individual messages, running when + # a message is received and finishing when they exit from the + # receiveMessage(), during which they may have created other + # Actors or sent other messages. This message passing paradigm + # is different from the conventional blocking paradigm used by + # most code. In Thespian, there are no need for special objects + # like futures that hide asynchronous functionality behind what + # looks like synchronous code. If an Actor does block during the + # receiveMessage() execution, it will not receive other messages + # while it is blocked; the typical method of addressing this in + # an Actor model is to start up more Actors or delegate the + # blocking functionality to a sub-Actor that was dynamically + # created as needed. + # TL;DR: Individual actors just have to send message to + # appropriate actors processing results. + for pkg in suse_packages(prj): + ret = actorsys.ask(act_dispatcher, (DispatchCmd.PKG, pkg)) + + actorsys.shutdown() # end of the processing + sys.stdout.flush() if missing_on_PyPI: print("\nThese packages don't seem to be available on PyPI:") -- cgit