diff options
Diffstat (limited to 'libbe/util')
-rw-r--r-- | libbe/util/beuuid.py | 67 | ||||
-rw-r--r-- | libbe/util/encoding.py | 66 | ||||
-rw-r--r-- | libbe/util/subproc.py | 223 | ||||
-rw-r--r-- | libbe/util/tree.py | 193 | ||||
-rw-r--r-- | libbe/util/utility.py | 151 |
5 files changed, 700 insertions, 0 deletions
diff --git a/libbe/util/beuuid.py b/libbe/util/beuuid.py new file mode 100644 index 0000000..a3a3b6c --- /dev/null +++ b/libbe/util/beuuid.py @@ -0,0 +1,67 @@ +# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Backwards compatibility support for Python 2.4. Once people give up +on 2.4 ;), the uuid call should be merged into bugdir.py +""" + +import libbe +if libbe.TESTING == True: + import unittest + + +try: + from uuid import uuid4 # Python >= 2.5 + def uuid_gen(): + id = uuid4() + idstr = id.urn + start = "urn:uuid:" + assert idstr.startswith(start) + return idstr[len(start):] +except ImportError: + import os + import sys + from subprocess import Popen, PIPE + + def uuid_gen(): + # Shell-out to system uuidgen + args = ['uuidgen', 'r'] + try: + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + strerror = "%s\nwhile executing %s" % (e.args[1], args) + raise OSError, strerror + output, error = q.communicate() + status = q.wait() + if status != 0: + strerror = "%s\nwhile executing %s" % (status, args) + raise Exception, strerror + return output.rstrip('\n') + +if libbe.TESTING == True: + class UUIDtestCase(unittest.TestCase): + def testUUID_gen(self): + id = uuid_gen() + self.failUnless(len(id) == 36, "invalid UUID '%s'" % id) + + suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase) diff --git a/libbe/util/encoding.py b/libbe/util/encoding.py new file mode 100644 index 0000000..d09117f --- /dev/null +++ b/libbe/util/encoding.py @@ -0,0 +1,66 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Support input/output/filesystem encodings (e.g. UTF-8). +""" + +import codecs +import locale +import sys + +import libbe +if libbe.TESTING == True: + import doctest + + +ENCODING = None # override get_encoding() output by setting this + +def get_encoding(): + """ + Guess a useful input/output/filesystem encoding... Maybe we need + seperate encodings for input/output and filesystem? Hmm... + """ + if ENCODING != None: + return ENCODING + encoding = locale.getpreferredencoding() or sys.getdefaultencoding() + if sys.platform != 'win32' or sys.version_info[:2] > (2, 3): + encoding = locale.getlocale(locale.LC_TIME)[1] or encoding + # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ' + return encoding + +def known_encoding(encoding): + """ + >>> known_encoding("highly-unlikely-encoding") + False + >>> known_encoding(get_encoding()) + True + """ + try: + codecs.lookup(encoding) + return True + except LookupError: + return False + +def set_IO_stream_encodings(encoding): + sys.stdin = codecs.getreader(encoding)(sys.__stdin__) + sys.stdout = codecs.getwriter(encoding)(sys.__stdout__) + sys.stderr = codecs.getwriter(encoding)(sys.__stderr__) + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/util/subproc.py b/libbe/util/subproc.py new file mode 100644 index 0000000..8806e26 --- /dev/null +++ b/libbe/util/subproc.py @@ -0,0 +1,223 @@ +# Copyright (C) 2009 W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Functions for running external commands in subprocesses. +""" + +from subprocess import Popen, PIPE +import sys + +import libbe +from encoding import get_encoding +if libbe.TESTING == True: + import doctest + +_MSWINDOWS = sys.platform == 'win32' +_POSIX = not _MSWINDOWS + +if _POSIX == True: + import os + import select + +class CommandError(Exception): + def __init__(self, command, status, stdout=None, stderr=None): + strerror = ['Command failed (%d):\n %s\n' % (status, stderr), + 'while executing\n %s' % command] + Exception.__init__(self, '\n'.join(strerror)) + self.command = command + self.status = status + self.stdout = stdout + self.stderr = stderr + +def invoke(args, stdin=None, stdout=PIPE, stderr=PIPE, expect=(0,), + cwd=None, unicode_output=True, verbose=False, encoding=None): + """ + expect should be a tuple of allowed exit codes. cwd should be + the directory from which the command will be executed. When + unicode_output == True, convert stdout and stdin strings to + unicode before returing them. + """ + if cwd == None: + cwd = '.' + if verbose == True: + print >> sys.stderr, '%s$ %s' % (cwd, ' '.join(args)) + try : + if _POSIX: + q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, cwd=cwd) + else: + assert _MSWINDOWS==True, 'invalid platform' + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, + shell=True, cwd=cwd) + except OSError, e: + raise CommandError(args, status=e.args[0], stderr=e) + stdout,stderr = q.communicate(input=stdin) + status = q.wait() + if unicode_output == True: + if encoding == None: + encoding = get_encoding() + if stdout != None: + stdout = unicode(stdout, encoding) + if stderr != None: + stderr = unicode(stderr, encoding) + if verbose == True: + print >> sys.stderr, '%d\n%s%s' % (status, stdout, stderr) + if status not in expect: + raise CommandError(args, status, stdout, stderr) + return status, stdout, stderr + +class Pipe (object): + """ + Simple interface for executing POSIX-style pipes based on the + subprocess module. The only complication is the adaptation of + subprocess.Popen._comminucate to listen to the stderrs of all + processes involved in the pipe, as well as the terminal process' + stdout. There are two implementations of Pipe._communicate, one + for MS Windows, and one for POSIX systems. The MS Windows + implementation is currently untested. + + >>> p = Pipe([['find', '/etc/'], ['grep', '^/etc/ssh$']]) + >>> p.stdout + '/etc/ssh\\n' + >>> p.status + 1 + >>> p.statuses + [1, 0] + >>> p.stderrs # doctest: +ELLIPSIS + [...find: ...: Permission denied..., ''] + """ + def __init__(self, cmds, stdin=None): + # spawn processes + self._procs = [] + for cmd in cmds: + if len(self._procs) != 0: + stdin = self._procs[-1].stdout + self._procs.append(Popen(cmd, stdin=stdin, stdout=PIPE, stderr=PIPE)) + + self.stdout,self.stderrs = self._communicate(input=None) + + # collect process statuses + self.statuses = [] + self.status = 0 + for proc in self._procs: + self.statuses.append(proc.wait()) + if self.statuses[-1] != 0: + self.status = self.statuses[-1] + + # Code excerpted from subprocess.Popen._communicate() + if _MSWINDOWS == True: + def _communicate(self, input=None): + assert input == None, 'stdin != None not yet supported' + # listen to each process' stderr + threads = [] + std_X_arrays = [] + for proc in self._procs: + stderr_array = [] + thread = Thread(target=proc._readerthread, + args=(proc.stderr, stderr_array)) + thread.setDaemon(True) + thread.start() + threads.append(thread) + std_X_arrays.append(stderr_array) + + # also listen to the last processes stdout + stdout_array = [] + thread = Thread(target=proc._readerthread, + args=(proc.stdout, stdout_array)) + thread.setDaemon(True) + thread.start() + threads.append(thread) + std_X_arrays.append(stdout_array) + + # join threads as they die + for thread in threads: + thread.join() + + # read output from reader threads + std_X_strings = [] + for std_X_array in std_X_arrays: + std_X_strings.append(std_X_array[0]) + + stdout = std_X_strings.pop(-1) + stderrs = std_X_strings + return (stdout, stderrs) + else: + assert _POSIX==True, 'invalid platform' + def _communicate(self, input=None): + read_set = [] + write_set = [] + read_arrays = [] + stdout = None # Return + stderr = None # Return + + if self._procs[0].stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + self._procs[0].stdin.flush() + if input: + write_set.append(self._procs[0].stdin) + else: + self._procs[0].stdin.close() + for proc in self._procs: + read_set.append(proc.stderr) + read_arrays.append([]) + read_set.append(self._procs[-1].stdout) + read_arrays.append([]) + + input_offset = 0 + while read_set or write_set: + try: + rlist, wlist, xlist = select.select(read_set, write_set, []) + except select.error, e: + if e.args[0] == errno.EINTR: + continue + raise + if self._procs[0].stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + chunk = input[input_offset : input_offset + 512] + bytes_written = os.write(self.stdin.fileno(), chunk) + input_offset += bytes_written + if input_offset >= len(input): + self._procs[0].stdin.close() + write_set.remove(self._procs[0].stdin) + if self._procs[-1].stdout in rlist: + data = os.read(self._procs[-1].stdout.fileno(), 1024) + if data == '': + self._procs[-1].stdout.close() + read_set.remove(self._procs[-1].stdout) + read_arrays[-1].append(data) + for i,proc in enumerate(self._procs): + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == '': + proc.stderr.close() + read_set.remove(proc.stderr) + read_arrays[i].append(data) + + # All data exchanged. Translate lists into strings. + read_strings = [] + for read_array in read_arrays: + read_strings.append(''.join(read_array)) + + stdout = read_strings.pop(-1) + stderrs = read_strings + return (stdout, stderrs) + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/util/tree.py b/libbe/util/tree.py new file mode 100644 index 0000000..1daac44 --- /dev/null +++ b/libbe/util/tree.py @@ -0,0 +1,193 @@ +# Bugs Everywhere, a distributed bugtracker +# Copyright (C) 2008-2009 Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Define a traversable tree structure. +""" + +import libbe +if libbe.TESTING == True: + import doctest + +class Tree(list): + """ + Construct + +-b---d-g + a-+ +-e + +-c-+-f-h-i + with + >>> i = Tree(); i.n = "i" + >>> h = Tree([i]); h.n = "h" + >>> f = Tree([h]); f.n = "f" + >>> e = Tree(); e.n = "e" + >>> c = Tree([f,e]); c.n = "c" + >>> g = Tree(); g.n = "g" + >>> d = Tree([g]); d.n = "d" + >>> b = Tree([d]); b.n = "b" + >>> a = Tree(); a.n = "a" + >>> a.append(c) + >>> a.append(b) + + >>> a.branch_len() + 5 + >>> a.sort(key=lambda node : -node.branch_len()) + >>> "".join([node.n for node in a.traverse()]) + 'acfhiebdg' + >>> a.sort(key=lambda node : node.branch_len()) + >>> "".join([node.n for node in a.traverse()]) + 'abdgcefhi' + >>> "".join([node.n for node in a.traverse(depth_first=False)]) + 'abcdefghi' + >>> for depth,node in a.thread(): + ... print "%*s" % (2*depth+1, node.n) + a + b + d + g + c + e + f + h + i + >>> for depth,node in a.thread(flatten=True): + ... print "%*s" % (2*depth+1, node.n) + a + b + d + g + c + e + f + h + i + >>> a.has_descendant(g) + True + >>> c.has_descendant(g) + False + >>> a.has_descendant(a) + False + >>> a.has_descendant(a, match_self=True) + True + """ + def __cmp__(self, other): + return cmp(id(self), id(other)) + + def __eq__(self, other): + return self.__cmp__(other) == 0 + + def __ne__(self, other): + return self.__cmp__(other) != 0 + + def branch_len(self): + """ + Exhaustive search every time == SLOW. + + Use only on small trees, or reimplement by overriding + child-addition methods to allow accurate caching. + + For the tree + +-b---d-g + a-+ +-e + +-c-+-f-h-i + this method returns 5. + """ + if len(self) == 0: + return 1 + else: + return 1 + max([child.branch_len() for child in self]) + + def sort(self, *args, **kwargs): + """ + This method can be slow, e.g. on a branch_len() sort, since a + node at depth N from the root has it's branch_len() method + called N times. + """ + list.sort(self, *args, **kwargs) + for child in self: + child.sort(*args, **kwargs) + + def traverse(self, depth_first=True): + """ + Note: you might want to sort() your tree first. + """ + if depth_first == True: + yield self + for child in self: + for descendant in child.traverse(): + yield descendant + else: # breadth first, Wikipedia algorithm + # http://en.wikipedia.org/wiki/Breadth-first_search + queue = [self] + while len(queue) > 0: + node = queue.pop(0) + yield node + queue.extend(node) + + def thread(self, flatten=False): + """ + When flatten==False, the depth of any node is one greater than + the depth of its parent. That way the inheritance is + explicit, but you can end up with highly indented threads. + + When flatten==True, the depth of any node is only greater than + the depth of its parent when there is a branch, and the node + is not the last child. This can lead to ancestry ambiguity, + but keeps the total indentation down. E.g. + +-b +-b-c + a-+-c and a-+ + +-d-e-f +-d-e-f + would both produce (after sorting by branch_len()) + (0, a) + (1, b) + (1, c) + (0, d) + (0, e) + (0, f) + """ + stack = [] # ancestry of the current node + if flatten == True: + depthDict = {} + + for node in self.traverse(depth_first=True): + while len(stack) > 0 \ + and id(node) not in [id(c) for c in stack[-1]]: + stack.pop(-1) + if flatten == False: + depth = len(stack) + else: + if len(stack) == 0: + depth = 0 + else: + parent = stack[-1] + depth = depthDict[id(parent)] + if len(parent) > 1 and node != parent[-1]: + depth += 1 + depthDict[id(node)] = depth + yield (depth,node) + stack.append(node) + + def has_descendant(self, descendant, depth_first=True, match_self=False): + if descendant == self: + return match_self + for d in self.traverse(depth_first): + if descendant == d: + return True + return False + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() diff --git a/libbe/util/utility.py b/libbe/util/utility.py new file mode 100644 index 0000000..f954422 --- /dev/null +++ b/libbe/util/utility.py @@ -0,0 +1,151 @@ +# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. +# Gianluca Montecchi <gian@grys.it> +# W. Trevor King <wking@drexel.edu> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +Assorted utility functions that don't fit in anywhere else. +""" + +import calendar +import codecs +import os +import shutil +import tempfile +import time +import types + +import libbe +if libbe.TESTING == True: + import doctest + +class InvalidXML(ValueError): + """ + Invalid XML while parsing for a *.from_xml() method. + type - string identifying *, e.g. "bug", "comment", ... + element - ElementTree.Element instance which caused the error + error - string describing the error + """ + def __init__(self, type, element, error): + msg = 'Invalid %s xml: %s\n %s\n' \ + % (type, error, ElementTree.tostring(element)) + ValueError.__init__(self, msg) + self.type = type + self.element = element + self.error = error + +def search_parent_directories(path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + path = os.path.realpath(path) + assert os.path.exists(path) + old_path = None + while True: + check_path = os.path.join(path, filename) + if os.path.exists(check_path): + return check_path + if path == old_path: + return None + old_path = path + path = os.path.dirname(path) + +class Dir (object): + "A temporary directory for testing use" + def __init__(self): + self.path = tempfile.mkdtemp(prefix="BEtest") + self.removed = False + def cleanup(self): + if self.removed == False: + shutil.rmtree(self.path) + self.removed = True + def __call__(self): + return self.path + +RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000" + + +def time_to_str(time_val): + """Convert a time value into an RFC 2822-formatted string. This format + lacks sub-second data. + >>> time_to_str(0) + 'Thu, 01 Jan 1970 00:00:00 +0000' + """ + return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val)) + +def str_to_time(str_time): + """Convert an RFC 2822-fomatted string into a time value. + >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000") + 0 + >>> q = time.time() + >>> str_to_time(time_to_str(q)) == int(q) + True + >>> str_to_time("Thu, 01 Jan 1970 00:00:00 -1000") + 36000 + """ + timezone_str = str_time[-5:] + if timezone_str != "+0000": + str_time = str_time.replace(timezone_str, "+0000") + time_val = calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT)) + timesign = -int(timezone_str[0]+"1") # "+" -> time_val ahead of GMT + timezone_tuple = time.strptime(timezone_str[1:], "%H%M") + timezone = timezone_tuple.tm_hour*3600 + timezone_tuple.tm_min*60 + return time_val + timesign*timezone + +def handy_time(time_val): + return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val)) + +def time_to_gmtime(str_time): + """Convert an RFC 2822-fomatted string to a GMT string. + >>> time_to_gmtime("Thu, 01 Jan 1970 00:00:00 -1000") + 'Thu, 01 Jan 1970 10:00:00 +0000' + """ + time_val = str_to_time(str_time) + return time_to_str(time_val) + +def iterable_full_of_strings(value, alternative=None): + """ + Require an iterable full of strings. + >>> iterable_full_of_strings([]) + True + >>> iterable_full_of_strings(["abc", "def", u"hij"]) + True + >>> iterable_full_of_strings(["abc", None, u"hij"]) + False + >>> iterable_full_of_strings(None, alternative=None) + True + """ + if value == alternative: + return True + elif not hasattr(value, "__iter__"): + return False + for x in value: + if type(x) not in types.StringTypes: + return False + return True + +if libbe.TESTING == True: + suite = doctest.DocTestSuite() |