aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/util
diff options
context:
space:
mode:
Diffstat (limited to 'libbe/util')
-rw-r--r--libbe/util/__init__.py24
-rw-r--r--libbe/util/encoding.py91
-rw-r--r--libbe/util/id.py711
-rw-r--r--libbe/util/plugin.py67
-rw-r--r--libbe/util/subproc.py223
-rw-r--r--libbe/util/tree.py258
-rw-r--r--libbe/util/utility.py248
7 files changed, 1622 insertions, 0 deletions
diff --git a/libbe/util/__init__.py b/libbe/util/__init__.py
new file mode 100644
index 0000000..0f4850f
--- /dev/null
+++ b/libbe/util/__init__.py
@@ -0,0 +1,24 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Miscellaneous utilities.
+"""
+
+class InvalidObject (object):
+ """An object that won't come up by accident."""
+ pass
+
diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py
new file mode 100644
index 0000000..8eea438
--- /dev/null
+++ b/libbe/util/encoding.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Support input/output/filesystem encodings (e.g. UTF-8).
+"""
+
+import codecs
+import locale
+import sys
+import types
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+
+ENCODING = None # override get_encoding() output by setting this
+
+def get_encoding():
+ """
+ Guess a useful input/output/filesystem encoding... Maybe we need
+ seperate encodings for input/output and filesystem? Hmm...
+ """
+ if ENCODING != None:
+ return ENCODING
+ encoding = locale.getpreferredencoding() or sys.getdefaultencoding()
+ if sys.platform != 'win32' or sys.version_info[:2] > (2, 3):
+ encoding = locale.getlocale(locale.LC_TIME)[1] or encoding
+ # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ'
+ return encoding
+
+def get_input_encoding():
+ return get_encoding()
+
+def get_output_encoding():
+ return get_encoding()
+
+def get_filesystem_encoding():
+ return get_encoding()
+
+def known_encoding(encoding):
+ """
+ >>> known_encoding("highly-unlikely-encoding")
+ False
+ >>> known_encoding(get_encoding())
+ True
+ """
+ try:
+ codecs.lookup(encoding)
+ return True
+ except LookupError:
+ return False
+
+def get_file_contents(path, mode='r', encoding=None, decode=False):
+ if decode == True:
+ if encoding == None:
+ encoding = get_filesystem_encoding()
+ f = codecs.open(path, mode, encoding)
+ else:
+ f = open(path, mode)
+ contents = f.read()
+ f.close()
+ return contents
+
+def set_file_contents(path, contents, mode='w', encoding=None):
+ if type(contents) == types.UnicodeType:
+ if encoding == None:
+ encoding = get_filesystem_encoding()
+ f = codecs.open(path, mode, encoding)
+ else:
+ f = open(path, mode)
+ f.write(contents)
+ f.close()
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/util/id.py b/libbe/util/id.py
new file mode 100644
index 0000000..76079e7
--- /dev/null
+++ b/libbe/util/id.py
@@ -0,0 +1,711 @@
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Handle ID creation and parsing.
+
+Format
+======
+
+BE IDs are formatted::
+
+ <bug-directory>[/<bug>[/<comment>]]
+
+where each ``<..>`` is a UUID. For example::
+
+ bea86499-824e-4e77-b085-2d581fa9ccab/3438b72c-6244-4f1d-8722-8c8d41484e35
+
+refers to bug ``3438b72c-6244-4f1d-8722-8c8d41484e35`` which is
+located in bug directory ``bea86499-824e-4e77-b085-2d581fa9ccab``.
+This is a bit of a mouthful, so you can truncate each UUID so long as
+it remains unique. For example::
+
+ bea/343
+
+If there were two bugs ``3438...`` and ``343a...`` in ``bea``, you'd
+have to use::
+
+ bea/3438
+
+BE will only truncate each UUID down to three characters to slightly
+future-proof the short user ids. However, if you want to save keystrokes
+and you *know* there is only one bug directory, feel free to truncate
+all the way to zero characters::
+
+ /3438
+
+Cross references
+================
+
+To refer to other bug-directories/bugs/comments from bug comments, simply
+enclose the ID in pound signs (``#``). BE will automatically expand the
+truncations to the full UUIDs before storing the comment, and the reference
+will be appropriately truncated (and hyperlinked, if possible) when the
+comment is displayed.
+
+Scope
+=====
+
+Although bug and comment IDs always appear in compound references,
+UUIDs at each level are globally unique. For example, comment
+``bea/343/ba96f1c0-ba48-4df8-aaf0-4e3a3144fc46`` will *only* appear
+under ``bea/343``. The prefix (``bea/343``) allows BE to reduce
+caching global comment-lookup tables and enables easy error messages
+("I couldn't find ``bea/343/ba9`` because I don't know where the
+``bea`` bug directory is located").
+"""
+
+import os.path
+import re
+
+import libbe
+
+if libbe.TESTING == True:
+ import doctest
+ import sys
+ import unittest
+
+try:
+ from uuid import uuid4 # Python >= 2.5
+ def uuid_gen():
+ id = uuid4()
+ idstr = id.urn
+ start = "urn:uuid:"
+ assert idstr.startswith(start)
+ return idstr[len(start):]
+except ImportError:
+ import os
+ import sys
+ from subprocess import Popen, PIPE
+
+ def uuid_gen():
+ # Shell-out to system uuidgen
+ args = ['uuidgen', 'r']
+ try:
+ if sys.platform != "win32":
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ else:
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ shell=True, cwd=cwd)
+ except OSError, e :
+ strerror = "%s\nwhile executing %s" % (e.args[1], args)
+ raise OSError, strerror
+ output, error = q.communicate()
+ status = q.wait()
+ if status != 0:
+ strerror = "%s\nwhile executing %s" % (status, args)
+ raise Exception, strerror
+ return output.rstrip('\n')
+
+
+HIERARCHY = ['bugdir', 'bug', 'comment']
+"""Keep track of the object type hierarchy.
+"""
+
+class MultipleIDMatches (ValueError):
+ """Multiple IDs match the given user ID.
+
+ Parameters
+ ----------
+ id : str
+ The not-specific-enough truncated UUID.
+ common : str
+ The initial characters common to all matching UUIDs.
+ matches : list of str
+ The list of possibly matching UUIDs.
+ """
+ def __init__(self, id, common, matches):
+ msg = ('More than one id matches %s. '
+ 'Please be more specific (%s*).\n%s' % (id, common, matches))
+ ValueError.__init__(self, msg)
+ self.id = id
+ self.common = common
+ self.matches = matches
+
+class NoIDMatches (KeyError):
+ """No IDs match the given user ID.
+
+ Parameters
+ ----------
+ id : str
+ The not-matching, possibly truncated UUID.
+ possible_ids : list of str
+ The list of potential UUIDs at that level.
+ msg : str, optional
+ A helpful message explaining what went wrong.
+ """
+ def __init__(self, id, possible_ids, msg=None):
+ KeyError.__init__(self, id)
+ self.id = id
+ self.possible_ids = possible_ids
+ self.msg = msg
+ def __str__(self):
+ if self.msg == None:
+ return 'No id matches %s.\n%s' % (self.id, self.possible_ids)
+ return self.msg
+
+class InvalidIDStructure (KeyError):
+ """A purported ID does not have the appropriate syntax.
+
+ Parameters
+ ----------
+ id : str
+ The purported ID.
+ msg : str, optional
+ A helpful message explaining what went wrong.
+ """
+ def __init__(self, id, msg=None):
+ KeyError.__init__(self, id)
+ self.id = id
+ self.msg = msg
+ def __str__(self):
+ if self.msg == None:
+ return 'Invalid id structure "%s"' % self.id
+ return self.msg
+
+def _assemble(args, check_length=False):
+ """Join a bunch of level UUIDs into a single ID.
+
+ See Also
+ --------
+ _split : inverse
+ """
+ args = list(args)
+ for i,arg in enumerate(args):
+ if arg == None:
+ args[i] = ''
+ id = '/'.join(args)
+ if check_length == True:
+ assert len(args) > 0, args
+ if len(args) > len(HIERARCHY):
+ raise InvalidIDStructure(
+ id, '%d > %d levels in "%s"' % (len(args), len(HIERARCHY), id))
+ return id
+
+def _split(id, check_length=False):
+ """Split an ID into a list of level UUIDs.
+
+ See Also
+ --------
+ _assemble : inverse
+ """
+ args = id.split('/')
+ for i,arg in enumerate(args):
+ if arg == '':
+ args[i] = None
+ if check_length == True:
+ assert len(args) > 0, args
+ if len(args) > len(HIERARCHY):
+ raise InvalidIDStructure(
+ id, '%d > %d levels in "%s"' % (len(args), len(HIERARCHY), id))
+ return args
+
+def _truncate(uuid, other_uuids, min_length=3):
+ """Truncate a UUID to the shortest length >= `min_length` such that it
+ is *not* a truncated form of a UUID in `other_uuids`.
+
+ Parameters
+ ----------
+ uuid : str
+ The UUID to truncate.
+ other_uuids : list of str
+ The other UUIDs which the truncation *might* (but doesn't) refer
+ to.
+ min_length : int
+ Avoid rapidly outdated truncations, even if they are unique now.
+
+ See Also
+ --------
+ _expand : inverse
+ """
+ chars = min_length
+ for id in other_uuids:
+ if id == uuid:
+ continue
+ while (id[:chars] == uuid[:chars]):
+ chars+=1
+ return uuid[:chars]
+
+def _expand(truncated_id, common, other_ids):
+ """Expand a truncated UUID.
+
+ Parameters
+ ----------
+ truncated_id : str
+ The ID to expand.
+ common : str
+ The common portion `truncated_id` shares with the UUIDs in
+ `other_ids`. Not used by ``_expand``, but passed on to the
+ matching exceptions if they occur.
+ other_uuids : list of str
+ The other UUIDs which the truncation *might* (but doesn't) refer
+ to.
+
+ Raises
+ ------
+ NoIDMatches
+ MultipleIDMatches
+
+ See Also
+ --------
+ _expand : inverse
+ """
+ other_ids = list(other_ids)
+ if len(other_ids) == 0:
+ raise NoIDMatches(truncated_id, other_ids)
+ if truncated_id == None:
+ if len(other_ids) == 1:
+ return other_ids[0]
+ raise MultipleIDMatches(truncated_id, common, other_ids)
+ matches = []
+ other_ids = list(other_ids)
+ for id in other_ids:
+ if id.startswith(truncated_id):
+ if id == truncated_id:
+ return id
+ matches.append(id)
+ if len(matches) > 1:
+ raise MultipleIDMatches(truncated_id, common, matches)
+ if len(matches) == 0:
+ raise NoIDMatches(truncated_id, other_ids)
+ return matches[0]
+
+
+class ID (object):
+ """Store an object ID and produce various representations.
+
+ Parameters
+ ----------
+ object : :class:`~libbe.bugdir.BugDir` or :class:`~libbe.bug.Bug` or :class:`~libbe.comment.Comment`
+ The object that the ID applies to.
+ type : 'bugdir' or 'bug' or 'comment'
+ The type of the object.
+
+ Notes
+ -----
+
+ IDs have several formats specialized for different uses.
+
+ In storage, all objects are represented by their uuid alone,
+ because that is the simplest globally unique identifier. You can
+ generate ids of this sort with the .storage() method. Because an
+ object's storage may be distributed across several chunks, and the
+ chunks may not have their own uuid, we generate chunk ids by
+ prepending the objects uuid to the chunk name. The user id types
+ do not support this chunk extension feature.
+
+ For users, the full uuids are a bit overwhelming, so we truncate
+ them while retaining local uniqueness (with regards to the other
+ objects currently in storage). We also prepend truncated parent
+ ids for two reasons:
+
+ 1. So that a user can locate the repository containing the
+ referenced object. It would be hard to find bug ``XYZ`` if
+ that's all you knew. Much easier with ``ABC/XYZ``, where
+ ``ABC`` is the bugdir. Each project can publish a list of
+ bugdir-id-to-location mappings, e.g.::
+
+ ABC...(full uuid)...DEF https://server.com/projectX/be/
+
+ which is easier than publishing all-object-ids-to-location
+ mappings.
+
+ 2. Because it's easier to generate and parse truncated ids if you
+ don't have to fetch all the ids in the storage repository but
+ can restrict yourself to a specific branch.
+
+ You can generate ids of this sort with the :meth:`user` method,
+ although in order to preform the truncation, your object (and its
+ parents must define a `sibling_uuids` method.
+
+ While users can use the convenient short user ids in the short
+ term, the truncation will inevitably lead to name collision. To
+ avoid that, we provide a non-truncated form of the short user ids
+ via the :meth:`long_user` method. These long user ids should be
+ converted to short user ids by intelligent user interfaces.
+
+ See Also
+ --------
+ parse_user : get uuids back out of the user ids.
+ short_to_long_user : convert a single short user id to a long user id.
+ long_to_short_user : convert a single long user id to a short user id.
+ short_to_long_text : scan text for user ids & convert to long user ids.
+ long_to_short_text : scan text for long user ids & convert to short user ids.
+ """
+ def __init__(self, object, type):
+ self._object = object
+ self._type = type
+ assert self._type in HIERARCHY, self._type
+
+ def storage(self, *args):
+ return _assemble([self._object.uuid]+list(args))
+
+ def _ancestors(self):
+ ret = [self._object]
+ index = HIERARCHY.index(self._type)
+ if index == 0:
+ return ret
+ o = self._object
+ for i in range(index, 0, -1):
+ parent_name = HIERARCHY[i-1]
+ o = getattr(o, parent_name, None)
+ ret.insert(0, o)
+ return ret
+
+ def long_user(self):
+ return _assemble([o.uuid for o in self._ancestors()],
+ check_length=True)
+
+ def user(self):
+ ids = []
+ for o in self._ancestors():
+ if o == None:
+ ids.append(None)
+ else:
+ ids.append(_truncate(o.uuid, o.sibling_uuids()))
+ return _assemble(ids, check_length=True)
+
+def child_uuids(child_storage_ids):
+ """Extract uuid children from other children generated by
+ :meth:`ID.storage`.
+
+ This is useful for separating data belonging to a particular
+ object directly from entries for its child objects. Since the
+ :class:`~libbe.storage.base.Storage` backend doesn't distinguish
+ between the two.
+
+ Examples
+ --------
+
+ >>> list(child_uuids(['abc123/values', '123abc', '123def']))
+ ['123abc', '123def']
+ """
+ for id in child_storage_ids:
+ fields = _split(id)
+ if len(fields) == 1:
+ yield fields[0]
+
+def long_to_short_user(bugdirs, id):
+ """Convert a long user ID to a short user ID (see :class:`ID`).
+ The list of bugdirs allows uniqueness-maintaining truncation of
+ the bugdir portion of the ID.
+
+ See Also
+ --------
+ short_to_long_user : inverse
+ long_to_short_text : conversion on a block of text
+ """
+ ids = _split(id, check_length=True)
+ matching_bugdirs = [bd for bd in bugdirs if bd.uuid == ids[0]]
+ if len(matching_bugdirs) == 0:
+ raise NoIDMatches(id, [bd.uuid for bd in bugdirs])
+ elif len(matching_bugdirs) > 1:
+ raise MultipleIDMatches(id, '', [bd.uuid for bd in bugdirs])
+ bugdir = matching_bugdirs[0]
+ objects = [bugdir]
+ if len(ids) >= 2:
+ bug = bugdir.bug_from_uuid(ids[1])
+ objects.append(bug)
+ if len(ids) >= 3:
+ comment = bug.comment_from_uuid(ids[2])
+ objects.append(comment)
+ for i,obj in enumerate(objects):
+ ids[i] = _truncate(ids[i], obj.sibling_uuids())
+ return _assemble(ids)
+
+def short_to_long_user(bugdirs, id):
+ """Convert a short user ID to a long user ID (see :class:`ID`). The
+ list of bugdirs allows uniqueness-checking during expansion of the
+ bugdir portion of the ID.
+
+ See Also
+ --------
+ long_to_short_user : inverse
+ short_to_long_text : conversion on a block of text
+ """
+ ids = _split(id, check_length=True)
+ ids[0] = _expand(ids[0], common=None,
+ other_ids=[bd.uuid for bd in bugdirs])
+ if len(ids) == 1:
+ return _assemble(ids)
+ bugdir = [bd for bd in bugdirs if bd.uuid == ids[0]][0]
+ ids[1] = _expand(ids[1], common=bugdir.id.user(),
+ other_ids=bugdir.uuids())
+ if len(ids) == 2:
+ return _assemble(ids)
+ bug = bugdir.bug_from_uuid(ids[1])
+ ids[2] = _expand(ids[2], common=bug.id.user(),
+ other_ids=bug.uuids())
+ return _assemble(ids)
+
+
+REGEXP = '#([-a-f0-9]*)(/[-a-g0-9]*)?(/[-a-g0-9]*)?#'
+"""Regular expression for matching IDs (both short and long) in text.
+"""
+
+class IDreplacer (object):
+ """Helper class for ID replacement in text.
+
+ Reassembles the match elements from :data:`REGEXP` matching
+ into the original ID, for easier replacement.
+
+ See Also
+ --------
+ short_to_long_text, long_to_short_text
+ """
+ def __init__(self, bugdirs, replace_fn, wrap=True):
+ self.bugdirs = bugdirs
+ self.replace_fn = replace_fn
+ self.wrap = wrap
+ def __call__(self, match):
+ ids = []
+ for m in match.groups():
+ if m == None:
+ m = ''
+ ids.append(m)
+ replacement = self.replace_fn(self.bugdirs, ''.join(ids))
+ if self.wrap == True:
+ return '#%s#' % replacement
+ return replacement
+
+def short_to_long_text(bugdirs, text):
+ """Convert short user IDs to long user IDs in text (see :class:`ID`).
+ The list of bugdirs allows uniqueness-checking during expansion of
+ the bugdir portion of the ID.
+
+ See Also
+ --------
+ short_to_long_user : conversion on a single ID
+ long_to_short_text : inverse
+ """
+ return re.sub(REGEXP, IDreplacer(bugdirs, short_to_long_user), text)
+
+def long_to_short_text(bugdirs, text):
+ """Convert long user IDs to short user IDs in text (see :class:`ID`).
+ The list of bugdirs allows uniqueness-maintaining truncation of
+ the bugdir portion of the ID.
+
+ See Also
+ --------
+ long_to_short_user : conversion on a single ID
+ short_to_long_text : inverse
+ """
+ return re.sub(REGEXP, IDreplacer(bugdirs, long_to_short_user), text)
+
+def residual(base, fragment):
+ """Split the short ID `fragment` into a portion corresponding
+ to `base`, and a portion inside `base`.
+
+ Examples
+ --------
+
+ >>> residual('ABC/DEF/', '//GHI')
+ ('//', 'GHI')
+ >>> residual('ABC/DEF/', '/D/GHI')
+ ('/D/', 'GHI')
+ >>> residual('ABC/DEF', 'A/D/GHI')
+ ('A/D/', 'GHI')
+ >>> residual('ABC/DEF', 'A/D/GHI/JKL')
+ ('A/D/', 'GHI/JKL')
+ """
+ base = base.rstrip('/') + '/'
+ ids = fragment.split('/')
+ base_count = base.count('/')
+ root_ids = ids[:base_count] + ['']
+ residual_ids = ids[base_count:]
+ return ('/'.join(root_ids), '/'.join(residual_ids))
+
+def _parse_user(id):
+ """Parse a user ID (see :class:`ID`), returning a dict of parsed
+ information.
+
+ The returned dict will contain a value for "type" (from
+ :data:`HIERARCHY`) and values for the levels that are defined.
+
+ Examples
+ --------
+
+ >>> _parse_user('ABC/DEF/GHI') == \\
+ ... {'bugdir':'ABC', 'bug':'DEF', 'comment':'GHI', 'type':'comment'}
+ True
+ >>> _parse_user('ABC/DEF') == \\
+ ... {'bugdir':'ABC', 'bug':'DEF', 'type':'bug'}
+ True
+ >>> _parse_user('ABC') == \\
+ ... {'bugdir':'ABC', 'type':'bugdir'}
+ True
+ >>> _parse_user('') == \\
+ ... {'bugdir':None, 'type':'bugdir'}
+ True
+ >>> _parse_user('/') == \\
+ ... {'bugdir':None, 'bug':None, 'type':'bug'}
+ True
+ >>> _parse_user('/DEF/') == \\
+ ... {'bugdir':None, 'bug':'DEF', 'comment':None, 'type':'comment'}
+ True
+ >>> _parse_user('a/b/c/d')
+ Traceback (most recent call last):
+ ...
+ InvalidIDStructure: 4 > 3 levels in "a/b/c/d"
+ """
+ ret = {}
+ args = _split(id, check_length=True)
+ for i,(type,arg) in enumerate(zip(HIERARCHY, args)):
+ if arg != None and len(arg) == 0:
+ raise InvalidIDStructure(
+ id, 'Invalid %s part %d "%s" of id "%s"' % (type, i, arg, id))
+ ret['type'] = type
+ ret[type] = arg
+ return ret
+
+def parse_user(bugdir, id):
+ """Parse a user ID (see :class:`ID`), returning a dict of parsed
+ information.
+
+ The returned dict will contain a value for "type" (from
+ :data:`HIERARCHY`) and values for the levels that are defined.
+
+ Notes
+ -----
+ This function tries to expand IDs before parsing, so it can handle
+ both short and long IDs successfully.
+ """
+ long_id = short_to_long_user([bugdir], id)
+ return _parse_user(long_id)
+
+if libbe.TESTING == True:
+ class UUIDtestCase(unittest.TestCase):
+ def testUUID_gen(self):
+ id = uuid_gen()
+ self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id)
+
+ class DummyObject (object):
+ def __init__(self, uuid, parent=None, siblings=[]):
+ self.uuid = uuid
+ self._siblings = siblings
+ if parent == None:
+ type_i = 0
+ else:
+ assert parent.type in HIERARCHY, parent
+ setattr(self, parent.type, parent)
+ type_i = HIERARCHY.index(parent.type) + 1
+ self.type = HIERARCHY[type_i]
+ self.id = ID(self, self.type)
+ def sibling_uuids(self):
+ return self._siblings
+
+ class IDtestCase(unittest.TestCase):
+ def setUp(self):
+ self.bugdir = DummyObject('1234abcd')
+ self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876'])
+ self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef'])
+ self.bd_id = self.bugdir.id
+ self.b_id = self.bug.id
+ self.c_id = self.comment.id
+ def test_storage(self):
+ self.failUnless(self.bd_id.storage() == self.bugdir.uuid,
+ self.bd_id.storage())
+ self.failUnless(self.b_id.storage() == self.bug.uuid,
+ self.b_id.storage())
+ self.failUnless(self.c_id.storage() == self.comment.uuid,
+ self.c_id.storage())
+ self.failUnless(self.bd_id.storage('x', 'y', 'z') == \
+ '1234abcd/x/y/z',
+ self.bd_id.storage('x', 'y', 'z'))
+ def test_long_user(self):
+ self.failUnless(self.bd_id.long_user() == self.bugdir.uuid,
+ self.bd_id.long_user())
+ self.failUnless(self.b_id.long_user() == \
+ '/'.join([self.bugdir.uuid, self.bug.uuid]),
+ self.b_id.long_user())
+ self.failUnless(self.c_id.long_user() ==
+ '/'.join([self.bugdir.uuid, self.bug.uuid,
+ self.comment.uuid]),
+ self.c_id.long_user)
+ def test_user(self):
+ self.failUnless(self.bd_id.user() == '123',
+ self.bd_id.user())
+ self.failUnless(self.b_id.user() == '123/abc',
+ self.b_id.user())
+ self.failUnless(self.c_id.user() == '123/abc/12345',
+ self.c_id.user())
+
+ class ShortLongParseTestCase(unittest.TestCase):
+ def setUp(self):
+ self.bugdir = DummyObject('1234abcd')
+ self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876'])
+ self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef'])
+ self.bd_id = self.bugdir.id
+ self.b_id = self.bug.id
+ self.c_id = self.comment.id
+ self.bugdir.bug_from_uuid = lambda uuid: self.bug
+ self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid]
+ self.bug.comment_from_uuid = lambda uuid: self.comment
+ self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid]
+ self.short = 'bla bla #123/abc# bla bla #123/abc/12345# bla bla'
+ self.long = 'bla bla #1234abcd/abcdef# bla bla #1234abcd/abcdef/12345678# bla bla'
+ self.short_id_parse_pairs = [
+ ('', {'bugdir':'1234abcd', 'type':'bugdir'}),
+ ('123/abc', {'bugdir':'1234abcd', 'bug':'abcdef',
+ 'type':'bug'}),
+ ('123/abc/12345', {'bugdir':'1234abcd', 'bug':'abcdef',
+ 'comment':'12345678', 'type':'comment'}),
+ ]
+ self.short_id_exception_pairs = [
+ ('z', NoIDMatches('z', ['1234abcd'])),
+ ('///', InvalidIDStructure(
+ '///', msg='4 > 3 levels in "///"')),
+ ('/', MultipleIDMatches(
+ None, '123', ['a1234', 'ab9876', 'abcdef'])),
+ ('123/', MultipleIDMatches(
+ None, '123', ['a1234', 'ab9876', 'abcdef'])),
+ ('123/abc/', MultipleIDMatches(
+ None, '123/abc', ['1234abcd','1234cdef','12345678'])),
+ ]
+ def test_short_to_long_text(self):
+ self.failUnless(short_to_long_text([self.bugdir], self.short) == self.long,
+ '\n' + self.short + '\n' + short_to_long_text([self.bugdir], self.short) + '\n' + self.long)
+ def test_long_to_short_text(self):
+ self.failUnless(long_to_short_text([self.bugdir], self.long) == self.short,
+ '\n' + long_to_short_text([self.bugdir], self.long) + '\n' + self.short)
+ def test_parse_user(self):
+ for short_id,parsed in self.short_id_parse_pairs:
+ ret = parse_user(self.bugdir, short_id)
+ self.failUnless(ret == parsed,
+ 'got %s\nexpected %s' % (ret, parsed))
+ def test_parse_user_exceptions(self):
+ for short_id,exception in self.short_id_exception_pairs:
+ try:
+ ret = parse_user(self.bugdir, short_id)
+ self.fail('Expected parse_user(bugdir, "%s") to raise %s,'
+ '\n but it returned %s'
+ % (short_id, exception.__class__.__name__, ret))
+ except exception.__class__, e:
+ for attr in dir(e):
+ if attr.startswith('_') or attr == 'args':
+ continue
+ value = getattr(e, attr)
+ expected = getattr(exception, attr)
+ self.failUnless(
+ value == expected,
+ 'Expected parse_user(bugdir, "%s") %s.%s'
+ '\n to be %s, but it is %s\n\n%s'
+ % (short_id, exception.__class__.__name__,
+ attr, expected, value, e))
+
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
diff --git a/libbe/util/plugin.py b/libbe/util/plugin.py
new file mode 100644
index 0000000..e598c34
--- /dev/null
+++ b/libbe/util/plugin.py
@@ -0,0 +1,67 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# Marien Zwart <marienz@gentoo.org>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Allow simple listing and loading of the various becommands and libbe
+submodules (i.e. "plugins").
+"""
+
+import os
+import os.path
+import sys
+
+
+_PLUGIN_PATH = os.path.realpath(
+ os.path.dirname(
+ os.path.dirname(
+ os.path.dirname(__file__))))
+if _PLUGIN_PATH not in sys.path:
+ sys.path.append(_PLUGIN_PATH)
+
+def import_by_name(modname):
+ """
+ >>> mod = import_by_name('libbe.bugdir')
+ >>> 'BugDir' in dir(mod)
+ True
+ >>> import_by_name('libbe.highly_unlikely')
+ Traceback (most recent call last):
+ ...
+ ImportError: No module named highly_unlikely
+ """
+ module = __import__(modname)
+ components = modname.split('.')
+ for comp in components[1:]:
+ module = getattr(module, comp)
+ return module
+
+def modnames(prefix):
+ """
+ >>> 'list' in [n for n in modnames('libbe.command')]
+ True
+ >>> 'plugin' in [n for n in modnames('libbe.util')]
+ True
+ """
+ components = prefix.split('.')
+ modfiles = os.listdir(os.path.join(_PLUGIN_PATH, *components))
+ modfiles.sort()
+ for modfile in modfiles:
+ if modfile.startswith('.'):
+ continue # the occasional emacs temporary file
+ if modfile.endswith('.py') and modfile != '__init__.py':
+ yield modfile[:-3]
diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py
new file mode 100644
index 0000000..b02b8e8
--- /dev/null
+++ b/libbe/util/subproc.py
@@ -0,0 +1,223 @@
+# Copyright (C) 2009-2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Functions for running external commands in subprocesses.
+"""
+
+from subprocess import Popen, PIPE
+import sys
+
+import libbe
+from encoding import get_encoding
+if libbe.TESTING == True:
+ import doctest
+
+_MSWINDOWS = sys.platform == 'win32'
+_POSIX = not _MSWINDOWS
+
+if _POSIX == True:
+ import os
+ import select
+
+class CommandError(Exception):
+ def __init__(self, command, status, stdout=None, stderr=None):
+ strerror = ['Command failed (%d):\n %s\n' % (status, stderr),
+ 'while executing\n %s' % str(command)]
+ Exception.__init__(self, '\n'.join(strerror))
+ self.command = command
+ self.status = status
+ self.stdout = stdout
+ self.stderr = stderr
+
+def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,),
+ cwd=None, unicode_output=True, verbose=False, encoding=None):
+ """
+ expect should be a tuple of allowed exit codes. cwd should be
+ the directory from which the command will be executed. When
+ unicode_output == True, convert stdout and stdin strings to
+ unicode before returing them.
+ """
+ if cwd == None:
+ cwd = '.'
+ if verbose == True:
+ print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args))
+ try :
+ if _POSIX:
+ q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd)
+ else:
+ assert _MSWINDOWS==True, 'invalid platform'
+ # win32 don't have os.execvp() so have to run command in a shell
+ q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr,
+ shell=True, cwd=cwd)
+ except OSError, e:
+ raise CommandError(args, status=e.args[0], stderr=e)
+ stdout,stderr = q.communicate(input=stdin)
+ status = q.wait()
+ if unicode_output == True:
+ if encoding == None:
+ encoding = get_encoding()
+ if stdout != None:
+ stdout = unicode(stdout, encoding)
+ if stderr != None:
+ stderr = unicode(stderr, encoding)
+ if verbose == True:
+ print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr)
+ if status not in expect:
+ raise CommandError(args, status, stdout, stderr)
+ return status, stdout, stderr
+
+class Pipe (object):
+ """
+ Simple interface for executing POSIX-style pipes based on the
+ subprocess module. The only complication is the adaptation of
+ subprocess.Popen._comminucate to listen to the stderrs of all
+ processes involved in the pipe, as well as the terminal process'
+ stdout. There are two implementations of Pipe._communicate, one
+ for MS Windows, and one for POSIX systems. The MS Windows
+ implementation is currently untested.
+
+ >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']])
+ >>> p.stdout
+ '/etc/ssh\\n'
+ >>> p.status
+ 1
+ >>> p.statuses
+ [1, 0]
+ >>> p.stderrs # doctest: +ELLIPSIS
+ [...find: ...: Permission denied..., '']
+ """
+ def __init__(self, cmds, stdin=None):
+ # spawn processes
+ self._procs = []
+ for cmd in cmds:
+ if len(self._procs) != 0:
+ stdin = self._procs[-1].stdout
+ self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE))
+
+ self.stdout,self.stderrs = self._communicate(input=None)
+
+ # collect process statuses
+ self.statuses = []
+ self.status = 0
+ for proc in self._procs:
+ self.statuses.append(proc.wait())
+ if self.statuses[-1] != 0:
+ self.status = self.statuses[-1]
+
+ # Code excerpted from subprocess.Popen._communicate()
+ if _MSWINDOWS == True:
+ def _communicate(self, input=None):
+ assert input == None, 'stdin != None not yet supported'
+ # listen to each process' stderr
+ threads = []
+ std_X_arrays = []
+ for proc in self._procs:
+ stderr_array = []
+ thread = Thread(target=proc._readerthread,
+ args=(proc.stderr, stderr_array))
+ thread.setDaemon(True)
+ thread.start()
+ threads.append(thread)
+ std_X_arrays.append(stderr_array)
+
+ # also listen to the last processes stdout
+ stdout_array = []
+ thread = Thread(target=proc._readerthread,
+ args=(proc.stdout, stdout_array))
+ thread.setDaemon(True)
+ thread.start()
+ threads.append(thread)
+ std_X_arrays.append(stdout_array)
+
+ # join threads as they die
+ for thread in threads:
+ thread.join()
+
+ # read output from reader threads
+ std_X_strings = []
+ for std_X_array in std_X_arrays:
+ std_X_strings.append(std_X_array[0])
+
+ stdout = std_X_strings.pop(-1)
+ stderrs = std_X_strings
+ return (stdout, stderrs)
+ else:
+ assert _POSIX==True, 'invalid platform'
+ def _communicate(self, input=None):
+ read_set = []
+ write_set = []
+ read_arrays = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self._procs[0].stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self._procs[0].stdin.flush()
+ if input:
+ write_set.append(self._procs[0].stdin)
+ else:
+ self._procs[0].stdin.close()
+ for proc in self._procs:
+ read_set.append(proc.stderr)
+ read_arrays.append([])
+ read_set.append(self._procs[-1].stdout)
+ read_arrays.append([])
+
+ input_offset = 0
+ while read_set or write_set:
+ try:
+ rlist, wlist, xlist = select.select(read_set, write_set, [])
+ except select.error, e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+ if self._procs[0].stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ chunk = input[input_offset : input_offset + 512]
+ bytes_written = os.write(self.stdin.fileno(), chunk)
+ input_offset += bytes_written
+ if input_offset >= len(input):
+ self._procs[0].stdin.close()
+ write_set.remove(self._procs[0].stdin)
+ if self._procs[-1].stdout in rlist:
+ data = os.read(self._procs[-1].stdout.fileno(), 1024)
+ if data == '':
+ self._procs[-1].stdout.close()
+ read_set.remove(self._procs[-1].stdout)
+ read_arrays[-1].append(data)
+ for i,proc in enumerate(self._procs):
+ if proc.stderr in rlist:
+ data = os.read(proc.stderr.fileno(), 1024)
+ if data == '':
+ proc.stderr.close()
+ read_set.remove(proc.stderr)
+ read_arrays[i].append(data)
+
+ # All data exchanged. Translate lists into strings.
+ read_strings = []
+ for read_array in read_arrays:
+ read_strings.append(''.join(read_array))
+
+ stdout = read_strings.pop(-1)
+ stderrs = read_strings
+ return (stdout, stderrs)
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/util/tree.py b/libbe/util/tree.py
new file mode 100644
index 0000000..812b0bd
--- /dev/null
+++ b/libbe/util/tree.py
@@ -0,0 +1,258 @@
+# Bugs Everywhere, a distributed bugtracker
+# Copyright (C) 2008-2010 Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Define :class:`Tree`, a traversable tree structure.
+"""
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+class Tree(list):
+ """A traversable tree structure.
+
+ Examples
+ --------
+
+ Construct::
+
+ +-b---d-g
+ a-+ +-e
+ +-c-+-f-h-i
+
+ with
+
+ >>> i = Tree(); i.n = "i"
+ >>> h = Tree([i]); h.n = "h"
+ >>> f = Tree([h]); f.n = "f"
+ >>> e = Tree(); e.n = "e"
+ >>> c = Tree([f,e]); c.n = "c"
+ >>> g = Tree(); g.n = "g"
+ >>> d = Tree([g]); d.n = "d"
+ >>> b = Tree([d]); b.n = "b"
+ >>> a = Tree(); a.n = "a"
+ >>> a.append(c)
+ >>> a.append(b)
+
+ Get the longest branch length with
+
+ >>> a.branch_len()
+ 5
+
+ Sort the tree recursively. Here we sort longest branch length
+ first.
+
+ >>> a.sort(key=lambda node : -node.branch_len())
+ >>> "".join([node.n for node in a.traverse()])
+ 'acfhiebdg'
+
+ And here we sort shortest branch length first.
+
+ >>> a.sort(key=lambda node : node.branch_len())
+ >>> "".join([node.n for node in a.traverse()])
+ 'abdgcefhi'
+
+ We can also do breadth-first traverses.
+
+ >>> "".join([node.n for node in a.traverse(depth_first=False)])
+ 'abcdefghi'
+
+ Serialize the tree with depth marking branches.
+
+ >>> for depth,node in a.thread():
+ ... print "%*s" % (2*depth+1, node.n)
+ a
+ b
+ d
+ g
+ c
+ e
+ f
+ h
+ i
+
+ Flattening the thread disables depth increases except at
+ branch splits.
+
+ >>> for depth,node in a.thread(flatten=True):
+ ... print "%*s" % (2*depth+1, node.n)
+ a
+ b
+ d
+ g
+ c
+ e
+ f
+ h
+ i
+
+ We can also check if a node is contained in a tree.
+
+ >>> a.has_descendant(g)
+ True
+ >>> c.has_descendant(g)
+ False
+ >>> a.has_descendant(a)
+ False
+ >>> a.has_descendant(a, match_self=True)
+ True
+ """
+ def __cmp__(self, other):
+ return cmp(id(self), id(other))
+
+ def __eq__(self, other):
+ return self.__cmp__(other) == 0
+
+ def __ne__(self, other):
+ return self.__cmp__(other) != 0
+
+ def branch_len(self):
+ """Return the largest number of nodes from root to leaf (inclusive).
+
+ For the tree::
+
+ +-b---d-g
+ a-+ +-e
+ +-c-+-f-h-i
+
+ this method returns 5.
+
+ Notes
+ -----
+ Exhaustive search every time == *slow*.
+
+ Use only on small trees, or reimplement by overriding
+ child-addition methods to allow accurate caching.
+ """
+ if len(self) == 0:
+ return 1
+ else:
+ return 1 + max([child.branch_len() for child in self])
+
+ def sort(self, *args, **kwargs):
+ """Sort the tree recursively.
+
+ This method extends :meth:`list.sort` to Trees.
+
+ Notes
+ -----
+ This method can be slow, e.g. on a :meth:`branch_len` sort,
+ since a node at depth `N` from the root has it's
+ :meth:`branch_len` method called `N` times.
+ """
+ list.sort(self, *args, **kwargs)
+ for child in self:
+ child.sort(*args, **kwargs)
+
+ def traverse(self, depth_first=True):
+ """Generate all the nodes in a tree, starting with the root node.
+
+ Parameters
+ ----------
+ depth_first : bool
+ Depth first by default, but you can set `depth_first` to
+ `False` for breadth first ordering. Siblings are returned
+ in the order they are stored, so you might want to
+ :meth:`sort` your tree first.
+ """
+ if depth_first == True:
+ yield self
+ for child in self:
+ for descendant in child.traverse():
+ yield descendant
+ else: # breadth first, Wikipedia algorithm
+ # http://en.wikipedia.org/wiki/Breadth-first_search
+ queue = [self]
+ while len(queue) > 0:
+ node = queue.pop(0)
+ yield node
+ queue.extend(node)
+
+ def thread(self, flatten=False):
+ """Generate a (depth, node) tuple for every node in the tree.
+
+ When `flatten` is `False`, the depth of any node is one
+ greater than the depth of its parent. That way the
+ inheritance is explicit, but you can end up with highly
+ indented threads.
+
+ When `flatten` is `True`, the depth of any node is only
+ greater than the depth of its parent when there is a branch,
+ and the node is not the last child. This can lead to ancestry
+ ambiguity, but keeps the total indentation down. For example::
+
+ +-b +-b-c
+ a-+-c and a-+
+ +-d-e-f +-d-e-f
+
+ would both produce (after sorting by :meth:`branch_len`)::
+
+ (0, a)
+ (1, b)
+ (1, c)
+ (0, d)
+ (0, e)
+ (0, f)
+
+ """
+ stack = [] # ancestry of the current node
+ if flatten == True:
+ depthDict = {}
+
+ for node in self.traverse(depth_first=True):
+ while len(stack) > 0 \
+ and id(node) not in [id(c) for c in stack[-1]]:
+ stack.pop(-1)
+ if flatten == False:
+ depth = len(stack)
+ else:
+ if len(stack) == 0:
+ depth = 0
+ else:
+ parent = stack[-1]
+ depth = depthDict[id(parent)]
+ if len(parent) > 1 and node != parent[-1]:
+ depth += 1
+ depthDict[id(node)] = depth
+ yield (depth,node)
+ stack.append(node)
+
+ def has_descendant(self, descendant, depth_first=True, match_self=False):
+ """Check if a node is contained in a tree.
+
+ Parameters
+ ----------
+ descendant : Tree
+ The potential descendant.
+ depth_first : bool
+ The search order. Set this if you feel depth/breadth would
+ be a faster search.
+ match_self : bool
+ Set to `True` for::
+
+ x.has_descendant(x, match_self=True) -> True
+ """
+ if descendant == self:
+ return match_self
+ for d in self.traverse(depth_first):
+ if descendant == d:
+ return True
+ return False
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()
diff --git a/libbe/util/utility.py b/libbe/util/utility.py
new file mode 100644
index 0000000..92ca0d5
--- /dev/null
+++ b/libbe/util/utility.py
@@ -0,0 +1,248 @@
+# Copyright (C) 2005-2010 Aaron Bentley and Panometrics, Inc.
+# Gianluca Montecchi <gian@grys.it>
+# W. Trevor King <wking@drexel.edu>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+Assorted utility functions that don't fit in anywhere else.
+"""
+
+import calendar
+import codecs
+import os
+import shutil
+import tempfile
+import time
+import types
+
+import libbe
+if libbe.TESTING == True:
+ import doctest
+
+class InvalidXML(ValueError):
+ """Invalid XML while parsing for a `*.from_xml()` method.
+
+ Parameters
+ ----------
+ type : str
+ String identifying `*`, e.g. "bug", "comment", ...
+ element : :class:`ElementTree.Element`
+ ElementTree.Element instance which caused the error.
+ error : str
+ Error description.
+ """
+ def __init__(self, type, element, error):
+ msg = 'Invalid %s xml: %s\n %s\n' \
+ % (type, error, ElementTree.tostring(element))
+ ValueError.__init__(self, msg)
+ self.type = type
+ self.element = element
+ self.error = error
+
+def search_parent_directories(path, filename):
+ """
+ Find the file (or directory) named filename in path or in any
+ of path's parents. For example::
+
+ search_parent_directories("/a/b/c", ".be")
+
+ will return the path to the first existing file from::
+
+ /a/b/c/.be
+ /a/b/.be
+ /a/.be
+ /.be
+
+ or `None` if none of those files exist.
+ """
+ path = os.path.realpath(path)
+ assert os.path.exists(path)
+ old_path = None
+ while True:
+ check_path = os.path.join(path, filename)
+ if os.path.exists(check_path):
+ return check_path
+ if path == old_path:
+ return None
+ old_path = path
+ path = os.path.dirname(path)
+
+class Dir (object):
+ """A temporary directory for testing use.
+
+ Make sure you run :meth:`cleanup` after you're done using the
+ directory.
+ """
+ def __init__(self):
+ self.path = tempfile.mkdtemp(prefix="BEtest")
+ self.removed = False
+ def cleanup(self):
+ if self.removed == False:
+ shutil.rmtree(self.path)
+ self.removed = True
+ def __call__(self):
+ return self.path
+
+RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000"
+"""RFC 2822 [#]_ format string for :func:`time.strftime` and
+:func:`time.strptime`.
+
+.. [#] See `RFC 2822`_, sections 3.3 and A.1.1.
+.. _RFC 2822: http://www.faqs.org/rfcs/rfc2822.html
+"""
+
+def time_to_str(time_val):
+ """Convert a time number into an RFC 2822-formatted string.
+
+ Parameters
+ ----------
+ time_val : float
+ Float seconds since the Epoc, see :func:`time.time`.
+ Note that while `time_val` may contain sub-second data,
+ the output string will not.
+
+ Examples
+ --------
+
+ >>> time_to_str(0)
+ 'Thu, 01 Jan 1970 00:00:00 +0000'
+
+ See Also
+ --------
+ str_to_time : inverse
+ handy_time : localtime string
+ """
+ return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val))
+
+def str_to_time(str_time):
+ """Convert an RFC 2822-fomatted string into a time value.
+
+ Parameters
+ ----------
+ str_time : str
+ An RFC 2822-formatted string.
+
+ Examples
+ --------
+
+ >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000")
+ 0
+ >>> q = time.time()
+ >>> str_to_time(time_to_str(q)) == int(q)
+ True
+ >>> str_to_time("Thu, 01 Jan 1970 00:00:00 -1000")
+ 36000
+
+ See Also
+ --------
+ time_to_str : inverse
+ """
+ timezone_str = str_time[-5:]
+ if timezone_str != "+0000":
+ str_time = str_time.replace(timezone_str, "+0000")
+ time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT))
+ timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT
+ timezone_tuple = time.strptime(timezone_str[1:], "%H%M")
+ timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60
+ return time_val + timesign*timezone
+
+def handy_time(time_val):
+ """Convert a time number into a useful localtime.
+
+ Where :func:`time_to_str` returns GMT +0000, `handy_time` returns
+ a string in local time. This may be more accessible for the user.
+
+ Parameters
+ ----------
+ time_val : float
+ Float seconds since the Epoc, see :func:`time.time`.
+ """
+ return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val))
+
+def time_to_gmtime(str_time):
+ """Convert an RFC 2822-fomatted string to a GMT string.
+
+ Parameters
+ ----------
+ str_time : str
+ An RFC 2822-formatted string.
+
+ Examples
+ --------
+
+ >>> time_to_gmtime("Thu, 01 Jan 1970 00:00:00 -1000")
+ 'Thu, 01 Jan 1970 10:00:00 +0000'
+ """
+ time_val = str_to_time(str_time)
+ return time_to_str(time_val)
+
+def iterable_full_of_strings(value, alternative=None):
+ """Require an iterable full of strings.
+
+ This is useful, for example, in validating `*.extra_strings`.
+ See :attr:`libbe.bugdir.BugDir.extra_strings`
+
+ Parameters
+ ----------
+ value : list or None
+ The potential list of strings.
+ alternative
+ Allow a default (e.g. `None`), such that::
+
+ iterable_full_of_strings(value=x, alternative=x) -> True
+
+ Examples
+ --------
+
+ >>> iterable_full_of_strings([])
+ True
+ >>> iterable_full_of_strings(["abc", "def", u"hij"])
+ True
+ >>> iterable_full_of_strings(["abc", None, u"hij"])
+ False
+ >>> iterable_full_of_strings(None, alternative=None)
+ True
+ """
+ if value == alternative:
+ return True
+ elif not hasattr(value, '__iter__'):
+ return False
+ for x in value:
+ if type(x) not in types.StringTypes:
+ return False
+ return True
+
+def underlined(string, char='='):
+ """Produces a version of a string that is underlined.
+
+ Parameters
+ ----------
+ string : str
+ The string to underline
+ char : str
+ The character to use for the underlining.
+
+ Examples
+ --------
+
+ >>> underlined("Underlined String")
+ 'Underlined String\\n================='
+ """
+ assert len(char) == 0, char
+ return '%s\n%s' % (string, char*len(string))
+
+if libbe.TESTING == True:
+ suite = doctest.DocTestSuite()