diff options
Diffstat (limited to 'libbe/util')
-rw-r--r-- | libbe/util/__init__.py | 24 | ||||
-rw-r--r-- | libbe/util/encoding.py | 91 | ||||
-rw-r--r-- | libbe/util/id.py | 711 | ||||
-rw-r--r-- | libbe/util/plugin.py | 67 | ||||
-rw-r--r-- | libbe/util/subproc.py | 223 | ||||
-rw-r--r-- | libbe/util/tree.py | 258 | ||||
-rw-r--r-- | libbe/util/utility.py | 248 |
7 files changed, 1622 insertions, 0 deletions
diff --git a/libbe/util/__init__.py b/libbe/util/__init__.py new file mode 100644 index 0000000..0f4850f --- /dev/null +++ b/libbe/util/__init__.py @@ -0,0 +1,24 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Miscellaneous utilities. +""" + +class InvalidObject (object): + """An object that won't come up by accident.""" + pass + diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py new file mode 100644 index 0000000..8eea438 --- /dev/null +++ b/libbe/util/encoding.py @@ -0,0 +1,91 @@ +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Support input/output/filesystem encodings (e.g. UTF-8). +""" + +import codecs +import locale +import sys +import types + +import libbe +if libbe.TESTING == True: + import doctest + + +ENCODING = None # override get_encoding() output by setting this + +def get_encoding(): + """ + Guess a useful input/output/filesystem encoding... Maybe we need + seperate encodings for input/output and filesystem? Hmm... + """ + if ENCODING != None: + return ENCODING + encoding = locale.getpreferredencoding() or sys.getdefaultencoding() + if sys.platform != 'win32' or sys.version_info[:2] > (2, 3): + encoding = locale.getlocale(locale.LC_TIME)[1] or encoding + # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ' + return encoding + +def get_input_encoding(): + return get_encoding() + +def get_output_encoding(): + return get_encoding() + +def get_filesystem_encoding(): + return get_encoding() + +def known_encoding(encoding): + """ + >>> known_encoding("highly-unlikely-encoding") + False + >>> known_encoding(get_encoding()) + True + """ + try: + codecs.lookup(encoding) + return True + except LookupError: + return False + +def get_file_contents(path, mode='r', encoding=None, decode=False): + if decode == True: + if encoding == None: + encoding = get_filesystem_encoding() + f = codecs.open(path, mode, encoding) + else: + f = open(path, mode) + contents = f.read() + f.close() + return contents + +def set_file_contents(path, contents, mode='w', encoding=None): + if type(contents) == types.UnicodeType: + if encoding == None: + encoding = get_filesystem_encoding() + f = codecs.open(path, mode, encoding) + else: + f = open(path, mode) + f.write(contents) + f.close() + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/util/id.py b/libbe/util/id.py new file mode 100644 index 0000000..76079e7 --- /dev/null +++ b/libbe/util/id.py @@ -0,0 +1,711 @@ +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Handle ID creation and parsing. + +Format +====== + +BE IDs are formatted:: + + <bug-directory>[/<bug>[/<comment>]] + +where each ``<..>`` is a UUID. For example:: + + bea86499-824e-4e77-b085-2d581fa9ccab/3438b72c-6244-4f1d-8722-8c8d41484e35 + +refers to bug ``3438b72c-6244-4f1d-8722-8c8d41484e35`` which is +located in bug directory ``bea86499-824e-4e77-b085-2d581fa9ccab``. +This is a bit of a mouthful, so you can truncate each UUID so long as +it remains unique. For example:: + + bea/343 + +If there were two bugs ``3438...`` and ``343a...`` in ``bea``, you'd +have to use:: + + bea/3438 + +BE will only truncate each UUID down to three characters to slightly +future-proof the short user ids. However, if you want to save keystrokes +and you *know* there is only one bug directory, feel free to truncate +all the way to zero characters:: + + /3438 + +Cross references +================ + +To refer to other bug-directories/bugs/comments from bug comments, simply +enclose the ID in pound signs (``#``). BE will automatically expand the +truncations to the full UUIDs before storing the comment, and the reference +will be appropriately truncated (and hyperlinked, if possible) when the +comment is displayed. + +Scope +===== + +Although bug and comment IDs always appear in compound references, +UUIDs at each level are globally unique. For example, comment +``bea/343/ba96f1c0-ba48-4df8-aaf0-4e3a3144fc46`` will *only* appear +under ``bea/343``. The prefix (``bea/343``) allows BE to reduce +caching global comment-lookup tables and enables easy error messages +("I couldn't find ``bea/343/ba9`` because I don't know where the +``bea`` bug directory is located"). +""" + +import os.path +import re + +import libbe + +if libbe.TESTING == True: + import doctest + import sys + import unittest + +try: + from uuid import uuid4 # Python >= 2.5 + def uuid_gen(): + id = uuid4() + idstr = id.urn + start = "urn:uuid:" + assert idstr.startswith(start) + return idstr[len(start):] +except ImportError: + import os + import sys + from subprocess import Popen, PIPE + + def uuid_gen(): + # Shell-out to system uuidgen + args = ['uuidgen', 'r'] + try: + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + strerror = "%s\nwhile executing %s" % (e.args[1], args) + raise OSError, strerror + output, error = q.communicate() + status = q.wait() + if status != 0: + strerror = "%s\nwhile executing %s" % (status, args) + raise Exception, strerror + return output.rstrip('\n') + + +HIERARCHY = ['bugdir', 'bug', 'comment'] +"""Keep track of the object type hierarchy. +""" + +class MultipleIDMatches (ValueError): + """Multiple IDs match the given user ID. + + Parameters + ---------- + id : str + The not-specific-enough truncated UUID. + common : str + The initial characters common to all matching UUIDs. + matches : list of str + The list of possibly matching UUIDs. + """ + def __init__(self, id, common, matches): + msg = ('More than one id matches %s. ' + 'Please be more specific (%s*).\n%s' % (id, common, matches)) + ValueError.__init__(self, msg) + self.id = id + self.common = common + self.matches = matches + +class NoIDMatches (KeyError): + """No IDs match the given user ID. + + Parameters + ---------- + id : str + The not-matching, possibly truncated UUID. + possible_ids : list of str + The list of potential UUIDs at that level. + msg : str, optional + A helpful message explaining what went wrong. + """ + def __init__(self, id, possible_ids, msg=None): + KeyError.__init__(self, id) + self.id = id + self.possible_ids = possible_ids + self.msg = msg + def __str__(self): + if self.msg == None: + return 'No id matches %s.\n%s' % (self.id, self.possible_ids) + return self.msg + +class InvalidIDStructure (KeyError): + """A purported ID does not have the appropriate syntax. + + Parameters + ---------- + id : str + The purported ID. + msg : str, optional + A helpful message explaining what went wrong. + """ + def __init__(self, id, msg=None): + KeyError.__init__(self, id) + self.id = id + self.msg = msg + def __str__(self): + if self.msg == None: + return 'Invalid id structure "%s"' % self.id + return self.msg + +def _assemble(args, check_length=False): + """Join a bunch of level UUIDs into a single ID. + + See Also + -------- + _split : inverse + """ + args = list(args) + for i,arg in enumerate(args): + if arg == None: + args[i] = '' + id = '/'.join(args) + if check_length == True: + assert len(args) > 0, args + if len(args) > len(HIERARCHY): + raise InvalidIDStructure( + id, '%d > %d levels in "%s"' % (len(args), len(HIERARCHY), id)) + return id + +def _split(id, check_length=False): + """Split an ID into a list of level UUIDs. + + See Also + -------- + _assemble : inverse + """ + args = id.split('/') + for i,arg in enumerate(args): + if arg == '': + args[i] = None + if check_length == True: + assert len(args) > 0, args + if len(args) > len(HIERARCHY): + raise InvalidIDStructure( + id, '%d > %d levels in "%s"' % (len(args), len(HIERARCHY), id)) + return args + +def _truncate(uuid, other_uuids, min_length=3): + """Truncate a UUID to the shortest length >= `min_length` such that it + is *not* a truncated form of a UUID in `other_uuids`. + + Parameters + ---------- + uuid : str + The UUID to truncate. + other_uuids : list of str + The other UUIDs which the truncation *might* (but doesn't) refer + to. + min_length : int + Avoid rapidly outdated truncations, even if they are unique now. + + See Also + -------- + _expand : inverse + """ + chars = min_length + for id in other_uuids: + if id == uuid: + continue + while (id[:chars] == uuid[:chars]): + chars+=1 + return uuid[:chars] + +def _expand(truncated_id, common, other_ids): + """Expand a truncated UUID. + + Parameters + ---------- + truncated_id : str + The ID to expand. + common : str + The common portion `truncated_id` shares with the UUIDs in + `other_ids`. Not used by ``_expand``, but passed on to the + matching exceptions if they occur. + other_uuids : list of str + The other UUIDs which the truncation *might* (but doesn't) refer + to. + + Raises + ------ + NoIDMatches + MultipleIDMatches + + See Also + -------- + _expand : inverse + """ + other_ids = list(other_ids) + if len(other_ids) == 0: + raise NoIDMatches(truncated_id, other_ids) + if truncated_id == None: + if len(other_ids) == 1: + return other_ids[0] + raise MultipleIDMatches(truncated_id, common, other_ids) + matches = [] + other_ids = list(other_ids) + for id in other_ids: + if id.startswith(truncated_id): + if id == truncated_id: + return id + matches.append(id) + if len(matches) > 1: + raise MultipleIDMatches(truncated_id, common, matches) + if len(matches) == 0: + raise NoIDMatches(truncated_id, other_ids) + return matches[0] + + +class ID (object): + """Store an object ID and produce various representations. + + Parameters + ---------- + object : :class:`~libbe.bugdir.BugDir` or :class:`~libbe.bug.Bug` or :class:`~libbe.comment.Comment` + The object that the ID applies to. + type : 'bugdir' or 'bug' or 'comment' + The type of the object. + + Notes + ----- + + IDs have several formats specialized for different uses. + + In storage, all objects are represented by their uuid alone, + because that is the simplest globally unique identifier. You can + generate ids of this sort with the .storage() method. Because an + object's storage may be distributed across several chunks, and the + chunks may not have their own uuid, we generate chunk ids by + prepending the objects uuid to the chunk name. The user id types + do not support this chunk extension feature. + + For users, the full uuids are a bit overwhelming, so we truncate + them while retaining local uniqueness (with regards to the other + objects currently in storage). We also prepend truncated parent + ids for two reasons: + + 1. So that a user can locate the repository containing the + referenced object. It would be hard to find bug ``XYZ`` if + that's all you knew. Much easier with ``ABC/XYZ``, where + ``ABC`` is the bugdir. Each project can publish a list of + bugdir-id-to-location mappings, e.g.:: + + ABC...(full uuid)...DEF https://server.com/projectX/be/ + + which is easier than publishing all-object-ids-to-location + mappings. + + 2. Because it's easier to generate and parse truncated ids if you + don't have to fetch all the ids in the storage repository but + can restrict yourself to a specific branch. + + You can generate ids of this sort with the :meth:`user` method, + although in order to preform the truncation, your object (and its + parents must define a `sibling_uuids` method. + + While users can use the convenient short user ids in the short + term, the truncation will inevitably lead to name collision. To + avoid that, we provide a non-truncated form of the short user ids + via the :meth:`long_user` method. These long user ids should be + converted to short user ids by intelligent user interfaces. + + See Also + -------- + parse_user : get uuids back out of the user ids. + short_to_long_user : convert a single short user id to a long user id. + long_to_short_user : convert a single long user id to a short user id. + short_to_long_text : scan text for user ids & convert to long user ids. + long_to_short_text : scan text for long user ids & convert to short user ids. + """ + def __init__(self, object, type): + self._object = object + self._type = type + assert self._type in HIERARCHY, self._type + + def storage(self, *args): + return _assemble([self._object.uuid]+list(args)) + + def _ancestors(self): + ret = [self._object] + index = HIERARCHY.index(self._type) + if index == 0: + return ret + o = self._object + for i in range(index, 0, -1): + parent_name = HIERARCHY[i-1] + o = getattr(o, parent_name, None) + ret.insert(0, o) + return ret + + def long_user(self): + return _assemble([o.uuid for o in self._ancestors()], + check_length=True) + + def user(self): + ids = [] + for o in self._ancestors(): + if o == None: + ids.append(None) + else: + ids.append(_truncate(o.uuid, o.sibling_uuids())) + return _assemble(ids, check_length=True) + +def child_uuids(child_storage_ids): + """Extract uuid children from other children generated by + :meth:`ID.storage`. + + This is useful for separating data belonging to a particular + object directly from entries for its child objects. Since the + :class:`~libbe.storage.base.Storage` backend doesn't distinguish + between the two. + + Examples + -------- + + >>> list(child_uuids(['abc123/values', '123abc', '123def'])) + ['123abc', '123def'] + """ + for id in child_storage_ids: + fields = _split(id) + if len(fields) == 1: + yield fields[0] + +def long_to_short_user(bugdirs, id): + """Convert a long user ID to a short user ID (see :class:`ID`). + The list of bugdirs allows uniqueness-maintaining truncation of + the bugdir portion of the ID. + + See Also + -------- + short_to_long_user : inverse + long_to_short_text : conversion on a block of text + """ + ids = _split(id, check_length=True) + matching_bugdirs = [bd for bd in bugdirs if bd.uuid == ids[0]] + if len(matching_bugdirs) == 0: + raise NoIDMatches(id, [bd.uuid for bd in bugdirs]) + elif len(matching_bugdirs) > 1: + raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs]) + bugdir = matching_bugdirs[0] + objects = [bugdir] + if len(ids) >= 2: + bug = bugdir.bug_from_uuid(ids[1]) + objects.append(bug) + if len(ids) >= 3: + comment = bug.comment_from_uuid(ids[2]) + objects.append(comment) + for i,obj in enumerate(objects): + ids[i] = _truncate(ids[i], obj.sibling_uuids()) + return _assemble(ids) + +def short_to_long_user(bugdirs, id): + """Convert a short user ID to a long user ID (see :class:`ID`). The + list of bugdirs allows uniqueness-checking during expansion of the + bugdir portion of the ID. + + See Also + -------- + long_to_short_user : inverse + short_to_long_text : conversion on a block of text + """ + ids = _split(id, check_length=True) + ids[0] = _expand(ids[0], common=None, + other_ids=[bd.uuid for bd in bugdirs]) + if len(ids) == 1: + return _assemble(ids) + bugdir = [bd for bd in bugdirs if bd.uuid == ids[0]][0] + ids[1] = _expand(ids[1], common=bugdir.id.user(), + other_ids=bugdir.uuids()) + if len(ids) == 2: + return _assemble(ids) + bug = bugdir.bug_from_uuid(ids[1]) + ids[2] = _expand(ids[2], common=bug.id.user(), + other_ids=bug.uuids()) + return _assemble(ids) + + +REGEXP = '#([-a-f0-9]*)(/[-a-g0-9]*)?(/[-a-g0-9]*)?#' +"""Regular expression for matching IDs (both short and long) in text. +""" + +class IDreplacer (object): + """Helper class for ID replacement in text. + + Reassembles the match elements from :data:`REGEXP` matching + into the original ID, for easier replacement. + + See Also + -------- + short_to_long_text, long_to_short_text + """ + def __init__(self, bugdirs, replace_fn, wrap=True): + self.bugdirs = bugdirs + self.replace_fn = replace_fn + self.wrap = wrap + def __call__(self, match): + ids = [] + for m in match.groups(): + if m == None: + m = '' + ids.append(m) + replacement = self.replace_fn(self.bugdirs, ''.join(ids)) + if self.wrap == True: + return '#%s#' % replacement + return replacement + +def short_to_long_text(bugdirs, text): + """Convert short user IDs to long user IDs in text (see :class:`ID`). + The list of bugdirs allows uniqueness-checking during expansion of + the bugdir portion of the ID. + + See Also + -------- + short_to_long_user : conversion on a single ID + long_to_short_text : inverse + """ + return re.sub(REGEXP, IDreplacer(bugdirs, short_to_long_user), text) + +def long_to_short_text(bugdirs, text): + """Convert long user IDs to short user IDs in text (see :class:`ID`). + The list of bugdirs allows uniqueness-maintaining truncation of + the bugdir portion of the ID. + + See Also + -------- + long_to_short_user : conversion on a single ID + short_to_long_text : inverse + """ + return re.sub(REGEXP, IDreplacer(bugdirs, long_to_short_user), text) + +def residual(base, fragment): + """Split the short ID `fragment` into a portion corresponding + to `base`, and a portion inside `base`. + + Examples + -------- + + >>> residual('ABC/DEF/', '//GHI') + ('//', 'GHI') + >>> residual('ABC/DEF/', '/D/GHI') + ('/D/', 'GHI') + >>> residual('ABC/DEF', 'A/D/GHI') + ('A/D/', 'GHI') + >>> residual('ABC/DEF', 'A/D/GHI/JKL') + ('A/D/', 'GHI/JKL') + """ + base = base.rstrip('/') + '/' + ids = fragment.split('/') + base_count = base.count('/') + root_ids = ids[:base_count] + [''] + residual_ids = ids[base_count:] + return ('/'.join(root_ids), '/'.join(residual_ids)) + +def _parse_user(id): + """Parse a user ID (see :class:`ID`), returning a dict of parsed + information. + + The returned dict will contain a value for "type" (from + :data:`HIERARCHY`) and values for the levels that are defined. + + Examples + -------- + + >>> _parse_user('ABC/DEF/GHI') == \\ + ... {'bugdir':'ABC', 'bug':'DEF', 'comment':'GHI', 'type':'comment'} + True + >>> _parse_user('ABC/DEF') == \\ + ... {'bugdir':'ABC', 'bug':'DEF', 'type':'bug'} + True + >>> _parse_user('ABC') == \\ + ... {'bugdir':'ABC', 'type':'bugdir'} + True + >>> _parse_user('') == \\ + ... {'bugdir':None, 'type':'bugdir'} + True + >>> _parse_user('/') == \\ + ... {'bugdir':None, 'bug':None, 'type':'bug'} + True + >>> _parse_user('/DEF/') == \\ + ... {'bugdir':None, 'bug':'DEF', 'comment':None, 'type':'comment'} + True + >>> _parse_user('a/b/c/d') + Traceback (most recent call last): + ... + InvalidIDStructure: 4 > 3 levels in "a/b/c/d" + """ + ret = {} + args = _split(id, check_length=True) + for i,(type,arg) in enumerate(zip(HIERARCHY, args)): + if arg != None and len(arg) == 0: + raise InvalidIDStructure( + id, 'Invalid %s part %d "%s" of id "%s"' % (type, i, arg, id)) + ret['type'] = type + ret[type] = arg + return ret + +def parse_user(bugdir, id): + """Parse a user ID (see :class:`ID`), returning a dict of parsed + information. + + The returned dict will contain a value for "type" (from + :data:`HIERARCHY`) and values for the levels that are defined. + + Notes + ----- + This function tries to expand IDs before parsing, so it can handle + both short and long IDs successfully. + """ + long_id = short_to_long_user([bugdir], id) + return _parse_user(long_id) + +if libbe.TESTING == True: + class UUIDtestCase(unittest.TestCase): + def testUUID_gen(self): + id = uuid_gen() + self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id) + + class DummyObject (object): + def __init__(self, uuid, parent=None, siblings=[]): + self.uuid = uuid + self._siblings = siblings + if parent == None: + type_i = 0 + else: + assert parent.type in HIERARCHY, parent + setattr(self, parent.type, parent) + type_i = HIERARCHY.index(parent.type) + 1 + self.type = HIERARCHY[type_i] + self.id = ID(self, self.type) + def sibling_uuids(self): + return self._siblings + + class IDtestCase(unittest.TestCase): + def setUp(self): + self.bugdir = DummyObject('1234abcd') + self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876']) + self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef']) + self.bd_id = self.bugdir.id + self.b_id = self.bug.id + self.c_id = self.comment.id + def test_storage(self): + self.failUnless(self.bd_id.storage() == self.bugdir.uuid, + self.bd_id.storage()) + self.failUnless(self.b_id.storage() == self.bug.uuid, + self.b_id.storage()) + self.failUnless(self.c_id.storage() == self.comment.uuid, + self.c_id.storage()) + self.failUnless(self.bd_id.storage('x', 'y', 'z') == \ + '1234abcd/x/y/z', + self.bd_id.storage('x', 'y', 'z')) + def test_long_user(self): + self.failUnless(self.bd_id.long_user() == self.bugdir.uuid, + self.bd_id.long_user()) + self.failUnless(self.b_id.long_user() == \ + '/'.join([self.bugdir.uuid, self.bug.uuid]), + self.b_id.long_user()) + self.failUnless(self.c_id.long_user() == + '/'.join([self.bugdir.uuid, self.bug.uuid, + self.comment.uuid]), + self.c_id.long_user) + def test_user(self): + self.failUnless(self.bd_id.user() == '123', + self.bd_id.user()) + self.failUnless(self.b_id.user() == '123/abc', + self.b_id.user()) + self.failUnless(self.c_id.user() == '123/abc/12345', + self.c_id.user()) + + class ShortLongParseTestCase(unittest.TestCase): + def setUp(self): + self.bugdir = DummyObject('1234abcd') + self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876']) + self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef']) + self.bd_id = self.bugdir.id + self.b_id = self.bug.id + self.c_id = self.comment.id + self.bugdir.bug_from_uuid = lambda uuid: self.bug + self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid] + self.bug.comment_from_uuid = lambda uuid: self.comment + self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid] + self.short = 'bla bla #123/abc# bla bla #123/abc/12345# bla bla' + self.long = 'bla bla #1234abcd/abcdef# bla bla #1234abcd/abcdef/12345678# bla bla' + self.short_id_parse_pairs = [ + ('', {'bugdir':'1234abcd', 'type':'bugdir'}), + ('123/abc', {'bugdir':'1234abcd', 'bug':'abcdef', + 'type':'bug'}), + ('123/abc/12345', {'bugdir':'1234abcd', 'bug':'abcdef', + 'comment':'12345678', 'type':'comment'}), + ] + self.short_id_exception_pairs = [ + ('z', NoIDMatches('z', ['1234abcd'])), + ('///', InvalidIDStructure( + '///', msg='4 > 3 levels in "///"')), + ('/', MultipleIDMatches( + None, '123', ['a1234', 'ab9876', 'abcdef'])), + ('123/', MultipleIDMatches( + None, '123', ['a1234', 'ab9876', 'abcdef'])), + ('123/abc/', MultipleIDMatches( + None, '123/abc', ['1234abcd','1234cdef','12345678'])), + ] + def test_short_to_long_text(self): + self.failUnless(short_to_long_text([self.bugdir], self.short) == self.long, + '\n' + self.short + '\n' + short_to_long_text([self.bugdir], self.short) + '\n' + self.long) + def test_long_to_short_text(self): + self.failUnless(long_to_short_text([self.bugdir], self.long) == self.short, + '\n' + long_to_short_text([self.bugdir], self.long) + '\n' + self.short) + def test_parse_user(self): + for short_id,parsed in self.short_id_parse_pairs: + ret = parse_user(self.bugdir, short_id) + self.failUnless(ret == parsed, + 'got %s\nexpected %s' % (ret, parsed)) + def test_parse_user_exceptions(self): + for short_id,exception in self.short_id_exception_pairs: + try: + ret = parse_user(self.bugdir, short_id) + self.fail('Expected parse_user(bugdir, "%s") to raise %s,' + '\n but it returned %s' + % (short_id, exception.__class__.__name__, ret)) + except exception.__class__, e: + for attr in dir(e): + if attr.startswith('_') or attr == 'args': + continue + value = getattr(e, attr) + expected = getattr(exception, attr) + self.failUnless( + value == expected, + 'Expected parse_user(bugdir, "%s") %s.%s' + '\n to be %s, but it is %s\n\n%s' + % (short_id, exception.__class__.__name__, + attr, expected, value, e)) + + unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/util/plugin.py b/libbe/util/plugin.py new file mode 100644 index 0000000..e598c34 --- /dev/null +++ b/libbe/util/plugin.py @@ -0,0 +1,67 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# Marien Zwart <marienz@gentoo.org> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Allow simple listing and loading of the various becommands and libbe +submodules (i.e. "plugins"). +""" + +import os +import os.path +import sys + + +_PLUGIN_PATH = os.path.realpath( + os.path.dirname( + os.path.dirname( + os.path.dirname(__file__)))) +if _PLUGIN_PATH not in sys.path: + sys.path.append(_PLUGIN_PATH) + +def import_by_name(modname): + """ + >>> mod = import_by_name('libbe.bugdir') + >>> 'BugDir' in dir(mod) + True + >>> import_by_name('libbe.highly_unlikely') + Traceback (most recent call last): + ... + ImportError: No module named highly_unlikely + """ + module = __import__(modname) + components = modname.split('.') + for comp in components[1:]: + module = getattr(module, comp) + return module + +def modnames(prefix): + """ + >>> 'list' in [n for n in modnames('libbe.command')] + True + >>> 'plugin' in [n for n in modnames('libbe.util')] + True + """ + components = prefix.split('.') + modfiles = os.listdir(os.path.join(_PLUGIN_PATH, *components)) + modfiles.sort() + for modfile in modfiles: + if modfile.startswith('.'): + continue # the occasional emacs temporary file + if modfile.endswith('.py') and modfile != '__init__.py': + yield modfile[:-3] diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py new file mode 100644 index 0000000..b02b8e8 --- /dev/null +++ b/libbe/util/subproc.py @@ -0,0 +1,223 @@ +# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Functions for running external commands in subprocesses. +""" + +from subprocess import Popen, PIPE +import sys + +import libbe +from encoding import get_encoding +if libbe.TESTING == True: + import doctest + +_MSWINDOWS = sys.platform == 'win32' +_POSIX = not _MSWINDOWS + +if _POSIX == True: + import os + import select + +class CommandError(Exception): + def __init__(self, command, status, stdout=None, stderr=None): + strerror = ['Command failed (%d):\n %s\n' % (status, stderr), + 'while executing\n %s' % str(command)] + Exception.__init__(self, '\n'.join(strerror)) + self.command = command + self.status = status + self.stdout = stdout + self.stderr = stderr + +def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), + cwd=None, unicode_output=True, verbose=False, encoding=None): + """ + expect should be a tuple of allowed exit codes. cwd should be + the directory from which the command will be executed. When + unicode_output == True, convert stdout and stdin strings to + unicode before returing them. + """ + if cwd == None: + cwd = '.' + if verbose == True: + print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args)) + try : + if _POSIX: + q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd) + else: + assert _MSWINDOWS==True, 'invalid platform' + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, + shell=True, cwd=cwd) + except OSError, e: + raise CommandError(args, status=e.args[0], stderr=e) + stdout,stderr = q.communicate(input=stdin) + status = q.wait() + if unicode_output == True: + if encoding == None: + encoding = get_encoding() + if stdout != None: + stdout = unicode(stdout, encoding) + if stderr != None: + stderr = unicode(stderr, encoding) + if verbose == True: + print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr) + if status not in expect: + raise CommandError(args, status, stdout, stderr) + return status, stdout, stderr + +class Pipe (object): + """ + Simple interface for executing POSIX-style pipes based on the + subprocess module. The only complication is the adaptation of + subprocess.Popen._comminucate to listen to the stderrs of all + processes involved in the pipe, as well as the terminal process' + stdout. There are two implementations of Pipe._communicate, one + for MS Windows, and one for POSIX systems. The MS Windows + implementation is currently untested. + + >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']]) + >>> p.stdout + '/etc/ssh\\n' + >>> p.status + 1 + >>> p.statuses + [1, 0] + >>> p.stderrs # doctest: +ELLIPSIS + [...find: ...: Permission denied..., ''] + """ + def __init__(self, cmds, stdin=None): + # spawn processes + self._procs = [] + for cmd in cmds: + if len(self._procs) != 0: + stdin = self._procs[-1].stdout + self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE)) + + self.stdout,self.stderrs = self._communicate(input=None) + + # collect process statuses + self.statuses = [] + self.status = 0 + for proc in self._procs: + self.statuses.append(proc.wait()) + if self.statuses[-1] != 0: + self.status = self.statuses[-1] + + # Code excerpted from subprocess.Popen._communicate() + if _MSWINDOWS == True: + def _communicate(self, input=None): + assert input == None, 'stdin != None not yet supported' + # listen to each process' stderr + threads = [] + std_X_arrays = [] + for proc in self._procs: + stderr_array = [] + thread = Thread(target=proc._readerthread, + args=(proc.stderr, stderr_array)) + thread.setDaemon(True) + thread.start() + threads.append(thread) + std_X_arrays.append(stderr_array) + + # also listen to the last processes stdout + stdout_array = [] + thread = Thread(target=proc._readerthread, + args=(proc.stdout, stdout_array)) + thread.setDaemon(True) + thread.start() + threads.append(thread) + std_X_arrays.append(stdout_array) + + # join threads as they die + for thread in threads: + thread.join() + + # read output from reader threads + std_X_strings = [] + for std_X_array in std_X_arrays: + std_X_strings.append(std_X_array[0]) + + stdout = std_X_strings.pop(-1) + stderrs = std_X_strings + return (stdout, stderrs) + else: + assert _POSIX==True, 'invalid platform' + def _communicate(self, input=None): + read_set = [] + write_set = [] + read_arrays = [] + stdout = None # Return + stderr = None # Return + + if self._procs[0].stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + self._procs[0].stdin.flush() + if input: + write_set.append(self._procs[0].stdin) + else: + self._procs[0].stdin.close() + for proc in self._procs: + read_set.append(proc.stderr) + read_arrays.append([]) + read_set.append(self._procs[-1].stdout) + read_arrays.append([]) + + input_offset = 0 + while read_set or write_set: + try: + rlist, wlist, xlist = select.select(read_set, write_set, []) + except select.error, e: + if e.args[0] == errno.EINTR: + continue + raise + if self._procs[0].stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + chunk = input[input_offset : input_offset + 512] + bytes_written = os.write(self.stdin.fileno(), chunk) + input_offset += bytes_written + if input_offset >= len(input): + self._procs[0].stdin.close() + write_set.remove(self._procs[0].stdin) + if self._procs[-1].stdout in rlist: + data = os.read(self._procs[-1].stdout.fileno(), 1024) + if data == '': + self._procs[-1].stdout.close() + read_set.remove(self._procs[-1].stdout) + read_arrays[-1].append(data) + for i,proc in enumerate(self._procs): + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == '': + proc.stderr.close() + read_set.remove(proc.stderr) + read_arrays[i].append(data) + + # All data exchanged. Translate lists into strings. + read_strings = [] + for read_array in read_arrays: + read_strings.append(''.join(read_array)) + + stdout = read_strings.pop(-1) + stderrs = read_strings + return (stdout, stderrs) + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/util/tree.py b/libbe/util/tree.py new file mode 100644 index 0000000..812b0bd --- /dev/null +++ b/libbe/util/tree.py @@ -0,0 +1,258 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Define :class:`Tree`, a traversable tree structure. +""" + +import libbe +if libbe.TESTING == True: + import doctest + +class Tree(list): + """A traversable tree structure. + + Examples + -------- + + Construct:: + + +-b---d-g + a-+ +-e + +-c-+-f-h-i + + with + + >>> i = Tree(); i.n = "i" + >>> h = Tree([i]); h.n = "h" + >>> f = Tree([h]); f.n = "f" + >>> e = Tree(); e.n = "e" + >>> c = Tree([f,e]); c.n = "c" + >>> g = Tree(); g.n = "g" + >>> d = Tree([g]); d.n = "d" + >>> b = Tree([d]); b.n = "b" + >>> a = Tree(); a.n = "a" + >>> a.append(c) + >>> a.append(b) + + Get the longest branch length with + + >>> a.branch_len() + 5 + + Sort the tree recursively. Here we sort longest branch length + first. + + >>> a.sort(key=lambda node : -node.branch_len()) + >>> "".join([node.n for node in a.traverse()]) + 'acfhiebdg' + + And here we sort shortest branch length first. + + >>> a.sort(key=lambda node : node.branch_len()) + >>> "".join([node.n for node in a.traverse()]) + 'abdgcefhi' + + We can also do breadth-first traverses. + + >>> "".join([node.n for node in a.traverse(depth_first=False)]) + 'abcdefghi' + + Serialize the tree with depth marking branches. + + >>> for depth,node in a.thread(): + ... print "%*s" % (2*depth+1, node.n) + a + b + d + g + c + e + f + h + i + + Flattening the thread disables depth increases except at + branch splits. + + >>> for depth,node in a.thread(flatten=True): + ... print "%*s" % (2*depth+1, node.n) + a + b + d + g + c + e + f + h + i + + We can also check if a node is contained in a tree. + + >>> a.has_descendant(g) + True + >>> c.has_descendant(g) + False + >>> a.has_descendant(a) + False + >>> a.has_descendant(a, match_self=True) + True + """ + def __cmp__(self, other): + return cmp(id(self), id(other)) + + def __eq__(self, other): + return self.__cmp__(other) == 0 + + def __ne__(self, other): + return self.__cmp__(other) != 0 + + def branch_len(self): + """Return the largest number of nodes from root to leaf (inclusive). + + For the tree:: + + +-b---d-g + a-+ +-e + +-c-+-f-h-i + + this method returns 5. + + Notes + ----- + Exhaustive search every time == *slow*. + + Use only on small trees, or reimplement by overriding + child-addition methods to allow accurate caching. + """ + if len(self) == 0: + return 1 + else: + return 1 + max([child.branch_len() for child in self]) + + def sort(self, *args, **kwargs): + """Sort the tree recursively. + + This method extends :meth:`list.sort` to Trees. + + Notes + ----- + This method can be slow, e.g. on a :meth:`branch_len` sort, + since a node at depth `N` from the root has it's + :meth:`branch_len` method called `N` times. + """ + list.sort(self, *args, **kwargs) + for child in self: + child.sort(*args, **kwargs) + + def traverse(self, depth_first=True): + """Generate all the nodes in a tree, starting with the root node. + + Parameters + ---------- + depth_first : bool + Depth first by default, but you can set `depth_first` to + `False` for breadth first ordering. Siblings are returned + in the order they are stored, so you might want to + :meth:`sort` your tree first. + """ + if depth_first == True: + yield self + for child in self: + for descendant in child.traverse(): + yield descendant + else: # breadth first, Wikipedia algorithm + # http://en.wikipedia.org/wiki/Breadth-first_search + queue = [self] + while len(queue) > 0: + node = queue.pop(0) + yield node + queue.extend(node) + + def thread(self, flatten=False): + """Generate a (depth, node) tuple for every node in the tree. + + When `flatten` is `False`, the depth of any node is one + greater than the depth of its parent. That way the + inheritance is explicit, but you can end up with highly + indented threads. + + When `flatten` is `True`, the depth of any node is only + greater than the depth of its parent when there is a branch, + and the node is not the last child. This can lead to ancestry + ambiguity, but keeps the total indentation down. For example:: + + +-b +-b-c + a-+-c and a-+ + +-d-e-f +-d-e-f + + would both produce (after sorting by :meth:`branch_len`):: + + (0, a) + (1, b) + (1, c) + (0, d) + (0, e) + (0, f) + + """ + stack = [] # ancestry of the current node + if flatten == True: + depthDict = {} + + for node in self.traverse(depth_first=True): + while len(stack) > 0 \ + and id(node) not in [id(c) for c in stack[-1]]: + stack.pop(-1) + if flatten == False: + depth = len(stack) + else: + if len(stack) == 0: + depth = 0 + else: + parent = stack[-1] + depth = depthDict[id(parent)] + if len(parent) > 1 and node != parent[-1]: + depth += 1 + depthDict[id(node)] = depth + yield (depth,node) + stack.append(node) + + def has_descendant(self, descendant, depth_first=True, match_self=False): + """Check if a node is contained in a tree. + + Parameters + ---------- + descendant : Tree + The potential descendant. + depth_first : bool + The search order. Set this if you feel depth/breadth would + be a faster search. + match_self : bool + Set to `True` for:: + + x.has_descendant(x, match_self=True) -> True + """ + if descendant == self: + return match_self + for d in self.traverse(depth_first): + if descendant == d: + return True + return False + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/util/utility.py b/libbe/util/utility.py new file mode 100644 index 0000000..92ca0d5 --- /dev/null +++ b/libbe/util/utility.py @@ -0,0 +1,248 @@ +# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Assorted utility functions that don't fit in anywhere else. +""" + +import calendar +import codecs +import os +import shutil +import tempfile +import time +import types + +import libbe +if libbe.TESTING == True: + import doctest + +class InvalidXML(ValueError): + """Invalid XML while parsing for a `*.from_xml()` method. + + Parameters + ---------- + type : str + String identifying `*`, e.g. "bug", "comment", ... + element : :class:`ElementTree.Element` + ElementTree.Element instance which caused the error. + error : str + Error description. + """ + def __init__(self, type, element, error): + msg = 'Invalid %s xml: %s\n %s\n' \ + % (type, error, ElementTree.tostring(element)) + ValueError.__init__(self, msg) + self.type = type + self.element = element + self.error = error + +def search_parent_directories(path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. For example:: + + search_parent_directories("/a/b/c", ".be") + + will return the path to the first existing file from:: + + /a/b/c/.be + /a/b/.be + /a/.be + /.be + + or `None` if none of those files exist. + """ + path = os.path.realpath(path) + assert os.path.exists(path) + old_path = None + while True: + check_path = os.path.join(path, filename) + if os.path.exists(check_path): + return check_path + if path == old_path: + return None + old_path = path + path = os.path.dirname(path) + +class Dir (object): + """A temporary directory for testing use. + + Make sure you run :meth:`cleanup` after you're done using the + directory. + """ + def __init__(self): + self.path = tempfile.mkdtemp(prefix="BEtest") + self.removed = False + def cleanup(self): + if self.removed == False: + shutil.rmtree(self.path) + self.removed = True + def __call__(self): + return self.path + +RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000" +"""RFC 2822 [#]_ format string for :func:`time.strftime` and +:func:`time.strptime`. + +.. [#] See `RFC 2822`_, sections 3.3 and A.1.1. +.. _RFC 2822: http://www.faqs.org/rfcs/rfc2822.html +""" + +def time_to_str(time_val): + """Convert a time number into an RFC 2822-formatted string. + + Parameters + ---------- + time_val : float + Float seconds since the Epoc, see :func:`time.time`. + Note that while `time_val` may contain sub-second data, + the output string will not. + + Examples + -------- + + >>> time_to_str(0) + 'Thu, 01 Jan 1970 00:00:00 +0000' + + See Also + -------- + str_to_time : inverse + handy_time : localtime string + """ + return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val)) + +def str_to_time(str_time): + """Convert an RFC 2822-fomatted string into a time value. + + Parameters + ---------- + str_time : str + An RFC 2822-formatted string. + + Examples + -------- + + >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000") + 0 + >>> q = time.time() + >>> str_to_time(time_to_str(q)) == int(q) + True + >>> str_to_time("Thu, 01 Jan 1970 00:00:00 -1000") + 36000 + + See Also + -------- + time_to_str : inverse + """ + timezone_str = str_time[-5:] + if timezone_str != "+0000": + str_time = str_time.replace(timezone_str, "+0000") + time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT)) + timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT + timezone_tuple = time.strptime(timezone_str[1:], "%H%M") + timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60 + return time_val + timesign*timezone + +def handy_time(time_val): + """Convert a time number into a useful localtime. + + Where :func:`time_to_str` returns GMT +0000, `handy_time` returns + a string in local time. This may be more accessible for the user. + + Parameters + ---------- + time_val : float + Float seconds since the Epoc, see :func:`time.time`. + """ + return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val)) + +def time_to_gmtime(str_time): + """Convert an RFC 2822-fomatted string to a GMT string. + + Parameters + ---------- + str_time : str + An RFC 2822-formatted string. + + Examples + -------- + + >>> time_to_gmtime("Thu, 01 Jan 1970 00:00:00 -1000") + 'Thu, 01 Jan 1970 10:00:00 +0000' + """ + time_val = str_to_time(str_time) + return time_to_str(time_val) + +def iterable_full_of_strings(value, alternative=None): + """Require an iterable full of strings. + + This is useful, for example, in validating `*.extra_strings`. + See :attr:`libbe.bugdir.BugDir.extra_strings` + + Parameters + ---------- + value : list or None + The potential list of strings. + alternative + Allow a default (e.g. `None`), such that:: + + iterable_full_of_strings(value=x, alternative=x) -> True + + Examples + -------- + + >>> iterable_full_of_strings([]) + True + >>> iterable_full_of_strings(["abc", "def", u"hij"]) + True + >>> iterable_full_of_strings(["abc", None, u"hij"]) + False + >>> iterable_full_of_strings(None, alternative=None) + True + """ + if value == alternative: + return True + elif not hasattr(value, '__iter__'): + return False + for x in value: + if type(x) not in types.StringTypes: + return False + return True + +def underlined(string, char='='): + """Produces a version of a string that is underlined. + + Parameters + ---------- + string : str + The string to underline + char : str + The character to use for the underlining. + + Examples + -------- + + >>> underlined("Underlined String") + 'Underlined String\\n=================' + """ + assert len(char) == 0, char + return '%s\n%s' % (string, char*len(string)) + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() |