diff options
author | Gianluca Montecchi <gian@grys.it> | 2009-10-01 23:39:28 +0200 |
---|---|---|
committer | Gianluca Montecchi <gian@grys.it> | 2009-10-01 23:39:28 +0200 |
commit | ed4a943875d81732bfa3127eb252c2db2e3588f4 (patch) | |
tree | 1fa7da00b01b8807adac3f3e3231fc8a78b3a6c7 /libbe | |
parent | e42729af0efc1a4c064e8875e728f9c3c1694a1d (diff) | |
download | bugseverywhere-ed4a943875d81732bfa3127eb252c2db2e3588f4.tar.gz |
Merged with head branch
Diffstat (limited to 'libbe')
-rw-r--r-- | libbe/arch.py | 295 | ||||
-rw-r--r-- | libbe/beuuid.py | 62 | ||||
-rw-r--r-- | libbe/bug.py | 527 | ||||
-rw-r--r-- | libbe/bugdir.py | 628 | ||||
-rw-r--r-- | libbe/bzr.py | 104 | ||||
-rw-r--r-- | libbe/cmdutil.py | 191 | ||||
-rw-r--r-- | libbe/comment.py | 538 | ||||
-rw-r--r-- | libbe/config.py | 84 | ||||
-rw-r--r-- | libbe/darcs.py | 162 | ||||
-rw-r--r-- | libbe/diff.py | 117 | ||||
-rw-r--r-- | libbe/editor.py | 103 | ||||
-rw-r--r-- | libbe/encoding.py | 53 | ||||
-rw-r--r-- | libbe/git.py | 111 | ||||
-rw-r--r-- | libbe/hg.py | 91 | ||||
-rw-r--r-- | libbe/mapfile.py | 128 | ||||
-rw-r--r-- | libbe/plugin.py | 72 | ||||
-rw-r--r-- | libbe/properties.py | 614 | ||||
-rw-r--r-- | libbe/rcs.py | 838 | ||||
-rw-r--r-- | libbe/settings_object.py | 411 | ||||
-rw-r--r-- | libbe/tree.py | 161 | ||||
-rw-r--r-- | libbe/utility.py | 93 |
21 files changed, 0 insertions, 5383 deletions
diff --git a/libbe/arch.py b/libbe/arch.py index 3051b34..e69de29 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -1,295 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Ben Finney <ben+python@benfinney.id.au> -# James Rowe <jnrowe@ukfsn.org> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import codecs -import os -import re -import shutil -import sys -import time -import unittest -import doctest - -import config -from beuuid import uuid_gen -import rcs -from rcs import RCS - -DEFAULT_CLIENT = "tla" - -client = config.get_val("arch_client", default=DEFAULT_CLIENT) - -def new(): - return Arch() - -class Arch(RCS): - name = "Arch" - client = client - versioned = True - _archive_name = None - _archive_dir = None - _tmp_archive = False - _project_name = None - _tmp_project = False - _arch_paramdir = os.path.expanduser("~/.arch-params") - def _rcs_help(self): - status,output,error = self._u_invoke_client("--help") - return output - def _rcs_detect(self, path): - """Detect whether a directory is revision-controlled using Arch""" - if self._u_search_parent_directories(path, "{arch}") != None : - config.set_val("arch_client", client) - return True - return False - def _rcs_init(self, path): - self._create_archive(path) - self._create_project(path) - self._add_project_code(path) - def _create_archive(self, path): - # Create a new archive - # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive - assert self._archive_name == None - id = self.get_user_id() - name, email = self._u_parse_id(id) - if email == None: - email = "%s@example.com" % name - trailer = "%s-%s" % ("bugs-everywhere-auto", uuid_gen()[0:8]) - self._archive_name = "%s--%s" % (email, trailer) - self._archive_dir = "/tmp/%s" % trailer - self._tmp_archive = True - self._u_invoke_client("make-archive", self._archive_name, - self._archive_dir, directory=path) - def _invoke_client(self, *args, **kwargs): - """ - Invoke the client on our archive. - """ - assert self._archive_name != None - command = args[0] - if len(args) > 1: - tailargs = args[1:] - else: - tailargs = [] - arglist = [command, "-A", self._archive_name] - arglist.extend(tailargs) - args = tuple(arglist) - return self._u_invoke_client(*args, **kwargs) - def _remove_archive(self): - assert self._tmp_archive == True - assert self._archive_dir != None - assert self._archive_name != None - os.remove(os.path.join(self._arch_paramdir, - "=locations", self._archive_name)) - shutil.rmtree(self._archive_dir) - self._tmp_archive = False - self._archive_dir = False - self._archive_name = False - def _create_project(self, path): - """ - Create a temporary Arch project in the directory PATH. This - project will be removed by - __del__->cleanup->_rcs_cleanup->_remove_project - """ - # http://mwolson.org/projects/GettingStartedWithArch.html - # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project - category = "bugs-everywhere" - branch = "mainline" - version = "0.1" - self._project_name = "%s--%s--%s" % (category, branch, version) - self._invoke_client("archive-setup", self._project_name, - directory=path) - self._tmp_project = True - def _remove_project(self): - assert self._tmp_project == True - assert self._project_name != None - assert self._archive_dir != None - shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) - self._tmp_project = False - self._project_name = False - def _archive_project_name(self): - assert self._archive_name != None - assert self._project_name != None - return "%s/%s" % (self._archive_name, self._project_name) - def _adjust_naming_conventions(self, path): - """ - By default, Arch restricts source code filenames to - ^[_=a-zA-Z0-9].*$ - See - http://regexps.srparish.net/tutorial-tla/naming-conventions.html - Since our bug directory '.be' doesn't satisfy these conventions, - we need to adjust them. - - The conventions are specified in - project-root/{arch}/=tagging-method - """ - tagpath = os.path.join(path, "{arch}", "=tagging-method") - lines_out = [] - f = codecs.open(tagpath, "r", self.encoding) - for line in f: - if line.startswith("source "): - lines_out.append("source ^[._=a-zA-X0-9].*$\n") - else: - lines_out.append(line) - f.close() - f = codecs.open(tagpath, "w", self.encoding) - f.write("".join(lines_out)) - f.close() - - def _add_project_code(self, path): - # http://mwolson.org/projects/GettingStartedWithArch.html - # http://regexps.srparish.net/tutorial-tla/new-source.html - # http://regexps.srparish.net/tutorial-tla/importing-first.html - self._invoke_client("init-tree", self._project_name, - directory=path) - self._adjust_naming_conventions(path) - self._invoke_client("import", "--summary", "Began versioning", - directory=path) - def _rcs_cleanup(self): - if self._tmp_project == True: - self._remove_project() - if self._tmp_archive == True: - self._remove_archive() - - def _rcs_root(self, path): - if not os.path.isdir(path): - dirname = os.path.dirname(path) - else: - dirname = path - status,output,error = self._u_invoke_client("tree-root", dirname) - root = output.rstrip('\n') - - self._get_archive_project_name(root) - - return root - - def _get_archive_name(self, root): - status,output,error = self._u_invoke_client("archives") - lines = output.split('\n') - # e.g. output: - # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52 - # /tmp/BEtestXXXXXX/rootdir - # (+ repeats) - for archive,location in zip(lines[::2], lines[1::2]): - if os.path.realpath(location) == os.path.realpath(root): - self._archive_name = archive - assert self._archive_name != None - - def _get_archive_project_name(self, root): - # get project names - status,output,error = self._u_invoke_client("tree-version", directory=root) - # e.g output - # jdoe@example.com--bugs-everywhere-auto-2008.22.24.52/be--mainline--0.1 - archive_name,project_name = output.rstrip('\n').split('/') - self._archive_name = archive_name - self._project_name = project_name - def _rcs_get_user_id(self): - try: - status,output,error = self._u_invoke_client('my-id') - return output.rstrip('\n') - except Exception, e: - if 'no arch user id set' in e.args[0]: - return None - else: - raise - def _rcs_set_user_id(self, value): - self._u_invoke_client('my-id', value) - def _rcs_add(self, path): - self._u_invoke_client("add-id", path) - realpath = os.path.realpath(self._u_abspath(path)) - pathAdded = realpath in self._list_added(self.rootdir) - if self.paranoid and not pathAdded: - self._force_source(path) - def _list_added(self, root): - assert os.path.exists(root) - assert os.access(root, os.X_OK) - root = os.path.realpath(root) - status,output,error = self._u_invoke_client("inventory", "--source", - "--both", "--all", root) - inv_str = output.rstrip('\n') - return [os.path.join(root, p) for p in inv_str.split('\n')] - def _add_dir_rule(self, rule, dirname, root): - inv_path = os.path.join(dirname, '.arch-inventory') - f = codecs.open(inv_path, "a", self.encoding) - f.write(rule) - f.close() - if os.path.realpath(inv_path) not in self._list_added(root): - paranoid = self.paranoid - self.paranoid = False - self.add(inv_path) - self.paranoid = paranoid - def _force_source(self, path): - rule = "source %s\n" % self._u_rel_path(path) - self._add_dir_rule(rule, os.path.dirname(path), self.rootdir) - if os.path.realpath(path) not in self._list_added(self.rootdir): - raise CantAddFile(path) - def _rcs_remove(self, path): - if not '.arch-ids' in path: - self._u_invoke_client("delete-id", path) - def _rcs_update(self, path): - pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): - if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) - else: - status,output,error = \ - self._invoke_client("file-find", path, revision) - relpath = output.rstrip('\n') - abspath = os.path.join(self.rootdir, relpath) - f = codecs.open(abspath, "r", self.encoding) - contents = f.read() - f.close() - return contents - def _rcs_duplicate_repo(self, directory, revision=None): - if revision == None: - RCS._rcs_duplicate_repo(self, directory, revision) - else: - status,output,error = \ - self._u_invoke_client("get", revision,directory) - def _rcs_commit(self, commitfile): - summary,body = self._u_parse_commitfile(commitfile) - #status,output,error = self._invoke_client("make-log") - if body == None: - status,output,error \ - = self._u_invoke_client("commit","--summary",summary) - else: - status,output,error \ - = self._u_invoke_client("commit","--summary",summary, - "--log-message",body) - revision = None - revline = re.compile("[*] committed (.*)") - match = revline.search(output) - assert match != None, output+error - assert len(match.groups()) == 1 - revpath = match.groups()[0] - assert not " " in revpath, revpath - assert revpath.startswith(self._archive_project_name()+'--') - revision = revpath[len(self._archive_project_name()+'--'):] - return revpath - -class CantAddFile(Exception): - def __init__(self, file): - self.file = file - Exception.__init__(self, "Can't automatically add file %s" % file) - - - -rcs.make_rcs_testcase_subclasses(Arch, sys.modules[__name__]) - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/beuuid.py b/libbe/beuuid.py index de67cb7..e69de29 100644 --- a/libbe/beuuid.py +++ b/libbe/beuuid.py @@ -1,62 +0,0 @@ -# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -""" -Backwards compatibility support for Python 2.4. Once people give up -on 2.4 ;), the uuid call should be merged into bugdir.py -""" - -import unittest - -try: - from uuid import uuid4 # Python >= 2.5 - def uuid_gen(): - id = uuid4() - idstr = id.urn - start = "urn:uuid:" - assert idstr.startswith(start) - return idstr[len(start):] -except ImportError: - import os - import sys - from subprocess import Popen, PIPE - - def uuid_gen(): - # Shell-out to system uuidgen - args = ['uuidgen', 'r'] - try: - if sys.platform != "win32": - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) - else: - # win32 don't have os.execvp() so have to run command in a shell - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, - shell=True, cwd=cwd) - except OSError, e : - strerror = "%s\nwhile executing %s" % (e.args[1], args) - raise OSError, strerror - output, error = q.communicate() - status = q.wait() - if status != 0: - strerror = "%s\nwhile executing %s" % (status, args) - raise Exception, strerror - return output.rstrip('\n') - -class UUIDtestCase(unittest.TestCase): - def testUUID_gen(self): - id = uuid_gen() - self.failUnless(len(id) == 36, "invalid UUID '%s'" % id) - -suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase) diff --git a/libbe/bug.py b/libbe/bug.py index dfa49f2..e69de29 100644 --- a/libbe/bug.py +++ b/libbe/bug.py @@ -1,527 +0,0 @@ -# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org> -# Thomas Habets <thomas@habets.pp.se> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import os -import os.path -import errno -import time -import types -import xml.sax.saxutils -import doctest - -from beuuid import uuid_gen -from properties import Property, doc_property, local_property, \ - defaulting_property, checked_property, cached_property, \ - primed_property, change_hook_property, settings_property -import settings_object -import mapfile -import comment -import utility - - -### Define and describe valid bug categories -# Use a tuple of (category, description) tuples since we don't have -# ordered dicts in Python yet http://www.python.org/dev/peps/pep-0372/ - -# in order of increasing severity. (name, description) pairs -severity_def = ( - ("wishlist","A feature that could improve usefulness, but not a bug."), - ("minor","The standard bug level."), - ("serious","A bug that requires workarounds."), - ("critical","A bug that prevents some features from working at all."), - ("fatal","A bug that makes the package unusable.")) - -# in order of increasing resolution -# roughly following http://www.bugzilla.org/docs/3.2/en/html/lifecycle.html -active_status_def = ( - ("unconfirmed","A possible bug which lacks independent existance confirmation."), - ("open","A working bug that has not been assigned to a developer."), - ("assigned","A working bug that has been assigned to a developer."), - ("test","The code has been adjusted, but the fix is still being tested.")) -inactive_status_def = ( - ("closed", "The bug is no longer relevant."), - ("fixed", "The bug should no longer occur."), - ("wontfix","It's not a bug, it's a feature.")) - - -### Convert the description tuples to more useful formats - -severity_values = () -severity_description = {} -severity_index = {} -def load_severities(severity_def): - global severity_values - global severity_description - global severity_index - if severity_def == settings_object.EMPTY: - return - severity_values = tuple([val for val,description in severity_def]) - severity_description = dict(severity_def) - severity_index = {} - for i,severity in enumerate(severity_values): - severity_index[severity] = i -load_severities(severity_def) - -active_status_values = [] -inactive_status_values = [] -status_values = [] -status_description = {} -status_index = {} -def load_status(active_status_def, inactive_status_def): - global active_status_values - global inactive_status_values - global status_values - global status_description - global status_index - if active_status_def == settings_object.EMPTY: - active_status_def = globals()["active_status_def"] - if inactive_status_def == settings_object.EMPTY: - inactive_status_def = globals()["inactive_status_def"] - active_status_values = tuple([val for val,description in active_status_def]) - inactive_status_values = tuple([val for val,description in inactive_status_def]) - status_values = active_status_values + inactive_status_values - status_description = dict(tuple(active_status_def) + tuple(inactive_status_def)) - status_index = {} - for i,status in enumerate(status_values): - status_index[status] = i -load_status(active_status_def, inactive_status_def) - - -class Bug(settings_object.SavedSettingsObject): - """ - >>> b = Bug() - >>> print b.status - open - >>> print b.severity - minor - - There are two formats for time, int and string. Setting either - one will adjust the other appropriately. The string form is the - one stored in the bug's settings file on disk. - >>> print type(b.time) - <type 'int'> - >>> print type(b.time_string) - <type 'str'> - >>> b.time = 0 - >>> print b.time_string - Thu, 01 Jan 1970 00:00:00 +0000 - >>> b.time_string="Thu, 01 Jan 1970 00:01:00 +0000" - >>> b.time - 60 - >>> print b.settings["time"] - Thu, 01 Jan 1970 00:01:00 +0000 - """ - settings_properties = [] - required_saved_properties = [] - _prop_save_settings = settings_object.prop_save_settings - _prop_load_settings = settings_object.prop_load_settings - def _versioned_property(settings_properties=settings_properties, - required_saved_properties=required_saved_properties, - **kwargs): - if "settings_properties" not in kwargs: - kwargs["settings_properties"] = settings_properties - if "required_saved_properties" not in kwargs: - kwargs["required_saved_properties"]=required_saved_properties - return settings_object.versioned_property(**kwargs) - - @_versioned_property(name="severity", - doc="A measure of the bug's importance", - default="minor", - check_fn=lambda s: s in severity_values, - require_save=True) - def severity(): return {} - - @_versioned_property(name="status", - doc="The bug's current status", - default="open", - check_fn=lambda s: s in status_values, - require_save=True) - def status(): return {} - - @property - def active(self): - return self.status in active_status_values - - @_versioned_property(name="target", - doc="The deadline for fixing this bug") - def target(): return {} - - @_versioned_property(name="creator", - doc="The user who entered the bug into the system") - def creator(): return {} - - @_versioned_property(name="reporter", - doc="The user who reported the bug") - def reporter(): return {} - - @_versioned_property(name="assigned", - doc="The developer in charge of the bug") - def assigned(): return {} - - @_versioned_property(name="time", - doc="An RFC 2822 timestamp for bug creation") - def time_string(): return {} - - def _get_time(self): - if self.time_string == None or self.time_string == settings_object.EMPTY: - return None - return utility.str_to_time(self.time_string) - def _set_time(self, value): - self.time_string = utility.time_to_str(value) - time = property(fget=_get_time, - fset=_set_time, - doc="An integer version of .time_string") - - def _extra_strings_check_fn(value): - "Require an iterable full of strings" - if value == settings_object.EMPTY: - return True - elif not hasattr(value, "__iter__"): - return False - for x in value: - if type(x) not in types.StringTypes: - return False - return True - def _extra_strings_change_hook(self, old, new): - self.extra_strings.sort() # to make merging easier - self._prop_save_settings(old, new) - @_versioned_property(name="extra_strings", - doc="Space for an array of extra strings. Useful for storing state for functionality implemented purely in becommands/<some_function>.py.", - default=[], - check_fn=_extra_strings_check_fn, - change_hook=_extra_strings_change_hook, - mutable=True) - def extra_strings(): return {} - - @_versioned_property(name="summary", - doc="A one-line bug description") - def summary(): return {} - - def _get_comment_root(self, load_full=False): - if self.sync_with_disk: - return comment.loadComments(self, load_full=load_full) - else: - return comment.Comment(self, uuid=comment.INVALID_UUID) - - @Property - @cached_property(generator=_get_comment_root) - @local_property("comment_root") - @doc_property(doc="The trunk of the comment tree") - def comment_root(): return {} - - def _get_rcs(self): - if hasattr(self.bugdir, "rcs"): - return self.bugdir.rcs - - @Property - @cached_property(generator=_get_rcs) - @local_property("rcs") - @doc_property(doc="A revision control system instance.") - def rcs(): return {} - - def __init__(self, bugdir=None, uuid=None, from_disk=False, - load_comments=False, summary=None): - settings_object.SavedSettingsObject.__init__(self) - self.bugdir = bugdir - self.uuid = uuid - if from_disk == True: - self.sync_with_disk = True - else: - self.sync_with_disk = False - if uuid == None: - self.uuid = uuid_gen() - self.time = int(time.time()) # only save to second precision - if self.rcs != None: - self.creator = self.rcs.get_user_id() - self.summary = summary - - def __repr__(self): - return "Bug(uuid=%r)" % self.uuid - - def _setting_attr_string(self, setting): - value = getattr(self, setting) - if value == settings_object.EMPTY: - return "" - else: - return str(value) - - def xml(self, show_comments=False): - if self.bugdir == None: - shortname = self.uuid - else: - shortname = self.bugdir.bug_shortname(self) - - if self.time == None: - timestring = "" - else: - timestring = utility.time_to_str(self.time) - - info = [("uuid", self.uuid), - ("short-name", shortname), - ("severity", self.severity), - ("status", self.status), - ("assigned", self.assigned), - ("target", self.target), - ("reporter", self.reporter), - ("creator", self.creator), - ("created", timestring), - ("summary", self.summary)] - ret = '<bug>\n' - for (k,v) in info: - if v is not settings_object.EMPTY: - ret += ' <%s>%s</%s>\n' % (k,xml.sax.saxutils.escape(v),k) - for estr in self.extra_strings: - ret += ' <extra-string>%s</extra-string>\n' % estr - if show_comments == True: - comout = self.comment_root.xml_thread(auto_name_map=True, - bug_shortname=shortname) - if len(comout) > 0: - ret += comout+'\n' - ret += '</bug>' - return ret - - def string(self, shortlist=False, show_comments=False): - if self.bugdir == None: - shortname = self.uuid - else: - shortname = self.bugdir.bug_shortname(self) - if shortlist == False: - if self.time == None: - timestring = "" - else: - htime = utility.handy_time(self.time) - timestring = "%s (%s)" % (htime, self.time_string) - info = [("ID", self.uuid), - ("Short name", shortname), - ("Severity", self.severity), - ("Status", self.status), - ("Assigned", self._setting_attr_string("assigned")), - ("Target", self._setting_attr_string("target")), - ("Reporter", self._setting_attr_string("reporter")), - ("Creator", self._setting_attr_string("creator")), - ("Created", timestring)] - longest_key_len = max([len(k) for k,v in info]) - infolines = [" %*s : %s\n" %(longest_key_len,k,v) for k,v in info] - bugout = "".join(infolines) + "%s" % self.summary.rstrip('\n') - else: - statuschar = self.status[0] - severitychar = self.severity[0] - chars = "%c%c" % (statuschar, severitychar) - bugout = "%s:%s: %s" % (shortname,chars,self.summary.rstrip('\n')) - - if show_comments == True: - # take advantage of the string_thread(auto_name_map=True) - # SIDE-EFFECT of sorting by comment time. - comout = self.comment_root.string_thread(flatten=False, - auto_name_map=True, - bug_shortname=shortname) - output = bugout + '\n' + comout.rstrip('\n') - else : - output = bugout - return output - - def __str__(self): - return self.string(shortlist=True) - - def __cmp__(self, other): - return cmp_full(self, other) - - def get_path(self, name=None): - my_dir = os.path.join(self.bugdir.get_path("bugs"), self.uuid) - if name is None: - return my_dir - assert name in ["values", "comments"] - return os.path.join(my_dir, name) - - def load_settings(self): - self.settings = mapfile.map_load(self.rcs, self.get_path("values")) - self._setup_saved_settings() - - def load_comments(self, load_full=True): - if load_full == True: - # Force a complete load of the whole comment tree - self.comment_root = self._get_comment_root(load_full=True) - else: - # Setup for fresh lazy-loading. Clear _comment_root, so - # _get_comment_root returns a fresh version. Turn of - # syncing temporarily so we don't write our blank comment - # tree to disk. - self.sync_with_disk = False - self.comment_root = None - self.sync_with_disk = True - - def save_settings(self): - assert self.summary != None, "Can't save blank bug" - - self.rcs.mkdir(self.get_path()) - path = self.get_path("values") - mapfile.map_save(self.rcs, path, self._get_saved_settings()) - - def save(self): - self.save_settings() - - if len(self.comment_root) > 0: - self.rcs.mkdir(self.get_path("comments")) - comment.saveComments(self) - - def remove(self): - self.comment_root.remove() - path = self.get_path() - self.rcs.recursive_remove(path) - - def comments(self): - for comment in self.comment_root.traverse(): - yield comment - - def new_comment(self, body=None): - comm = self.comment_root.new_reply(body=body) - return comm - - def comment_from_shortname(self, shortname, *args, **kwargs): - return self.comment_root.comment_from_shortname(shortname, - *args, **kwargs) - - def comment_from_uuid(self, uuid): - return self.comment_root.comment_from_uuid(uuid) - - def comment_shortnames(self, shortname=None): - """ - SIDE-EFFECT : Comment.comment_shortnames will sort the comment - tree by comment.time - """ - for id, comment in self.comment_root.comment_shortnames(shortname): - yield (id, comment) - - -# The general rule for bug sorting is that "more important" bugs are -# less than "less important" bugs. This way sorting a list of bugs -# will put the most important bugs first in the list. When relative -# importance is unclear, the sorting follows some arbitrary convention -# (i.e. dictionary order). - -def cmp_severity(bug_1, bug_2): - """ - Compare the severity levels of two bugs, with more severe bugs - comparing as less. - >>> bugA = Bug() - >>> bugB = Bug() - >>> bugA.severity = bugB.severity = "wishlist" - >>> cmp_severity(bugA, bugB) == 0 - True - >>> bugB.severity = "minor" - >>> cmp_severity(bugA, bugB) > 0 - True - >>> bugA.severity = "critical" - >>> cmp_severity(bugA, bugB) < 0 - True - """ - if not hasattr(bug_2, "severity") : - return 1 - return -cmp(severity_index[bug_1.severity], severity_index[bug_2.severity]) - -def cmp_status(bug_1, bug_2): - """ - Compare the status levels of two bugs, with more 'open' bugs - comparing as less. - >>> bugA = Bug() - >>> bugB = Bug() - >>> bugA.status = bugB.status = "open" - >>> cmp_status(bugA, bugB) == 0 - True - >>> bugB.status = "closed" - >>> cmp_status(bugA, bugB) < 0 - True - >>> bugA.status = "fixed" - >>> cmp_status(bugA, bugB) > 0 - True - """ - if not hasattr(bug_2, "status") : - return 1 - val_2 = status_index[bug_2.status] - return cmp(status_index[bug_1.status], status_index[bug_2.status]) - -def cmp_attr(bug_1, bug_2, attr, invert=False): - """ - Compare a general attribute between two bugs using the conventional - comparison rule for that attribute type. If invert == True, sort - *against* that convention. - >>> attr="severity" - >>> bugA = Bug() - >>> bugB = Bug() - >>> bugA.severity = "critical" - >>> bugB.severity = "wishlist" - >>> cmp_attr(bugA, bugB, attr) < 0 - True - >>> cmp_attr(bugA, bugB, attr, invert=True) > 0 - True - >>> bugB.severity = "critical" - >>> cmp_attr(bugA, bugB, attr) == 0 - True - """ - if not hasattr(bug_2, attr) : - return 1 - val_1 = getattr(bug_1, attr) - val_2 = getattr(bug_2, attr) - if val_1 == settings_object.EMPTY: val_1 = None - if val_2 == settings_object.EMPTY: val_2 = None - - if invert == True : - return -cmp(val_1, val_2) - else : - return cmp(val_1, val_2) - -# alphabetical rankings (a < z) -cmp_creator = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "creator") -cmp_assigned = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "assigned") -# chronological rankings (newer < older) -cmp_time = lambda bug_1, bug_2 : cmp_attr(bug_1, bug_2, "time", invert=True) - -DEFAULT_CMP_FULL_CMP_LIST = \ - (cmp_status,cmp_severity,cmp_assigned,cmp_time,cmp_creator) - -class BugCompoundComparator (object): - def __init__(self, cmp_list=DEFAULT_CMP_FULL_CMP_LIST): - self.cmp_list = cmp_list - def __call__(self, bug_1, bug_2): - for comparison in self.cmp_list : - val = comparison(bug_1, bug_2) - if val != 0 : - return val - return 0 - -cmp_full = BugCompoundComparator() - - -# define some bonus cmp_* functions -def cmp_last_modified(bug_1, bug_2): - """ - Like cmp_time(), but use most recent comment instead of bug - creation for the timestamp. - """ - def last_modified(bug): - time = bug.time - for comment in bug.comment_root.traverse(): - if comment.time > time: - time = comment.time - return time - val_1 = last_modified(bug_1) - val_2 = last_modified(bug_2) - return -cmp(val_1, val_2) - - -suite = doctest.DocTestSuite() diff --git a/libbe/bugdir.py b/libbe/bugdir.py index d4a39cb..e69de29 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -1,628 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Alexander Belchenko <bialix@ukr.net> -# Chris Ball <cjb@laptop.org> -# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import os -import os.path -import errno -import time -import copy -import unittest -import doctest - -from properties import Property, doc_property, local_property, \ - defaulting_property, checked_property, fn_checked_property, \ - cached_property, primed_property, change_hook_property, \ - settings_property -import settings_object -import mapfile -import bug -import rcs -import encoding -import utility - - -class NoBugDir(Exception): - def __init__(self, path): - msg = "The directory \"%s\" has no bug directory." % path - Exception.__init__(self, msg) - self.path = path - -class NoRootEntry(Exception): - def __init__(self, path): - self.path = path - Exception.__init__(self, "Specified root does not exist: %s" % path) - -class AlreadyInitialized(Exception): - def __init__(self, path): - self.path = path - Exception.__init__(self, - "Specified root is already initialized: %s" % path) - -class MultipleBugMatches(ValueError): - def __init__(self, shortname, matches): - msg = ("More than one bug matches %s. " - "Please be more specific.\n%s" % (shortname, matches)) - ValueError.__init__(self, msg) - self.shortname = shortname - self.matches = matches - - -TREE_VERSION_STRING = "Bugs Everywhere Tree 1 0\n" - - -class BugDir (list, settings_object.SavedSettingsObject): - """ - Sink to existing root - ====================== - - Consider the following usage case: - You have a bug directory rooted in - /path/to/source - by which I mean the '.be' directory is at - /path/to/source/.be - However, you're of in some subdirectory like - /path/to/source/GUI/testing - and you want to comment on a bug. Setting sink_to_root=True wen - you initialize your BugDir will cause it to search for the '.be' - file in the ancestors of the path you passed in as 'root'. - /path/to/source/GUI/testing/.be miss - /path/to/source/GUI/.be miss - /path/to/source/.be hit! - So it still roots itself appropriately without much work for you. - - File-system access - ================== - - When rooted in non-bugdir directory, BugDirs live completely in - memory until the first call to .save(). This creates a '.be' - sub-directory containing configurations options, bugs, comments, - etc. Once this sub-directory has been created (possibly by - another BugDir instance) any changes to the BugDir in memory will - be flushed to the file system automatically. However, the BugDir - will only load information from the file system when it loads new - bugs/comments that it doesn't already have in memory, or when it - explicitly asked to do so (e.g. .load() or __init__(from_disk=True)). - - Allow RCS initialization - ======================== - - This one is for testing purposes. Setting it to True allows the - BugDir to search for an installed RCS backend and initialize it in - the root directory. This is a convenience option for supporting - tests of versioning functionality (e.g. .duplicate_bugdir). - - Disable encoding manipulation - ============================= - - This one is for testing purposed. You might have non-ASCII - Unicode in your bugs, comments, files, etc. BugDir instances try - and support your preferred encoding scheme (e.g. "utf-8") when - dealing with stream and file input/output. For stream output, - this involves replacing sys.stdout and sys.stderr - (libbe.encode.set_IO_stream_encodings). However this messes up - doctest's output catching. In order to support doctest tests - using BugDirs, set manipulate_encodings=False, and stick to ASCII - in your tests. - """ - - settings_properties = [] - required_saved_properties = [] - _prop_save_settings = settings_object.prop_save_settings - _prop_load_settings = settings_object.prop_load_settings - def _versioned_property(settings_properties=settings_properties, - required_saved_properties=required_saved_properties, - **kwargs): - if "settings_properties" not in kwargs: - kwargs["settings_properties"] = settings_properties - if "required_saved_properties" not in kwargs: - kwargs["required_saved_properties"]=required_saved_properties - return settings_object.versioned_property(**kwargs) - - @_versioned_property(name="target", - doc="The current project development target") - def target(): return {} - - def _guess_encoding(self): - return encoding.get_encoding() - def _check_encoding(value): - if value != None and value != settings_object.EMPTY: - return encoding.known_encoding(value) - def _setup_encoding(self, new_encoding): - if new_encoding != None and new_encoding != settings_object.EMPTY: - if self._manipulate_encodings == True: - encoding.set_IO_stream_encodings(new_encoding) - def _set_encoding(self, old_encoding, new_encoding): - self._setup_encoding(new_encoding) - self._prop_save_settings(old_encoding, new_encoding) - - @_versioned_property(name="encoding", - doc="""The default input/output encoding to use (e.g. "utf-8").""", - change_hook=_set_encoding, - generator=_guess_encoding, - check_fn=_check_encoding) - def encoding(): return {} - - def _setup_user_id(self, user_id): - self.rcs.user_id = user_id - def _guess_user_id(self): - return self.rcs.get_user_id() - def _set_user_id(self, old_user_id, new_user_id): - self._setup_user_id(new_user_id) - self._prop_save_settings(old_user_id, new_user_id) - - @_versioned_property(name="user_id", - doc= -"""The user's prefered name, e.g. 'John Doe <jdoe@example.com>'. Note -that the Arch RCS backend *enforces* ids with this format.""", - change_hook=_set_user_id, - generator=_guess_user_id) - def user_id(): return {} - - @_versioned_property(name="default_assignee", - doc= -"""The default assignee for new bugs e.g. 'John Doe <jdoe@example.com>'.""") - def default_assignee(): return {} - - @_versioned_property(name="rcs_name", - doc="""The name of the current RCS. Kept seperate to make saving/loading -settings easy. Don't set this attribute. Set .rcs instead, and -.rcs_name will be automatically adjusted.""", - default="None", - allowed=["None", "Arch", "bzr", "darcs", "git", "hg"]) - def rcs_name(): return {} - - def _get_rcs(self, rcs_name=None): - """Get and root a new revision control system""" - if rcs_name == None: - rcs_name = self.rcs_name - new_rcs = rcs.rcs_by_name(rcs_name) - self._change_rcs(None, new_rcs) - return new_rcs - def _change_rcs(self, old_rcs, new_rcs): - new_rcs.encoding = self.encoding - new_rcs.root(self.root) - self.rcs_name = new_rcs.name - - @Property - @change_hook_property(hook=_change_rcs) - @cached_property(generator=_get_rcs) - @local_property("rcs") - @doc_property(doc="A revision control system instance.") - def rcs(): return {} - - def _bug_map_gen(self): - map = {} - for bug in self: - map[bug.uuid] = bug - for uuid in self.list_uuids(): - if uuid not in map: - map[uuid] = None - self._bug_map_value = map # ._bug_map_value used by @local_property - - @Property - @primed_property(primer=_bug_map_gen) - @local_property("bug_map") - @doc_property(doc="A dict of (bug-uuid, bug-instance) pairs.") - def _bug_map(): return {} - - def _setup_severities(self, severities): - if severities != None and severities != settings_object.EMPTY: - bug.load_severities(severities) - def _set_severities(self, old_severities, new_severities): - self._setup_severities(new_severities) - self._prop_save_settings(old_severities, new_severities) - @_versioned_property(name="severities", - doc="The allowed bug severities and their descriptions.", - change_hook=_set_severities) - def severities(): return {} - - def _setup_status(self, active_status, inactive_status): - bug.load_status(active_status, inactive_status) - def _set_active_status(self, old_active_status, new_active_status): - self._setup_status(new_active_status, self.inactive_status) - self._prop_save_settings(old_active_status, new_active_status) - @_versioned_property(name="active_status", - doc="The allowed active bug states and their descriptions.", - change_hook=_set_active_status) - def active_status(): return {} - - def _set_inactive_status(self, old_inactive_status, new_inactive_status): - self._setup_status(self.active_status, new_inactive_status) - self._prop_save_settings(old_inactive_status, new_inactive_status) - @_versioned_property(name="inactive_status", - doc="The allowed inactive bug states and their descriptions.", - change_hook=_set_inactive_status) - def inactive_status(): return {} - - - def __init__(self, root=None, sink_to_existing_root=True, - assert_new_BugDir=False, allow_rcs_init=False, - manipulate_encodings=True, - from_disk=False, rcs=None): - list.__init__(self) - settings_object.SavedSettingsObject.__init__(self) - self._manipulate_encodings = manipulate_encodings - if root == None: - root = os.getcwd() - if sink_to_existing_root == True: - self.root = self._find_root(root) - else: - if not os.path.exists(root): - raise NoRootEntry(root) - self.root = root - # get a temporary rcs until we've loaded settings - self.sync_with_disk = False - self.rcs = self._guess_rcs() - - if from_disk == True: - self.sync_with_disk = True - self.load() - else: - self.sync_with_disk = False - if assert_new_BugDir == True: - if os.path.exists(self.get_path()): - raise AlreadyInitialized, self.get_path() - if rcs == None: - rcs = self._guess_rcs(allow_rcs_init) - self.rcs = rcs - self._setup_user_id(self.user_id) - - def _find_root(self, path): - """ - Search for an existing bug database dir and it's ancestors and - return a BugDir rooted there. - """ - if not os.path.exists(path): - raise NoRootEntry(path) - versionfile=utility.search_parent_directories(path, - os.path.join(".be", "version")) - if versionfile != None: - beroot = os.path.dirname(versionfile) - root = os.path.dirname(beroot) - return root - else: - beroot = utility.search_parent_directories(path, ".be") - if beroot == None: - raise NoBugDir(path) - return beroot - - def get_version(self, path=None, use_none_rcs=False): - if use_none_rcs == True: - RCS = rcs.rcs_by_name("None") - RCS.root(self.root) - RCS.encoding = encoding.get_encoding() - else: - RCS = self.rcs - - if path == None: - path = self.get_path("version") - tree_version = RCS.get_file_contents(path) - return tree_version - - def set_version(self): - self.rcs.set_file_contents(self.get_path("version"), - TREE_VERSION_STRING) - - def get_path(self, *args): - my_dir = os.path.join(self.root, ".be") - if len(args) == 0: - return my_dir - assert args[0] in ["version", "settings", "bugs"], str(args) - return os.path.join(my_dir, *args) - - def _guess_rcs(self, allow_rcs_init=False): - deepdir = self.get_path() - if not os.path.exists(deepdir): - deepdir = os.path.dirname(deepdir) - new_rcs = rcs.detect_rcs(deepdir) - install = False - if new_rcs.name == "None": - if allow_rcs_init == True: - new_rcs = rcs.installed_rcs() - new_rcs.init(self.root) - return new_rcs - - def load(self): - version = self.get_version(use_none_rcs=True) - if version != TREE_VERSION_STRING: - raise NotImplementedError, \ - "BugDir cannot handle version '%s' yet." % version - else: - if not os.path.exists(self.get_path()): - raise NoBugDir(self.get_path()) - self.load_settings() - - self.rcs = rcs.rcs_by_name(self.rcs_name) - self._setup_user_id(self.user_id) - - def load_all_bugs(self): - "Warning: this could take a while." - self._clear_bugs() - for uuid in self.list_uuids(): - self._load_bug(uuid) - - def save(self): - self.rcs.mkdir(self.get_path()) - self.set_version() - self.save_settings() - self.rcs.mkdir(self.get_path("bugs")) - for bug in self: - bug.save() - - def load_settings(self): - self.settings = self._get_settings(self.get_path("settings")) - self._setup_saved_settings() - self._setup_user_id(self.user_id) - self._setup_encoding(self.encoding) - self._setup_severities(self.severities) - self._setup_status(self.active_status, self.inactive_status) - - def _get_settings(self, settings_path): - allow_no_rcs = not self.rcs.path_in_root(settings_path) - # allow_no_rcs=True should only be for the special case of - # configuring duplicate bugdir settings - - try: - settings = mapfile.map_load(self.rcs, settings_path, allow_no_rcs) - except rcs.NoSuchFile: - settings = {"rcs_name": "None"} - return settings - - def save_settings(self): - settings = self._get_saved_settings() - self._save_settings(self.get_path("settings"), settings) - - def _save_settings(self, settings_path, settings): - allow_no_rcs = not self.rcs.path_in_root(settings_path) - # allow_no_rcs=True should only be for the special case of - # configuring duplicate bugdir settings - mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs) - - def duplicate_bugdir(self, revision): - duplicate_path = self.rcs.duplicate_repo(revision) - - # setup revision RCS as None, since the duplicate may not be - # initialized for versioning - duplicate_settings_path = os.path.join(duplicate_path, - ".be", "settings") - duplicate_settings = self._get_settings(duplicate_settings_path) - if "rcs_name" in duplicate_settings: - duplicate_settings["rcs_name"] = "None" - duplicate_settings["user_id"] = self.user_id - if "disabled" in bug.status_values: - # Hack to support old versions of BE bugs - duplicate_settings["inactive_status"] = self.inactive_status - self._save_settings(duplicate_settings_path, duplicate_settings) - - return BugDir(duplicate_path, from_disk=True, manipulate_encodings=self._manipulate_encodings) - - def remove_duplicate_bugdir(self): - self.rcs.remove_duplicate_repo() - - def list_uuids(self): - uuids = [] - if os.path.exists(self.get_path()): - # list the uuids on disk - for uuid in os.listdir(self.get_path("bugs")): - if not (uuid.startswith('.')): - uuids.append(uuid) - yield uuid - # and the ones that are still just in memory - for bug in self: - if bug.uuid not in uuids: - uuids.append(bug.uuid) - yield bug.uuid - - def _clear_bugs(self): - while len(self) > 0: - self.pop() - self._bug_map_gen() - - def _load_bug(self, uuid): - bg = bug.Bug(bugdir=self, uuid=uuid, from_disk=True) - self.append(bg) - self._bug_map_gen() - return bg - - def new_bug(self, uuid=None, summary=None): - bg = bug.Bug(bugdir=self, uuid=uuid, summary=summary) - self.append(bg) - self._bug_map_gen() - return bg - - def remove_bug(self, bug): - self.remove(bug) - bug.remove() - - def bug_shortname(self, bug): - """ - Generate short names from uuids. Picks the minimum number of - characters (>=3) from the beginning of the uuid such that the - short names are unique. - - Obviously, as the number of bugs in the database grows, these - short names will cease to be unique. The complete uuid should be - used for long term reference. - """ - chars = 3 - for uuid in self._bug_map.keys(): - if bug.uuid == uuid: - continue - while (bug.uuid[:chars] == uuid[:chars]): - chars+=1 - return bug.uuid[:chars] - - def bug_from_shortname(self, shortname): - """ - >>> bd = simple_bug_dir() - >>> bug_a = bd.bug_from_shortname('a') - >>> print type(bug_a) - <class 'libbe.bug.Bug'> - >>> print bug_a - a:om: Bug A - """ - matches = [] - self._bug_map_gen() - for uuid in self._bug_map.keys(): - if uuid.startswith(shortname): - matches.append(uuid) - if len(matches) > 1: - raise MultipleBugMatches(shortname, matches) - if len(matches) == 1: - return self.bug_from_uuid(matches[0]) - raise KeyError("No bug matches %s" % shortname) - - def bug_from_uuid(self, uuid): - if not self.has_bug(uuid): - raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \ - % (uuid, self._bug_map, self.root)) - if self._bug_map[uuid] == None: - self._load_bug(uuid) - return self._bug_map[uuid] - - def has_bug(self, bug_uuid): - if bug_uuid not in self._bug_map: - self._bug_map_gen() - if bug_uuid not in self._bug_map: - return False - return True - - -def simple_bug_dir(): - """ - For testing - >>> bugdir = simple_bug_dir() - >>> ls = list(bugdir.list_uuids()) - >>> ls.sort() - >>> print ls - ['a', 'b'] - """ - dir = utility.Dir() - assert os.path.exists(dir.path) - bugdir = BugDir(dir.path, sink_to_existing_root=False, allow_rcs_init=True, - manipulate_encodings=False) - bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. - bug_a = bugdir.new_bug("a", summary="Bug A") - bug_a.creator = "John Doe <jdoe@example.com>" - bug_a.time = 0 - bug_b = bugdir.new_bug("b", summary="Bug B") - bug_b.creator = "Jane Doe <jdoe@example.com>" - bug_b.time = 0 - bug_b.status = "closed" - bugdir.save() - return bugdir - - -class BugDirTestCase(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - def setUp(self): - self.dir = utility.Dir() - self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, - allow_rcs_init=True) - self.rcs = self.bugdir.rcs - def tearDown(self): - self.rcs.cleanup() - self.dir.cleanup() - def fullPath(self, path): - return os.path.join(self.dir.path, path) - def assertPathExists(self, path): - fullpath = self.fullPath(path) - self.failUnless(os.path.exists(fullpath)==True, - "path %s does not exist" % fullpath) - self.assertRaises(AlreadyInitialized, BugDir, - self.dir.path, assertNewBugDir=True) - def versionTest(self): - if self.rcs.versioned == False: - return - original = self.bugdir.rcs.commit("Began versioning") - bugA = self.bugdir.bug_from_uuid("a") - bugA.status = "fixed" - self.bugdir.save() - new = self.rcs.commit("Fixed bug a") - dupdir = self.bugdir.duplicate_bugdir(original) - self.failUnless(dupdir.root != self.bugdir.root, - "%s, %s" % (dupdir.root, self.bugdir.root)) - bugAorig = dupdir.bug_from_uuid("a") - self.failUnless(bugA != bugAorig, - "\n%s\n%s" % (bugA.string(), bugAorig.string())) - bugAorig.status = "fixed" - self.failUnless(bug.cmp_status(bugA, bugAorig)==0, - "%s, %s" % (bugA.status, bugAorig.status)) - self.failUnless(bug.cmp_severity(bugA, bugAorig)==0, - "%s, %s" % (bugA.severity, bugAorig.severity)) - self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0, - "%s, %s" % (bugA.assigned, bugAorig.assigned)) - self.failUnless(bug.cmp_time(bugA, bugAorig)==0, - "%s, %s" % (bugA.time, bugAorig.time)) - self.failUnless(bug.cmp_creator(bugA, bugAorig)==0, - "%s, %s" % (bugA.creator, bugAorig.creator)) - self.failUnless(bugA == bugAorig, - "\n%s\n%s" % (bugA.string(), bugAorig.string())) - self.bugdir.remove_duplicate_bugdir() - self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root)) - def testRun(self): - self.bugdir.new_bug(uuid="a", summary="Ant") - self.bugdir.new_bug(uuid="b", summary="Cockroach") - self.bugdir.new_bug(uuid="c", summary="Praying mantis") - length = len(self.bugdir) - self.failUnless(length == 3, "%d != 3 bugs" % length) - uuids = list(self.bugdir.list_uuids()) - self.failUnless(len(uuids) == 3, "%d != 3 uuids" % len(uuids)) - self.failUnless(uuids == ["a","b","c"], str(uuids)) - bugA = self.bugdir.bug_from_uuid("a") - bugAprime = self.bugdir.bug_from_shortname("a") - self.failUnless(bugA == bugAprime, "%s != %s" % (bugA, bugAprime)) - self.bugdir.save() - self.versionTest() - def testComments(self): - self.bugdir.new_bug(uuid="a", summary="Ant") - bug = self.bugdir.bug_from_uuid("a") - comm = bug.comment_root - rep = comm.new_reply("Ants are small.") - rep.new_reply("And they have six legs.") - self.bugdir.save() - self.bugdir._clear_bugs() - bug = self.bugdir.bug_from_uuid("a") - bug.load_comments() - self.failUnless(len(bug.comment_root)==1, len(bug.comment_root)) - for index,comment in enumerate(bug.comments()): - if index == 0: - repLoaded = comment - self.failUnless(repLoaded.uuid == rep.uuid, repLoaded.uuid) - self.failUnless(comment.sync_with_disk == True, - comment.sync_with_disk) - #load_settings() - self.failUnless(comment.content_type == "text/plain", - comment.content_type) - self.failUnless(repLoaded.settings["Content-type"]=="text/plain", - repLoaded.settings) - self.failUnless(repLoaded.body == "Ants are small.", - repLoaded.body) - elif index == 1: - self.failUnless(comment.in_reply_to == repLoaded.uuid, - repLoaded.uuid) - self.failUnless(comment.body == "And they have six legs.", - comment.body) - else: - self.failIf(True, "Invalid comment: %d\n%s" % (index, comment)) - -unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) -suite = unittest.TestSuite([unitsuite])#, doctest.DocTestSuite()]) diff --git a/libbe/bzr.py b/libbe/bzr.py index d73392a..e69de29 100644 --- a/libbe/bzr.py +++ b/libbe/bzr.py @@ -1,104 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Ben Finney <ben+python@benfinney.id.au> -# Marien Zwart <marienz@gentoo.org> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import os -import re -import sys -import unittest -import doctest - -import rcs -from rcs import RCS - -def new(): - return Bzr() - -class Bzr(RCS): - name = "bzr" - client = "bzr" - versioned = True - def _rcs_help(self): - status,output,error = self._u_invoke_client("--help") - return output - def _rcs_detect(self, path): - if self._u_search_parent_directories(path, ".bzr") != None : - return True - return False - def _rcs_root(self, path): - """Find the root of the deepest repository containing path.""" - status,output,error = self._u_invoke_client("root", path) - return output.rstrip('\n') - def _rcs_init(self, path): - self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): - status,output,error = self._u_invoke_client("whoami") - return output.rstrip('\n') - def _rcs_set_user_id(self, value): - self._u_invoke_client("whoami", value) - def _rcs_add(self, path): - self._u_invoke_client("add", path) - def _rcs_remove(self, path): - # --force to also remove unversioned files. - self._u_invoke_client("remove", "--force", path) - def _rcs_update(self, path): - pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): - if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) - else: - status,output,error = \ - self._u_invoke_client("cat","-r",revision,path) - return output - def _rcs_duplicate_repo(self, directory, revision=None): - if revision == None: - RCS._rcs_duplicate_repo(self, directory, revision) - else: - self._u_invoke_client("branch", "--revision", revision, - ".", directory) - def _rcs_commit(self, commitfile): - status,output,error = self._u_invoke_client("commit", "--unchanged", - "--file", commitfile) - revision = None - revline = re.compile("Committed revision (.*)[.]") - match = revline.search(error) - assert match != None, output+error - assert len(match.groups()) == 1 - revision = match.groups()[0] - return revision - def postcommit(self): - try: - self._u_invoke_client('merge') - except rcs.CommandError, e: - if ('No merge branch known or specified' in e.err_str or - 'No merge location known or specified' in e.err_str): - pass - else: - self._u_invoke_client('revert', '--no-backup', - directory=directory) - self._u_invoke_client('resolve', '--all', directory=directory) - raise - if len(self._u_invoke_client('status', directory=directory)[1]) > 0: - self.commit('Merge from upstream') - - -rcs.make_rcs_testcase_subclasses(Bzr, sys.modules[__name__]) - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py index edc6442..e69de29 100644 --- a/libbe/cmdutil.py +++ b/libbe/cmdutil.py @@ -1,191 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Oleg Romanyshyn <oromanyshyn@panoramicfeedback.com> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import optparse -import os -from textwrap import TextWrapper -from StringIO import StringIO -import sys -import doctest - -import bugdir -import plugin -import encoding - - -class UserError(Exception): - def __init__(self, msg): - Exception.__init__(self, msg) - -class UserErrorWrap(UserError): - def __init__(self, exception): - UserError.__init__(self, str(exception)) - self.exception = exception - -class UsageError(Exception): - pass - -class GetHelp(Exception): - pass - -class GetCompletions(Exception): - def __init__(self, completions=[]): - msg = "Get allowed completions" - Exception.__init__(self, msg) - self.completions = completions - -def iter_commands(): - for name, module in plugin.iter_plugins("becommands"): - yield name.replace("_", "-"), module - -def get_command(command_name): - """Retrieves the module for a user command - - >>> get_command("asdf") - Traceback (most recent call last): - UserError: Unknown command asdf - >>> repr(get_command("list")).startswith("<module 'becommands.list' from ") - True - """ - cmd = plugin.get_plugin("becommands", command_name.replace("-", "_")) - if cmd is None: - raise UserError("Unknown command %s" % command_name) - return cmd - - -def execute(cmd, args): - enc = encoding.get_encoding() - get_command(cmd).execute([a.decode(enc) for a in args]) - return 0 - -def help(cmd=None): - if cmd != None: - return get_command(cmd).help() - else: - cmdlist = [] - for name, module in iter_commands(): - cmdlist.append((name, module.__desc__)) - longest_cmd_len = max([len(name) for name,desc in cmdlist]) - ret = ["Bugs Everywhere - Distributed bug tracking", - "", - "usage: be [command] [command_options ...] [command_args ...]", - "or: be help", - "or: be help [command]", - "", - "Supported commands"] - for name, desc in cmdlist: - numExtraSpaces = longest_cmd_len-len(name) - ret.append("be %s%*s %s" % (name, numExtraSpaces, "", desc)) - - return "\n".join(ret) - -def completions(cmd): - parser = get_command(cmd).get_parser() - longopts = [] - for opt in parser.option_list: - longopts.append(opt.get_opt_string()) - return longopts - -def raise_get_help(option, opt, value, parser): - raise GetHelp - -def raise_get_completions(option, opt, value, parser): - print "got completion arg" - raise GetCompletions(completions(sys.argv[1])) - -class CmdOptionParser(optparse.OptionParser): - def __init__(self, usage): - optparse.OptionParser.__init__(self, usage) - self.disable_interspersed_args() - self.remove_option("-h") - self.add_option("-h", "--help", action="callback", - callback=raise_get_help, help="Print a help message") - self.add_option("--complete", action="callback", - callback=raise_get_completions, - help="Print a list of available completions") - - def error(self, message): - raise UsageError(message) - - def iter_options(self): - return iter_combine([self._short_opt.iterkeys(), - self._long_opt.iterkeys()]) - - def help_str(self): - f = StringIO() - self.print_help(f) - return f.getvalue() - -def option_value_pairs(options, parser): - """ - Iterate through OptionParser (option, value) pairs. - """ - for option in [o.dest for o in parser.option_list if o.dest != None]: - value = getattr(options, option) - yield (option, value) - -def default_complete(options, args, parser, bugid_args={}): - """ - A dud complete implementation for becommands to that the - --complete argument doesn't cause any problems. Use this - until you've set up a command-specific complete function. - - bugid_args is an optional dict where the keys are positional - arguments taking bug shortnames and the values are functions for - filtering, since that's a common enough operation. - e.g. for "be open [options] BUGID" - bugid_args = {0: lambda bug : bug.active == False} - """ - for option,value in option_value_pairs(options, parser): - if value == "--complete": - raise cmdutil.GetCompletions() - for pos,value in enumerate(args): - if value == "--complete": - if pos in bugid_args: - filter = bugid_args[pos] - bugshortnames = [] - try: - bd = bugdir.BugDir(from_disk=True, - manipulate_encodings=False) - bd.load_all_bugs() - bugs = [bug for bug in bd if filter(bug) == True] - bugshortnames = [bd.bug_shortname(bug) for bug in bugs] - except bugdir.NoBugDir: - pass - raise GetCompletions(bugshortnames) - raise GetCompletions() - -def underlined(instring): - """Produces a version of a string that is underlined with '=' - - >>> underlined("Underlined String") - 'Underlined String\\n=================' - """ - - return "%s\n%s" % (instring, "="*len(instring)) - - -def _test(): - import doctest - import sys - doctest.testmod() - -if __name__ == "__main__": - _test() - -suite = doctest.DocTestSuite() diff --git a/libbe/comment.py b/libbe/comment.py index 9d4f744..e69de29 100644 --- a/libbe/comment.py +++ b/libbe/comment.py @@ -1,538 +0,0 @@ -# Bugs Everywhere, a distributed bugtracker -# Copyright (C) 2008-2009 Chris Ball <cjb@laptop.org> -# Thomas Habets <thomas@habets.pp.se> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, -# MA 02110-1301, USA -import email.mime.base, email.encoders -import os -import os.path -import time -import xml.sax.saxutils -import textwrap -import doctest - -from beuuid import uuid_gen -from properties import Property, doc_property, local_property, \ - defaulting_property, checked_property, cached_property, \ - primed_property, change_hook_property, settings_property -import settings_object -import mapfile -from tree import Tree -import utility - - -class InvalidShortname(KeyError): - def __init__(self, shortname, shortnames): - msg = "Invalid shortname %s\n%s" % (shortname, shortnames) - KeyError.__init__(self, msg) - self.shortname = shortname - self.shortnames = shortnames - - -INVALID_UUID = "!!~~\n INVALID-UUID \n~~!!" - -def _list_to_root(comments, bug): - """ - Convert a raw list of comments to single (dummy) root comment. We - use a dummy root comment, because there can be several comment - threads rooted on the same parent bug. To simplify comment - interaction, we condense these threads into a single thread with a - Comment dummy root. - - No Comment method should use the dummy comment. - """ - root_comments = [] - uuid_map = {} - for comment in comments: - assert comment.uuid != None - uuid_map[comment.uuid] = comment - for comm in comments: - rep = comm.in_reply_to - if rep == None or rep == settings_object.EMPTY or rep == bug.uuid: - root_comments.append(comm) - else: - parentUUID = comm.in_reply_to - parent = uuid_map[parentUUID] - parent.add_reply(comm) - dummy_root = Comment(bug, uuid=INVALID_UUID) - dummy_root.extend(root_comments) - return dummy_root - -def loadComments(bug, load_full=False): - """ - Set load_full=True when you want to load the comment completely - from disk *now*, rather than waiting and lazy loading as required. - """ - path = bug.get_path("comments") - if not os.path.isdir(path): - return Comment(bug, uuid=INVALID_UUID) - comments = [] - for uuid in os.listdir(path): - if uuid.startswith('.'): - continue - comm = Comment(bug, uuid, from_disk=True) - if load_full == True: - comm.load_settings() - dummy = comm.body # force the body to load - comments.append(comm) - return _list_to_root(comments, bug) - -def saveComments(bug): - path = bug.get_path("comments") - bug.rcs.mkdir(path) - for comment in bug.comment_root.traverse(): - comment.save() - - -class Comment(Tree, settings_object.SavedSettingsObject): - """ - >>> c = Comment() - >>> c.uuid != None - True - >>> c.uuid = "some-UUID" - >>> print c.content_type - text/plain - """ - - settings_properties = [] - required_saved_properties = [] - _prop_save_settings = settings_object.prop_save_settings - _prop_load_settings = settings_object.prop_load_settings - def _versioned_property(settings_properties=settings_properties, - required_saved_properties=required_saved_properties, - **kwargs): - if "settings_properties" not in kwargs: - kwargs["settings_properties"] = settings_properties - if "required_saved_properties" not in kwargs: - kwargs["required_saved_properties"]=required_saved_properties - return settings_object.versioned_property(**kwargs) - - @_versioned_property(name="From", - doc="The author of the comment") - def From(): return {} - - @_versioned_property(name="In-reply-to", - doc="UUID for parent comment or bug") - def in_reply_to(): return {} - - @_versioned_property(name="Content-type", - doc="Mime type for comment body", - default="text/plain", - require_save=True) - def content_type(): return {} - - @_versioned_property(name="Date", - doc="An RFC 2822 timestamp for comment creation") - def time_string(): return {} - - def _get_time(self): - if self.time_string == None: - return None - return utility.str_to_time(self.time_string) - def _set_time(self, value): - self.time_string = utility.time_to_str(value) - time = property(fget=_get_time, - fset=_set_time, - doc="An integer version of .time_string") - - def _get_comment_body(self): - if self.rcs != None and self.sync_with_disk == True: - import rcs - binary = not self.content_type.startswith("text/") - return self.rcs.get_file_contents(self.get_path("body"), binary=binary) - def _set_comment_body(self, old=None, new=None, force=False): - if (self.rcs != None and self.sync_with_disk == True) or force==True: - assert new != None, "Can't save empty comment" - binary = not self.content_type.startswith("text/") - self.rcs.set_file_contents(self.get_path("body"), new, binary=binary) - - @Property - @change_hook_property(hook=_set_comment_body) - @cached_property(generator=_get_comment_body) - @local_property("body") - @doc_property(doc="The meat of the comment") - def body(): return {} - - def _get_rcs(self): - if hasattr(self.bug, "rcs"): - return self.bug.rcs - - @Property - @cached_property(generator=_get_rcs) - @local_property("rcs") - @doc_property(doc="A revision control system instance.") - def rcs(): return {} - - def __init__(self, bug=None, uuid=None, from_disk=False, - in_reply_to=None, body=None): - """ - Set from_disk=True to load an old comment. - Set from_disk=False to create a new comment. - - The uuid option is required when from_disk==True. - - The in_reply_to and body options are only used if - from_disk==False (the default). When from_disk==True, they are - loaded from the bug database. - - in_reply_to should be the uuid string of the parent comment. - """ - Tree.__init__(self) - settings_object.SavedSettingsObject.__init__(self) - self.bug = bug - self.uuid = uuid - if from_disk == True: - self.sync_with_disk = True - else: - self.sync_with_disk = False - if uuid == None: - self.uuid = uuid_gen() - self.time = int(time.time()) # only save to second precision - if self.rcs != None: - self.From = self.rcs.get_user_id() - self.in_reply_to = in_reply_to - self.body = body - - def traverse(self, *args, **kwargs): - """Avoid working with the possible dummy root comment""" - for comment in Tree.traverse(self, *args, **kwargs): - if comment.uuid == INVALID_UUID: - continue - yield comment - - def _setting_attr_string(self, setting): - value = getattr(self, setting) - if value == settings_object.EMPTY: - return "" - else: - return str(value) - - def xml(self, indent=0, shortname=None): - """ - >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") - >>> comm.uuid = "0123" - >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" - >>> print comm.xml(indent=2, shortname="com-1") - <comment> - <uuid>0123</uuid> - <short-name>com-1</short-name> - <from></from> - <date>Thu, 01 Jan 1970 00:00:00 +0000</date> - <content-type>text/plain</content-type> - <body>Some - insightful - remarks</body> - </comment> - """ - if shortname == None: - shortname = self.uuid - if self.content_type.startswith("text/"): - body = (self.body or "").rstrip('\n') - else: - maintype,subtype = self.content_type.split('/',1) - msg = email.mime.base.MIMEBase(maintype, subtype) - msg.set_payload(self.body or "") - email.encoders.encode_base64(msg) - body = msg.as_string() - info = [("uuid", self.uuid), - ("short-name", shortname), - ("in-reply-to", self.in_reply_to), - ("from", self._setting_attr_string("From")), - ("date", self.time_string), - ("content-type", self.content_type), - ("body", body)] - lines = ["<comment>"] - for (k,v) in info: - if v not in [settings_object.EMPTY, None]: - lines.append(' <%s>%s</%s>' % (k,xml.sax.saxutils.escape(v),k)) - lines.append("</comment>") - istring = ' '*indent - sep = '\n' + istring - return istring + sep.join(lines).rstrip('\n') - - def string(self, indent=0, shortname=None): - """ - >>> comm = Comment(bug=None, body="Some\\ninsightful\\nremarks\\n") - >>> comm.time_string = "Thu, 01 Jan 1970 00:00:00 +0000" - >>> print comm.string(indent=2, shortname="com-1") - --------- Comment --------- - Name: com-1 - From: - Date: Thu, 01 Jan 1970 00:00:00 +0000 - <BLANKLINE> - Some - insightful - remarks - """ - if shortname == None: - shortname = self.uuid - lines = [] - lines.append("--------- Comment ---------") - lines.append("Name: %s" % shortname) - lines.append("From: %s" % (self._setting_attr_string("From"))) - lines.append("Date: %s" % self.time_string) - lines.append("") - #lines.append(textwrap.fill(self.body or "", - # width=(79-indent))) - if self.content_type.startswith("text/"): - lines.extend((self.body or "").splitlines()) - else: - lines.append("Content type %s not printable. Try XML output instead" % self.content_type) - # some comments shouldn't be wrapped... - - istring = ' '*indent - sep = '\n' + istring - return istring + sep.join(lines).rstrip('\n') - - def __str__(self): - """ - >>> comm = Comment(bug=None, body="Some insightful remarks") - >>> comm.uuid = "com-1" - >>> comm.time_string = "Thu, 20 Nov 2008 15:55:11 +0000" - >>> comm.From = "Jane Doe <jdoe@example.com>" - >>> print comm - --------- Comment --------- - Name: com-1 - From: Jane Doe <jdoe@example.com> - Date: Thu, 20 Nov 2008 15:55:11 +0000 - <BLANKLINE> - Some insightful remarks - """ - return self.string() - - def get_path(self, name=None): - my_dir = os.path.join(self.bug.get_path("comments"), self.uuid) - if name is None: - return my_dir - assert name in ["values", "body"] - return os.path.join(my_dir, name) - - def load_settings(self): - self.settings = mapfile.map_load(self.rcs, self.get_path("values")) - self._setup_saved_settings() - - def save_settings(self): - parent_dir = os.path.dirname(self.get_path()) - self.rcs.mkdir(parent_dir) - self.rcs.mkdir(self.get_path()) - path = self.get_path("values") - mapfile.map_save(self.rcs, path, self._get_saved_settings()) - - def save(self): - assert self.body != None, "Can't save blank comment" - #if self.in_reply_to == None: - # raise Exception, str(self)+'\n'+str(self.settings)+'\n'+str(self._settings_loaded) - #assert self.in_reply_to != None, "Comment must be a reply to something" - self.save_settings() - self._set_comment_body(new=self.body, force=True) - - def remove(self): - for comment in self.traverse(): - path = comment.get_path() - self.rcs.recursive_remove(path) - - def add_reply(self, reply, allow_time_inversion=False): - if self.uuid != INVALID_UUID: - reply.in_reply_to = self.uuid - self.append(reply) - #raise Exception, "adding reply \n%s\n%s" % (self, reply) - - def new_reply(self, body=None): - """ - >>> comm = Comment(bug=None, body="Some insightful remarks") - >>> repA = comm.new_reply("Critique original comment") - >>> repB = repA.new_reply("Begin flamewar :p") - >>> repB.in_reply_to == repA.uuid - True - """ - reply = Comment(self.bug, body=body) - self.add_reply(reply) - #raise Exception, "new reply added (%s),\n%s\n%s\n\t--%s--" % (body, self, reply, reply.in_reply_to) - return reply - - def string_thread(self, string_method_name="string", name_map={}, - indent=0, flatten=True, - auto_name_map=False, bug_shortname=None): - """ - Return a string displaying a thread of comments. - bug_shortname is only used if auto_name_map == True. - - string_method_name (defaults to "string") is the name of the - Comment method used to generate the output string for each - Comment in the thread. The method must take the arguments - indent and shortname. - - SIDE-EFFECT: if auto_name_map==True, calls comment_shortnames() - which will sort the tree by comment.time. Avoid by calling - name_map = {} - for shortname,comment in comm.comment_shortnames(bug_shortname): - name_map[comment.uuid] = shortname - comm.sort(key=lambda c : c.From) # your sort - comm.string_thread(name_map=name_map) - - >>> a = Comment(bug=None, uuid="a", body="Insightful remarks") - >>> a.time = utility.str_to_time("Thu, 20 Nov 2008 01:00:00 +0000") - >>> b = a.new_reply("Critique original comment") - >>> b.uuid = "b" - >>> b.time = utility.str_to_time("Thu, 20 Nov 2008 02:00:00 +0000") - >>> c = b.new_reply("Begin flamewar :p") - >>> c.uuid = "c" - >>> c.time = utility.str_to_time("Thu, 20 Nov 2008 03:00:00 +0000") - >>> d = a.new_reply("Useful examples") - >>> d.uuid = "d" - >>> d.time = utility.str_to_time("Thu, 20 Nov 2008 04:00:00 +0000") - >>> a.sort(key=lambda comm : comm.time) - >>> print a.string_thread(flatten=True) - --------- Comment --------- - Name: a - From: - Date: Thu, 20 Nov 2008 01:00:00 +0000 - <BLANKLINE> - Insightful remarks - --------- Comment --------- - Name: b - From: - Date: Thu, 20 Nov 2008 02:00:00 +0000 - <BLANKLINE> - Critique original comment - --------- Comment --------- - Name: c - From: - Date: Thu, 20 Nov 2008 03:00:00 +0000 - <BLANKLINE> - Begin flamewar :p - --------- Comment --------- - Name: d - From: - Date: Thu, 20 Nov 2008 04:00:00 +0000 - <BLANKLINE> - Useful examples - >>> print a.string_thread(auto_name_map=True, bug_shortname="bug-1") - --------- Comment --------- - Name: bug-1:1 - From: - Date: Thu, 20 Nov 2008 01:00:00 +0000 - <BLANKLINE> - Insightful remarks - --------- Comment --------- - Name: bug-1:2 - From: - Date: Thu, 20 Nov 2008 02:00:00 +0000 - <BLANKLINE> - Critique original comment - --------- Comment --------- - Name: bug-1:3 - From: - Date: Thu, 20 Nov 2008 03:00:00 +0000 - <BLANKLINE> - Begin flamewar :p - --------- Comment --------- - Name: bug-1:4 - From: - Date: Thu, 20 Nov 2008 04:00:00 +0000 - <BLANKLINE> - Useful examples - """ - if auto_name_map == True: - name_map = {} - for shortname,comment in self.comment_shortnames(bug_shortname): - name_map[comment.uuid] = shortname - stringlist = [] - for depth,comment in self.thread(flatten=flatten): - ind = 2*depth+indent - if comment.uuid in name_map: - sname = name_map[comment.uuid] - else: - sname = None - string_fn = getattr(comment, string_method_name) - stringlist.append(string_fn(indent=ind, shortname=sname)) - return '\n'.join(stringlist) - - def xml_thread(self, name_map={}, indent=0, - auto_name_map=False, bug_shortname=None): - return self.string_thread(string_method_name="xml", name_map=name_map, - indent=indent, auto_name_map=auto_name_map, - bug_shortname=bug_shortname) - - def comment_shortnames(self, bug_shortname=None): - """ - Iterate through (id, comment) pairs, in time order. - (This is a user-friendly id, not the comment uuid). - - SIDE-EFFECT : will sort the comment tree by comment.time - - >>> a = Comment(bug=None, uuid="a") - >>> b = a.new_reply() - >>> b.uuid = "b" - >>> c = b.new_reply() - >>> c.uuid = "c" - >>> d = a.new_reply() - >>> d.uuid = "d" - >>> for id,name in a.comment_shortnames("bug-1"): - ... print id, name.uuid - bug-1:1 a - bug-1:2 b - bug-1:3 c - bug-1:4 d - """ - if bug_shortname == None: - bug_shortname = "" - self.sort(key=lambda comm : comm.time) - for num,comment in enumerate(self.traverse()): - yield ("%s:%d" % (bug_shortname, num+1), comment) - - def comment_from_shortname(self, comment_shortname, *args, **kwargs): - """ - Use a comment shortname to look up a comment. - >>> a = Comment(bug=None, uuid="a") - >>> b = a.new_reply() - >>> b.uuid = "b" - >>> c = b.new_reply() - >>> c.uuid = "c" - >>> d = a.new_reply() - >>> d.uuid = "d" - >>> comm = a.comment_from_shortname("bug-1:3", bug_shortname="bug-1") - >>> id(comm) == id(c) - True - """ - for cur_name, comment in self.comment_shortnames(*args, **kwargs): - if comment_shortname == cur_name: - return comment - raise InvalidShortname(comment_shortname, - list(self.comment_shortnames(*args, **kwargs))) - - def comment_from_uuid(self, uuid): - """ - Use a comment shortname to look up a comment. - >>> a = Comment(bug=None, uuid="a") - >>> b = a.new_reply() - >>> b.uuid = "b" - >>> c = b.new_reply() - >>> c.uuid = "c" - >>> d = a.new_reply() - >>> d.uuid = "d" - >>> comm = a.comment_from_uuid("d") - >>> id(comm) == id(d) - True - """ - for comment in self.traverse(): - if comment.uuid == uuid: - return comment - raise KeyError(uuid) - -suite = doctest.DocTestSuite() diff --git a/libbe/config.py b/libbe/config.py index 7f600a5..e69de29 100644 --- a/libbe/config.py +++ b/libbe/config.py @@ -1,84 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import ConfigParser -import codecs -import locale -import os.path -import sys -import doctest - -default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding() - -def path(): - """Return the path to the per-user config file""" - return os.path.expanduser("~/.bugs_everywhere") - -def set_val(name, value, section="DEFAULT", encoding=None): - """Set a value in the per-user config file - - :param name: The name of the value to set - :param value: The new value to set (or None to delete the value) - :param section: The section to store the name/value in - """ - if encoding == None: - encoding = default_encoding - config = ConfigParser.ConfigParser() - if os.path.exists(path()) == False: # touch file or config - open(path(), "w").close() # read chokes on missing file - f = codecs.open(path(), "r", encoding) - config.readfp(f, path()) - f.close() - if value is not None: - config.set(section, name, value) - else: - config.remove_option(section, name) - f = codecs.open(path(), "w", encoding) - config.write(f) - f.close() - -def get_val(name, section="DEFAULT", default=None, encoding=None): - """ - Get a value from the per-user config file - - :param name: The name of the value to get - :section: The section that the name is in - :return: The value, or None - >>> get_val("junk") is None - True - >>> set_val("junk", "random") - >>> get_val("junk") - u'random' - >>> set_val("junk", None) - >>> get_val("junk") is None - True - """ - if os.path.exists(path()): - if encoding == None: - encoding = default_encoding - config = ConfigParser.ConfigParser() - f = codecs.open(path(), "r", encoding) - config.readfp(f, path()) - f.close() - try: - return config.get(section, name) - except ConfigParser.NoOptionError: - return default - else: - return default - -suite = doctest.DocTestSuite() diff --git a/libbe/darcs.py b/libbe/darcs.py index 43af99a..e69de29 100644 --- a/libbe/darcs.py +++ b/libbe/darcs.py @@ -1,162 +0,0 @@ -# Copyright (C) 2009 W. Trevor King <wking@drexel.edu> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import codecs -import os -import re -import sys -import unittest -import doctest - -import rcs -from rcs import RCS - -def new(): - return Darcs() - -class Darcs(RCS): - name="darcs" - client="darcs" - versioned=True - def _rcs_help(self): - status,output,error = self._u_invoke_client("--help") - return output - def _rcs_detect(self, path): - if self._u_search_parent_directories(path, "_darcs") != None : - return True - return False - def _rcs_root(self, path): - """Find the root of the deepest repository containing path.""" - # Assume that nothing funny is going on; in particular, that we aren't - # dealing with a bare repo. - if os.path.isdir(path) != True: - path = os.path.dirname(path) - darcs_dir = self._u_search_parent_directories(path, "_darcs") - if darcs_dir == None: - return None - return os.path.dirname(darcs_dir) - def _rcs_init(self, path): - self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): - # following http://darcs.net/manual/node4.html#SECTION00410030000000000000 - # as of June 29th, 2009 - if self.rootdir == None: - return None - darcs_dir = os.path.join(self.rootdir, "_darcs") - if darcs_dir != None: - for pref_file in ["author", "email"]: - pref_path = os.path.join(darcs_dir, "prefs", pref_file) - if os.path.exists(pref_path): - return self.get_file_contents(pref_path) - for env_variable in ["DARCS_EMAIL", "EMAIL"]: - if env_variable in os.environ: - return os.environ[env_variable] - return None - def _rcs_set_user_id(self, value): - if self.rootdir == None: - self.root(".") - if self.rootdir == None: - raise rcs.SettingIDnotSupported - author_path = os.path.join(self.rootdir, "_darcs", "prefs", "author") - f = codecs.open(author_path, "w", self.encoding) - f.write(value) - f.close() - def _rcs_add(self, path): - if os.path.isdir(path): - return - self._u_invoke_client("add", path) - def _rcs_remove(self, path): - if not os.path.isdir(self._u_abspath(path)): - os.remove(os.path.join(self.rootdir, path)) # darcs notices removal - def _rcs_update(self, path): - pass # darcs notices changes - def _rcs_get_file_contents(self, path, revision=None, binary=False): - if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, - binary=binary) - else: - try: - return self._u_invoke_client("show", "contents", "--patch", revision, path) - except rcs.CommandError: - # Darcs versions < 2.0.0pre2 lack the "show contents" command - - status,output,error = self._u_invoke_client("diff", "--unified", - "--from-patch", - revision, path) - major_patch = output - status,output,error = self._u_invoke_client("diff", "--unified", - "--patch", - revision, path) - target_patch = output - - # "--output -" to be supported in GNU patch > 2.5.9 - # but that hasn't been released as of June 30th, 2009. - - # Rewrite path to status before the patch we want - args=["patch", "--reverse", path] - status,output,error = self._u_invoke(args, stdin=major_patch) - # Now apply the patch we want - args=["patch", path] - status,output,error = self._u_invoke(args, stdin=target_patch) - - if os.path.exists(os.path.join(self.rootdir, path)) == True: - contents = RCS._rcs_get_file_contents(self, path, - binary=binary) - else: - contents = "" - - # Now restore path to it's current incarnation - args=["patch", "--reverse", path] - status,output,error = self._u_invoke(args, stdin=target_patch) - args=["patch", path] - status,output,error = self._u_invoke(args, stdin=major_patch) - current_contents = RCS._rcs_get_file_contents(self, path, - binary=binary) - return contents - def _rcs_duplicate_repo(self, directory, revision=None): - if revision==None: - RCS._rcs_duplicate_repo(self, directory, revision) - else: - self._u_invoke_client("put", "--to-patch", revision, directory) - def _rcs_commit(self, commitfile): - id = self.get_user_id() - if '@' not in id: - id = "%s <%s@invalid.com>" % (id, id) - # Darcs doesn't like commitfiles without trailing endlines. - f = codecs.open(commitfile, 'r', self.encoding) - contents = f.read() - f.close() - if contents[-1] != '\n': - f = codecs.open(commitfile, 'a', self.encoding) - f.write('\n') - f.close() - status,output,error = self._u_invoke_client('record', '--all', - '--author', id, - '--logfile', commitfile) - revision = None - - revline = re.compile("Finished recording patch '(.*)'") - match = revline.search(output) - assert match != None, output+error - assert len(match.groups()) == 1 - revision = match.groups()[0] - return revision - - -rcs.make_rcs_testcase_subclasses(Darcs, sys.modules[__name__]) - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/diff.py b/libbe/diff.py index a349e14..e69de29 100644 --- a/libbe/diff.py +++ b/libbe/diff.py @@ -1,117 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -"""Compare two bug trees""" -from libbe import cmdutil, bugdir, bug -from libbe.utility import time_to_str -import doctest - -def diff(old_bugdir, new_bugdir): - added = [] - removed = [] - modified = [] - for uuid in old_bugdir.list_uuids(): - old_bug = old_bugdir.bug_from_uuid(uuid) - try: - new_bug = new_bugdir.bug_from_uuid(uuid) - if old_bug != new_bug: - modified.append((old_bug, new_bug)) - except KeyError: - removed.append(old_bug) - for uuid in new_bugdir.list_uuids(): - if not old_bugdir.has_bug(uuid): - new_bug = new_bugdir.bug_from_uuid(uuid) - added.append(new_bug) - return (removed, modified, added) - -def diff_report(diff_data, bug_dir): - (removed, modified, added) = diff_data - def modified_cmp(left, right): - return bug.cmp_severity(left[1], right[1]) - - added.sort(bug.cmp_severity) - removed.sort(bug.cmp_severity) - modified.sort(modified_cmp) - lines = [] - - if len(added) > 0: - lines.append("New bug reports:") - for bg in added: - lines.extend(bg.string(shortlist=True).splitlines()) - lines.append("") - - if len(modified) > 0: - printed = False - for old_bug, new_bug in modified: - change_str = bug_changes(old_bug, new_bug, bug_dir) - if change_str is None: - continue - if not printed: - printed = True - lines.append("Modified bug reports:") - lines.extend(change_str.splitlines()) - if printed == True: - lines.append("") - - if len(removed) > 0: - lines.append("Removed bug reports:") - for bg in removed: - lines.extend(bg.string(shortlist=True).splitlines()) - lines.append("") - - return '\n'.join(lines) - -def change_lines(old, new, attributes): - change_list = [] - for attr in attributes: - old_attr = getattr(old, attr) - new_attr = getattr(new, attr) - if old_attr != new_attr: - change_list.append((attr, old_attr, new_attr)) - if len(change_list) >= 0: - return change_list - else: - return None - -def bug_changes(old, new, bugs): - change_list = change_lines(old, new, ("time", "creator", "severity", - "target", "summary", "status", "assigned")) - - old_comment_ids = [c.uuid for c in old.comments()] - new_comment_ids = [c.uuid for c in new.comments()] - change_strings = ["%s: %s -> %s" % f for f in change_list] - for comment_id in new_comment_ids: - if comment_id not in old_comment_ids: - summary = comment_summary(new.comment_from_uuid(comment_id), "new") - change_strings.append(summary) - for comment_id in old_comment_ids: - if comment_id not in new_comment_ids: - summary = comment_summary(new.comment_from_uuid(comment_id), - "removed") - change_strings.append(summary) - - if len(change_strings) == 0: - return None - return "%s\n %s" % (new.string(shortlist=True), - " \n".join(change_strings)) - - -def comment_summary(comment, status): - return "%8s comment from %s on %s" % (status, comment.From, - time_to_str(comment.time)) - -suite = doctest.DocTestSuite() diff --git a/libbe/editor.py b/libbe/editor.py index 6638ed9..e69de29 100644 --- a/libbe/editor.py +++ b/libbe/editor.py @@ -1,103 +0,0 @@ -# Bugs Everywhere, a distributed bugtracker -# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, -# MA 02110-1301, USA - -import codecs -import locale -import os -import sys -import tempfile -import doctest - -default_encoding = sys.getfilesystemencoding() or locale.getpreferredencoding() - -comment_marker = u"== Anything below this line will be ignored\n" - -class CantFindEditor(Exception): - def __init__(self): - Exception.__init__(self, "Can't find editor to get string from") - -def editor_string(comment=None, encoding=None): - """Invokes the editor, and returns the user_produced text as a string - - >>> if "EDITOR" in os.environ: - ... del os.environ["EDITOR"] - >>> if "VISUAL" in os.environ: - ... del os.environ["VISUAL"] - >>> editor_string() - Traceback (most recent call last): - CantFindEditor: Can't find editor to get string from - >>> os.environ["EDITOR"] = "echo bar > " - >>> editor_string() - u'bar\\n' - >>> os.environ["VISUAL"] = "echo baz > " - >>> editor_string() - u'baz\\n' - >>> del os.environ["EDITOR"] - >>> del os.environ["VISUAL"] - """ - if encoding == None: - encoding = default_encoding - for name in ('VISUAL', 'EDITOR'): - try: - editor = os.environ[name] - break - except KeyError: - pass - else: - raise CantFindEditor() - fhandle, fname = tempfile.mkstemp() - try: - if comment is not None: - os.write(fhandle, '\n'+comment_string(comment)) - os.close(fhandle) - oldmtime = os.path.getmtime(fname) - os.system("%s %s" % (editor, fname)) - f = codecs.open(fname, "r", encoding) - output = trimmed_string(f.read()) - f.close() - if output.rstrip('\n') == "": - output = None - finally: - os.unlink(fname) - return output - - -def comment_string(comment): - """ - >>> comment_string('hello') == comment_marker+"hello" - True - """ - return comment_marker + comment - - -def trimmed_string(instring): - """ - >>> trimmed_string("hello\\n"+comment_marker) - u'hello\\n' - >>> trimmed_string("hi!\\n" + comment_string('Booga')) - u'hi!\\n' - """ - out = [] - for line in instring.splitlines(True): - if line.startswith(comment_marker): - break - out.append(line) - return ''.join(out) - -suite = doctest.DocTestSuite() diff --git a/libbe/encoding.py b/libbe/encoding.py index bdac98e..e69de29 100644 --- a/libbe/encoding.py +++ b/libbe/encoding.py @@ -1,53 +0,0 @@ -# Bugs Everywhere, a distributed bugtracker -# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, -# MA 02110-1301, USA -import codecs -import locale -import sys -import doctest - -def get_encoding(): - """ - Guess a useful input/output/filesystem encoding... Maybe we need - seperate encodings for input/output and filesystem? Hmm... - """ - encoding = locale.getpreferredencoding() or sys.getdefaultencoding() - if sys.platform != 'win32' or sys.version_info[:2] > (2, 3): - encoding = locale.getlocale(locale.LC_TIME)[1] or encoding - # Python 2.3 on windows doesn't know about 'XYZ' alias for 'cpXYZ' - return encoding - -def known_encoding(encoding): - """ - >>> known_encoding("highly-unlikely-encoding") - False - >>> known_encoding(get_encoding()) - True - """ - try: - codecs.lookup(encoding) - return True - except LookupError: - return False - -def set_IO_stream_encodings(encoding): - sys.stdin = codecs.getreader(encoding)(sys.__stdin__) - sys.stdout = codecs.getwriter(encoding)(sys.__stdout__) - sys.stderr = codecs.getwriter(encoding)(sys.__stderr__) - -suite = doctest.DocTestSuite() diff --git a/libbe/git.py b/libbe/git.py index 31bbe32..e69de29 100644 --- a/libbe/git.py +++ b/libbe/git.py @@ -1,111 +0,0 @@ -# Copyright (C) 2008-2009 Ben Finney <ben+python@benfinney.id.au> -# Chris Ball <cjb@laptop.org> -# W. Trevor King <wking@drexel.edu> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import os -import re -import sys -import unittest -import doctest - -import rcs -from rcs import RCS - -def new(): - return Git() - -class Git(RCS): - name="git" - client="git" - versioned=True - def _rcs_help(self): - status,output,error = self._u_invoke_client("--help") - return output - def _rcs_detect(self, path): - if self._u_search_parent_directories(path, ".git") != None : - return True - return False - def _rcs_root(self, path): - """Find the root of the deepest repository containing path.""" - # Assume that nothing funny is going on; in particular, that we aren't - # dealing with a bare repo. - if os.path.isdir(path) != True: - path = os.path.dirname(path) - status,output,error = self._u_invoke_client("rev-parse", "--git-dir", - directory=path) - gitdir = os.path.join(path, output.rstrip('\n')) - dirname = os.path.abspath(os.path.dirname(gitdir)) - return dirname - def _rcs_init(self, path): - self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): - status,output,error = self._u_invoke_client("config", "user.name") - name = output.rstrip('\n') - status,output,error = self._u_invoke_client("config", "user.email") - email = output.rstrip('\n') - if name != "" or email != "": # got something! - # guess missing info, if necessary - if name == "": - name = self._u_get_fallback_username() - if email == "": - email = self._u_get_fallback_email() - return self._u_create_id(name, email) - return None # Git has no infomation - def _rcs_set_user_id(self, value): - name,email = self._u_parse_id(value) - if email != None: - self._u_invoke_client("config", "user.email", email) - self._u_invoke_client("config", "user.name", name) - def _rcs_add(self, path): - if os.path.isdir(path): - return - self._u_invoke_client("add", path) - def _rcs_remove(self, path): - if not os.path.isdir(self._u_abspath(path)): - self._u_invoke_client("rm", "-f", path) - def _rcs_update(self, path): - self._rcs_add(path) - def _rcs_get_file_contents(self, path, revision=None, binary=False): - if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) - else: - arg = "%s:%s" % (revision,path) - status,output,error = self._u_invoke_client("show", arg) - return output - def _rcs_duplicate_repo(self, directory, revision=None): - if revision==None: - RCS._rcs_duplicate_repo(self, directory, revision) - else: - #self._u_invoke_client("archive", revision, directory) # makes tarball - self._u_invoke_client("clone", "--no-checkout",".",directory) - self._u_invoke_client("checkout", revision, directory=directory) - def _rcs_commit(self, commitfile): - status,output,error = self._u_invoke_client('commit', '-a', - '-F', commitfile) - revision = None - revline = re.compile("(.*) (.*)[:\]] (.*)") - match = revline.search(output) - assert match != None, output+error - assert len(match.groups()) == 3 - revision = match.groups()[1] - return revision - - -rcs.make_rcs_testcase_subclasses(Git, sys.modules[__name__]) - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/hg.py b/libbe/hg.py index 30b0470..e69de29 100644 --- a/libbe/hg.py +++ b/libbe/hg.py @@ -1,91 +0,0 @@ -# Copyright (C) 2007-2009 Aaron Bentley and Panometrics, Inc. -# Ben Finney <ben+python@benfinney.id.au> -# W. Trevor King <wking@drexel.edu> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import os -import re -import sys -import unittest -import doctest - -import rcs -from rcs import RCS - -def new(): - return Hg() - -class Hg(RCS): - name="hg" - client="hg" - versioned=True - def _rcs_help(self): - status,output,error = self._u_invoke_client("--help") - return output - def _rcs_detect(self, path): - """Detect whether a directory is revision-controlled using Mercurial""" - if self._u_search_parent_directories(path, ".hg") != None: - return True - return False - def _rcs_root(self, path): - status,output,error = self._u_invoke_client("root", directory=path) - return output.rstrip('\n') - def _rcs_init(self, path): - self._u_invoke_client("init", directory=path) - def _rcs_get_user_id(self): - status,output,error = self._u_invoke_client("showconfig","ui.username") - return output.rstrip('\n') - def _rcs_set_user_id(self, value): - """ - Supported by the Config Extension, but that is not part of - standard Mercurial. - http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension - """ - raise rcs.SettingIDnotSupported - def _rcs_add(self, path): - self._u_invoke_client("add", path) - def _rcs_remove(self, path): - self._u_invoke_client("rm", path) - def _rcs_update(self, path): - pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): - if revision == None: - return RCS._rcs_get_file_contents(self, path, revision, binary=binary) - else: - status,output,error = \ - self._u_invoke_client("cat","-r",revision,path) - return output - def _rcs_duplicate_repo(self, directory, revision=None): - if revision == None: - return RCS._rcs_duplicate_repo(self, directory, revision) - else: - self._u_invoke_client("archive", "--rev", revision, directory) - def _rcs_commit(self, commitfile): - self._u_invoke_client('commit', '--logfile', commitfile) - status,output,error = self._u_invoke_client('identify') - revision = None - revline = re.compile("(.*) tip") - match = revline.search(output) - assert match != None, output+error - assert len(match.groups()) == 1 - revision = match.groups()[0] - return revision - - -rcs.make_rcs_testcase_subclasses(Hg, sys.modules[__name__]) - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/mapfile.py b/libbe/mapfile.py index 9ff2215..e69de29 100644 --- a/libbe/mapfile.py +++ b/libbe/mapfile.py @@ -1,128 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import yaml -import os.path -import errno -import utility -import doctest - -class IllegalKey(Exception): - def __init__(self, key): - Exception.__init__(self, 'Illegal key "%s"' % key) - self.key = key - -class IllegalValue(Exception): - def __init__(self, value): - Exception.__init__(self, 'Illegal value "%s"' % value) - self.value = value - -def generate(map): - """Generate a YAML mapfile content string. - >>> generate({"q":"p"}) - 'q: p\\n\\n' - >>> generate({"q":u"Fran\u00e7ais"}) - 'q: Fran\\xc3\\xa7ais\\n\\n' - >>> generate({"q":u"hello"}) - 'q: hello\\n\\n' - >>> generate({"q=":"p"}) - Traceback (most recent call last): - IllegalKey: Illegal key "q=" - >>> generate({"q:":"p"}) - Traceback (most recent call last): - IllegalKey: Illegal key "q:" - >>> generate({"q\\n":"p"}) - Traceback (most recent call last): - IllegalKey: Illegal key "q\\n" - >>> generate({"":"p"}) - Traceback (most recent call last): - IllegalKey: Illegal key "" - >>> generate({">q":"p"}) - Traceback (most recent call last): - IllegalKey: Illegal key ">q" - >>> generate({"q":"p\\n"}) - Traceback (most recent call last): - IllegalValue: Illegal value "p\\n" - """ - keys = map.keys() - keys.sort() - for key in keys: - try: - assert not key.startswith('>') - assert('\n' not in key) - assert('=' not in key) - assert(':' not in key) - assert(len(key) > 0) - except AssertionError: - raise IllegalKey(key.encode('string_escape')) - if "\n" in map[key]: - raise IllegalValue(map[key].encode('string_escape')) - - lines = [] - for key in keys: - lines.append(yaml.safe_dump({key: map[key]}, - default_flow_style=False, - allow_unicode=True)) - lines.append("") - return '\n'.join(lines) - -def parse(contents): - """ - Parse a YAML mapfile string. - >>> parse('q: p\\n\\n')['q'] - 'p' - >>> parse('q: \\'p\\'\\n\\n')['q'] - 'p' - >>> contents = generate({"a":"b", "c":"d", "e":"f"}) - >>> dict = parse(contents) - >>> dict["a"] - 'b' - >>> dict["c"] - 'd' - >>> dict["e"] - 'f' - """ - old_format = False - for line in contents.splitlines(): - if len(line.split("=")) == 2: - old_format = True - break - if old_format: # translate to YAML. Hack to deal with old BE bugs. - newlines = [] - for line in contents.splitlines(): - line = line.rstrip('\n') - if len(line) == 0: - continue - fields = line.split("=") - if len(fields) == 2: - key,value = fields - newlines.append('%s: "%s"' % (key, value.replace('"','\\"'))) - else: - newlines.append(line) - contents = '\n'.join(newlines) - return yaml.load(contents) or {} - -def map_save(rcs, path, map, allow_no_rcs=False): - """Save the map as a mapfile to the specified path""" - contents = generate(map) - rcs.set_file_contents(path, contents, allow_no_rcs) - -def map_load(rcs, path, allow_no_rcs=False): - contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs) - return parse(contents) - -suite = doctest.DocTestSuite() diff --git a/libbe/plugin.py b/libbe/plugin.py index 3c7c753..e69de29 100644 --- a/libbe/plugin.py +++ b/libbe/plugin.py @@ -1,72 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Marien Zwart <marienz@gentoo.org> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import os -import os.path -import sys -import doctest - -def my_import(mod_name): - module = __import__(mod_name) - components = mod_name.split('.') - for comp in components[1:]: - module = getattr(module, comp) - return module - -def iter_plugins(prefix): - """ - >>> "list" in [n for n,m in iter_plugins("becommands")] - True - >>> "plugin" in [n for n,m in iter_plugins("libbe")] - True - """ - modfiles = os.listdir(os.path.join(plugin_path, prefix)) - modfiles.sort() - for modfile in modfiles: - if modfile.startswith('.'): - continue # the occasional emacs temporary file - if modfile.endswith(".py") and modfile != "__init__.py": - yield modfile[:-3], my_import(prefix+"."+modfile[:-3]) - - -def get_plugin(prefix, name): - """ - >>> get_plugin("becommands", "asdf") is None - True - >>> q = repr(get_plugin("becommands", "list")) - >>> q.startswith("<module 'becommands.list' from ") - True - """ - dirprefix = os.path.join(*prefix.split('.')) - command_path = os.path.join(plugin_path, dirprefix, name+".py") - if os.path.isfile(command_path): - return my_import(prefix + "." + name) - return None - -plugin_path = os.path.realpath(os.path.dirname(os.path.dirname(__file__))) -if plugin_path not in sys.path: - sys.path.append(plugin_path) - -suite = doctest.DocTestSuite() - -def _test(): - import doctest - doctest.testmod() - -if __name__ == "__main__": - _test() diff --git a/libbe/properties.py b/libbe/properties.py index 37204d6..e69de29 100644 --- a/libbe/properties.py +++ b/libbe/properties.py @@ -1,614 +0,0 @@ -# Bugs Everywhere - a distributed bugtracker -# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -This module provides a series of useful decorators for defining -various types of properties. For example usage, consider the -unittests at the end of the module. - -See - http://www.python.org/dev/peps/pep-0318/ -and - http://www.phyast.pitt.edu/~micheles/python/documentation.html -for more information on decorators. -""" - -import copy -import types -import unittest - - -class ValueCheckError (ValueError): - def __init__(self, name, value, allowed): - action = "in" # some list of allowed values - if type(allowed) == types.FunctionType: - action = "allowed by" # some allowed-value check function - msg = "%s not %s %s for %s" % (value, action, allowed, name) - ValueError.__init__(self, msg) - self.name = name - self.value = value - self.allowed = allowed - -def Property(funcs): - """ - End a chain of property decorators, returning a property. - """ - args = {} - args["fget"] = funcs.get("fget", None) - args["fset"] = funcs.get("fset", None) - args["fdel"] = funcs.get("fdel", None) - args["doc"] = funcs.get("doc", None) - - #print "Creating a property with" - #for key, val in args.items(): print key, value - return property(**args) - -def doc_property(doc=None): - """ - Add a docstring to a chain of property decorators. - """ - def decorator(funcs=None): - """ - Takes either a dict of funcs {"fget":fnX, "fset":fnY, ...} - or a function fn() returning such a dict. - """ - if hasattr(funcs, "__call__"): - funcs = funcs() # convert from function-arg to dict - funcs["doc"] = doc - return funcs - return decorator - -def local_property(name, null=None, mutable_null=False): - """ - Define get/set access to per-parent-instance local storage. Uses - ._<name>_value to store the value for a particular owner instance. - If the ._<name>_value attribute does not exist, returns null. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget", None) - fset = funcs.get("fset", None) - def _fget(self): - if fget is not None: - fget(self) - if mutable_null == True: - ret_null = copy.deepcopy(null) - else: - ret_null = null - value = getattr(self, "_%s_value" % name, ret_null) - return value - def _fset(self, value): - setattr(self, "_%s_value" % name, value) - if fset is not None: - fset(self, value) - funcs["fget"] = _fget - funcs["fset"] = _fset - funcs["name"] = name - return funcs - return decorator - -def settings_property(name, null=None): - """ - Similar to local_property, except where local_property stores the - value in instance._<name>_value, settings_property stores the - value in instance.settings[name]. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget", None) - fset = funcs.get("fset", None) - def _fget(self): - if fget is not None: - fget(self) - value = self.settings.get(name, null) - return value - def _fset(self, value): - self.settings[name] = value - if fset is not None: - fset(self, value) - funcs["fget"] = _fget - funcs["fset"] = _fset - funcs["name"] = name - return funcs - return decorator - - -# Allow comparison and caching with _original_ values for mutables, -# since -# -# >>> a = [] -# >>> b = a -# >>> b.append(1) -# >>> a -# [1] -# >>> a==b -# True -def _hash_mutable_value(value): - return repr(value) -def _init_mutable_property_cache(self): - if not hasattr(self, "_mutable_property_cache_hash"): - # first call to _fget for any mutable property - self._mutable_property_cache_hash = {} - self._mutable_property_cache_copy = {} -def _set_cached_mutable_property(self, cacher_name, property_name, value): - _init_mutable_property_cache(self) - self._mutable_property_cache_hash[(cacher_name, property_name)] = \ - _hash_mutable_value(value) - self._mutable_property_cache_copy[(cacher_name, property_name)] = \ - copy.deepcopy(value) -def _get_cached_mutable_property(self, cacher_name, property_name, default=None): - _init_mutable_property_cache(self) - if (cacher_name, property_name) not in self._mutable_property_cache_copy: - return default - return self._mutable_property_cache_copy[(cacher_name, property_name)] -def _cmp_cached_mutable_property(self, cacher_name, property_name, value): - _init_mutable_property_cache(self) - if (cacher_name, property_name) not in self._mutable_property_cache_hash: - return 1 # any value > non-existant old hash - old_hash = self._mutable_property_cache_hash[(cacher_name, property_name)] - return cmp(_hash_mutable_value(value), old_hash) - - -def defaulting_property(default=None, null=None, - default_mutable=False, - null_mutable=False): - """ - Define a default value for get access to a property. - If the stored value is null, then default is returned. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget") - fset = funcs.get("fset") - name = funcs.get("name", "<unknown>") - def _fget(self): - value = fget(self) - if value == null: - if default_mutable == True: - return copy.deepcopy(default) - else: - return default - return value - def _fset(self, value): - if value == default: - if null_mutable == True: - value = copy.deepcopy(null) - else: - value = null - fset(self, value) - funcs["fget"] = _fget - funcs["fset"] = _fset - return funcs - return decorator - -def fn_checked_property(value_allowed_fn): - """ - Define allowed values for get/set access to a property. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget") - fset = funcs.get("fset") - name = funcs.get("name", "<unknown>") - def _fget(self): - value = fget(self) - if value_allowed_fn(value) != True: - raise ValueCheckError(name, value, value_allowed_fn) - return value - def _fset(self, value): - if value_allowed_fn(value) != True: - raise ValueCheckError(name, value, value_allowed_fn) - fset(self, value) - funcs["fget"] = _fget - funcs["fset"] = _fset - return funcs - return decorator - -def checked_property(allowed=[]): - """ - Define allowed values for get/set access to a property. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget") - fset = funcs.get("fset") - name = funcs.get("name", "<unknown>") - def _fget(self): - value = fget(self) - if value not in allowed: - raise ValueCheckError(name, value, allowed) - return value - def _fset(self, value): - if value not in allowed: - raise ValueCheckError(name, value, allowed) - fset(self, value) - funcs["fget"] = _fget - funcs["fset"] = _fset - return funcs - return decorator - -def cached_property(generator, initVal=None, mutable=False): - """ - Allow caching of values generated by generator(instance), where - instance is the instance to which this property belongs. Uses - ._<name>_cache to store a cache flag for a particular owner - instance. - - When the cache flag is True or missing and the stored value is - initVal, the first fget call triggers the generator function, - whose output is stored in _<name>_cached_value. That and - subsequent calls to fget will return this cached value. - - If the input value is no longer initVal (e.g. a value has been - loaded from disk or set with fset), that value overrides any - cached value, and this property has no effect. - - When the cache flag is False and the stored value is initVal, the - generator is not cached, but is called on every fget. - - The cache flag is missing on initialization. Particular instances - may override by setting their own flag. - - In the case that mutable == True, all caching is disabled and the - generator is called whenever the cached value would otherwise be - used. This avoids uncertainties in the value of stored mutables. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget") - name = funcs.get("name", "<unknown>") - def _fget(self): - cache = getattr(self, "_%s_cache" % name, True) - value = fget(self) - if value == initVal: - if cache == True and mutable == False: - if hasattr(self, "_%s_cached_value" % name): - value = getattr(self, "_%s_cached_value" % name) - else: - value = generator(self) - setattr(self, "_%s_cached_value" % name, value) - else: - value = generator(self) - return value - funcs["fget"] = _fget - return funcs - return decorator - -def primed_property(primer, initVal=None): - """ - Just like a generator_property, except that instead of returning a - new value and running fset to cache it, the primer performs some - background manipulation (e.g. loads data into instance.settings) - such that a _second_ pass through fget succeeds. - - The 'cache' flag becomes a 'prime' flag, with priming taking place - whenever ._<name>_prime is True, or is False or missing and - value == initVal. - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget") - name = funcs.get("name", "<unknown>") - def _fget(self): - prime = getattr(self, "_%s_prime" % name, False) - if prime == False: - value = fget(self) - if prime == True or (prime == False and value == initVal): - primer(self) - value = fget(self) - return value - funcs["fget"] = _fget - return funcs - return decorator - -def change_hook_property(hook, mutable=False): - """ - Call the function hook(instance, old_value, new_value) whenever a - value different from the current value is set (instance is a a - reference to the class instance to which this property belongs). - This is useful for saving changes to disk, etc. This function is - called _after_ the new value has been stored, allowing you to - change the stored value if you want. - - """ - def decorator(funcs): - if hasattr(funcs, "__call__"): - funcs = funcs() - fget = funcs.get("fget") - fset = funcs.get("fset") - name = funcs.get("name", "<unknown>") - def _fget(self, new_value=None, from_fset=False): # only used if mutable == True - value = fget(self) - if _cmp_cached_mutable_property(self, "change hook property", name, value) != 0: - # there has been a change, cache new value - old_value = _get_cached_mutable_property(self, "change hook property", name) - _set_cached_mutable_property(self, "change hook property", name, value) - if from_fset == True: # return previously cached value - value = old_value - else: # the value changed while we weren't looking - hook(self, old_value, value) - return value - def _fset(self, value): - if mutable == True: # get cached previous value - old_value = _fget(self, new_value=value, from_fset=True) - else: - old_value = fget(self) - fset(self, value) - if value != old_value: - hook(self, old_value, value) - if mutable == True: - funcs["fget"] = _fget - funcs["fset"] = _fset - return funcs - return decorator - - -class DecoratorTests(unittest.TestCase): - def testLocalDoc(self): - class Test(object): - @Property - @doc_property("A fancy property") - def x(): - return {} - self.failUnless(Test.x.__doc__ == "A fancy property", - Test.x.__doc__) - def testLocalProperty(self): - class Test(object): - @Property - @local_property(name="LOCAL") - def x(): - return {} - t = Test() - self.failUnless(t.x == None, str(t.x)) - t.x = 'z' # the first set initializes ._LOCAL_value - self.failUnless(t.x == 'z', str(t.x)) - self.failUnless("_LOCAL_value" in dir(t), dir(t)) - self.failUnless(t._LOCAL_value == 'z', t._LOCAL_value) - def testSettingsProperty(self): - class Test(object): - @Property - @settings_property(name="attr") - def x(): - return {} - def __init__(self): - self.settings = {} - t = Test() - self.failUnless(t.x == None, str(t.x)) - t.x = 'z' # the first set initializes ._LOCAL_value - self.failUnless(t.x == 'z', str(t.x)) - self.failUnless("attr" in t.settings, t.settings) - self.failUnless(t.settings["attr"] == 'z', t.settings["attr"]) - def testDefaultingLocalProperty(self): - class Test(object): - @Property - @defaulting_property(default='y', null='x') - @local_property(name="DEFAULT", null=5) - def x(): return {} - t = Test() - self.failUnless(t.x == 5, str(t.x)) - t.x = 'x' - self.failUnless(t.x == 'y', str(t.x)) - t.x = 'y' - self.failUnless(t.x == 'y', str(t.x)) - t.x = 'z' - self.failUnless(t.x == 'z', str(t.x)) - t.x = 5 - self.failUnless(t.x == 5, str(t.x)) - def testCheckedLocalProperty(self): - class Test(object): - @Property - @checked_property(allowed=['x', 'y', 'z']) - @local_property(name="CHECKED") - def x(): return {} - def __init__(self): - self._CHECKED_value = 'x' - t = Test() - self.failUnless(t.x == 'x', str(t.x)) - try: - t.x = None - e = None - except ValueCheckError, e: - pass - self.failUnless(type(e) == ValueCheckError, type(e)) - def testTwoCheckedLocalProperties(self): - class Test(object): - @Property - @checked_property(allowed=['x', 'y', 'z']) - @local_property(name="X") - def x(): return {} - - @Property - @checked_property(allowed=['a', 'b', 'c']) - @local_property(name="A") - def a(): return {} - def __init__(self): - self._A_value = 'a' - self._X_value = 'x' - t = Test() - try: - t.x = 'a' - e = None - except ValueCheckError, e: - pass - self.failUnless(type(e) == ValueCheckError, type(e)) - t.x = 'x' - t.x = 'y' - t.x = 'z' - try: - t.a = 'x' - e = None - except ValueCheckError, e: - pass - self.failUnless(type(e) == ValueCheckError, type(e)) - t.a = 'a' - t.a = 'b' - t.a = 'c' - def testFnCheckedLocalProperty(self): - class Test(object): - @Property - @fn_checked_property(lambda v : v in ['x', 'y', 'z']) - @local_property(name="CHECKED") - def x(): return {} - def __init__(self): - self._CHECKED_value = 'x' - t = Test() - self.failUnless(t.x == 'x', str(t.x)) - try: - t.x = None - e = None - except ValueCheckError, e: - pass - self.failUnless(type(e) == ValueCheckError, type(e)) - def testCachedLocalProperty(self): - class Gen(object): - def __init__(self): - self.i = 0 - def __call__(self, owner): - self.i += 1 - return self.i - class Test(object): - @Property - @cached_property(generator=Gen(), initVal=None) - @local_property(name="CACHED") - def x(): return {} - t = Test() - self.failIf("_CACHED_cache" in dir(t), getattr(t, "_CACHED_cache", None)) - self.failUnless(t.x == 1, t.x) - self.failUnless(t.x == 1, t.x) - self.failUnless(t.x == 1, t.x) - t.x = 8 - self.failUnless(t.x == 8, t.x) - self.failUnless(t.x == 8, t.x) - t._CACHED_cache = False # Caching is off, but the stored value - val = t.x # is 8, not the initVal (None), so we - self.failUnless(val == 8, val) # get 8. - t._CACHED_value = None # Now we've set the stored value to None - val = t.x # so future calls to fget (like this) - self.failUnless(val == 2, val) # will call the generator every time... - val = t.x - self.failUnless(val == 3, val) - val = t.x - self.failUnless(val == 4, val) - t._CACHED_cache = True # We turn caching back on, and get - self.failUnless(t.x == 1, str(t.x)) # the original cached value. - del t._CACHED_cached_value # Removing that value forces a - self.failUnless(t.x == 5, str(t.x)) # single cache-regenerating call - self.failUnless(t.x == 5, str(t.x)) # to the genenerator, after which - self.failUnless(t.x == 5, str(t.x)) # we get the new cached value. - def testPrimedLocalProperty(self): - class Test(object): - def prime(self): - self.settings["PRIMED"] = "initialized" - @Property - @primed_property(primer=prime, initVal=None) - @settings_property(name="PRIMED") - def x(): return {} - def __init__(self): - self.settings={} - t = Test() - self.failIf("_PRIMED_prime" in dir(t), getattr(t, "_PRIMED_prime", None)) - self.failUnless(t.x == "initialized", t.x) - t.x = 1 - self.failUnless(t.x == 1, t.x) - t.x = None - self.failUnless(t.x == "initialized", t.x) - t._PRIMED_prime = True - t.x = 3 - self.failUnless(t.x == "initialized", t.x) - t._PRIMED_prime = False - t.x = 3 - self.failUnless(t.x == 3, t.x) - def testChangeHookLocalProperty(self): - class Test(object): - def _hook(self, old, new): - self.old = old - self.new = new - - @Property - @change_hook_property(_hook) - @local_property(name="HOOKED") - def x(): return {} - t = Test() - t.x = 1 - self.failUnless(t.old == None, t.old) - self.failUnless(t.new == 1, t.new) - t.x = 1 - self.failUnless(t.old == None, t.old) - self.failUnless(t.new == 1, t.new) - t.x = 2 - self.failUnless(t.old == 1, t.old) - self.failUnless(t.new == 2, t.new) - def testChangeHookMutableProperty(self): - class Test(object): - def _hook(self, old, new): - self.old = old - self.new = new - self.hook_calls += 1 - - @Property - @change_hook_property(_hook, mutable=True) - @local_property(name="HOOKED") - def x(): return {} - t = Test() - t.hook_calls = 0 - t.x = [] - self.failUnless(t.old == None, t.old) - self.failUnless(t.new == [], t.new) - a = t.x - a.append(5) - t.x = a - self.failUnless(t.old == [], t.old) - self.failUnless(t.new == [5], t.new) - t.x = [] - self.failUnless(t.old == [5], t.old) - self.failUnless(t.new == [], t.new) - # now append without reassigning. this doesn't trigger the - # change, since we don't ever set t.x, only get it and mess - # with it. It does, however, update our t.new, since t.new = - # t.x and is not a static copy. - t.x.append(5) - self.failUnless(t.old == [5], t.old) - self.failUnless(t.new == [5], t.new) - # however, the next t.x get _will_ notice the change... - a = t.x - self.failUnless(t.old == [], t.old) - self.failUnless(t.new == [5], t.new) - self.failUnless(t.hook_calls == 6, t.hook_calls) - t.x.append(6) # this append(6) is not noticed yet - self.failUnless(t.old == [], t.old) - self.failUnless(t.new == [5,6], t.new) - self.failUnless(t.hook_calls == 6, t.hook_calls) - # this append(7) is not noticed, but the t.x get causes the - # append(6) to be noticed - t.x.append(7) - self.failUnless(t.old == [5], t.old) - self.failUnless(t.new == [5,6,7], t.new) - self.failUnless(t.hook_calls == 7, t.hook_calls) - a = t.x # now the append(7) is noticed - self.failUnless(t.old == [5,6], t.old) - self.failUnless(t.new == [5,6,7], t.new) - self.failUnless(t.hook_calls == 8, t.hook_calls) - - -suite = unittest.TestLoader().loadTestsFromTestCase(DecoratorTests) - diff --git a/libbe/rcs.py b/libbe/rcs.py index 2c416f4..e69de29 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -1,838 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# Alexander Belchenko <bialix@ukr.net> -# Ben Finney <ben+python@benfinney.id.au> -# Chris Ball <cjb@laptop.org> -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -from subprocess import Popen, PIPE -import codecs -import os -import os.path -import re -from socket import gethostname -import shutil -import sys -import tempfile -import unittest -import doctest - -from utility import Dir, search_parent_directories - - -def _get_matching_rcs(matchfn): - """Return the first module for which matchfn(RCS_instance) is true""" - import arch - import bzr - import darcs - import git - import hg - for module in [arch, bzr, darcs, git, hg]: - rcs = module.new() - if matchfn(rcs) == True: - return rcs - del(rcs) - return RCS() - -def rcs_by_name(rcs_name): - """Return the module for the RCS with the given name""" - return _get_matching_rcs(lambda rcs: rcs.name == rcs_name) - -def detect_rcs(dir): - """Return an RCS instance for the rcs being used in this directory""" - return _get_matching_rcs(lambda rcs: rcs.detect(dir)) - -def installed_rcs(): - """Return an instance of an installed RCS""" - return _get_matching_rcs(lambda rcs: rcs.installed()) - - -class CommandError(Exception): - def __init__(self, err_str, status): - Exception.__init__(self, "Command failed (%d): %s" % (status, err_str)) - self.err_str = err_str - self.status = status - -class SettingIDnotSupported(NotImplementedError): - pass - -class RCSnotRooted(Exception): - def __init__(self): - msg = "RCS not rooted" - Exception.__init__(self, msg) - -class PathNotInRoot(Exception): - def __init__(self, path, root): - msg = "Path '%s' not in root '%s'" % (path, root) - Exception.__init__(self, msg) - self.path = path - self.root = root - -class NoSuchFile(Exception): - def __init__(self, pathname, root="."): - path = os.path.abspath(os.path.join(root, pathname)) - Exception.__init__(self, "No such file: %s" % path) - - -def new(): - return RCS() - -class RCS(object): - """ - This class implements a 'no-rcs' interface. - - Support for other RCSs can be added by subclassing this class, and - overriding methods _rcs_*() with code appropriate for your RCS. - - The methods _u_*() are utility methods available to the _rcs_*() - methods. - """ - name = "None" - client = "" # command-line tool for _u_invoke_client - versioned = False - def __init__(self, paranoid=False, encoding=sys.getdefaultencoding()): - self.paranoid = paranoid - self.verboseInvoke = False - self.rootdir = None - self._duplicateBasedir = None - self._duplicateDirname = None - self.encoding = encoding - def __del__(self): - self.cleanup() - - def _rcs_help(self): - """ - Return the command help string. - (Allows a simple test to see if the client is installed.) - """ - pass - def _rcs_detect(self, path=None): - """ - Detect whether a directory is revision controlled with this RCS. - """ - return True - def _rcs_root(self, path): - """ - Get the RCS root. This is the default working directory for - future invocations. You would normally set this to the root - directory for your RCS. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - if path == "": - path = os.path.abspath(".") - return path - def _rcs_init(self, path): - """ - Begin versioning the tree based at path. - """ - pass - def _rcs_cleanup(self): - """ - Remove any cruft that _rcs_init() created outside of the - versioned tree. - """ - pass - def _rcs_get_user_id(self): - """ - Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). - If the RCS has not been configured with a username, return None. - """ - return None - def _rcs_set_user_id(self, value): - """ - Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). - This is run if the RCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - raise SettingIDnotSupported - def _rcs_add(self, path): - """ - Add the already created file at path to version control. - """ - pass - def _rcs_remove(self, path): - """ - Remove the file at path from version control. Optionally - remove the file from the filesystem as well. - """ - pass - def _rcs_update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - pass - def _rcs_get_file_contents(self, path, revision=None, binary=False): - """ - Get the file contents as they were in a given revision. - Revision==None specifies the current revision. - """ - assert revision == None, \ - "The %s RCS does not support revision specifiers" % self.name - if binary == False: - f = codecs.open(os.path.join(self.rootdir, path), "r", self.encoding) - else: - f = open(path, "rb") - contents = f.read() - f.close() - return contents - def _rcs_duplicate_repo(self, directory, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - dir specifies a directory to create the duplicate in. - """ - shutil.copytree(self.rootdir, directory, True) - def _rcs_commit(self, commitfile): - """ - Commit the current working directory, using the contents of - commitfile as the comment. Return the name of the old - revision. - """ - return None - def installed(self): - try: - self._rcs_help() - return True - except OSError, e: - if e.errno == errno.ENOENT: - return False - except CommandError: - return False - def detect(self, path="."): - """ - Detect whether a directory is revision controlled with this RCS. - """ - return self._rcs_detect(path) - def root(self, path): - """ - Set the root directory to the path's RCS root. This is the - default working directory for future invocations. - """ - self.rootdir = self._rcs_root(path) - def init(self, path): - """ - Begin versioning the tree based at path. - Also roots the rcs at path. - """ - if os.path.isdir(path)==False: - path = os.path.dirname(path) - self._rcs_init(path) - self.root(path) - def cleanup(self): - self._rcs_cleanup() - def get_user_id(self): - """ - Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). - If the RCS has not been configured with a username, return the user's - id. You can override the automatic lookup procedure by setting the - RCS.user_id attribute to a string of your choice. - """ - if hasattr(self, "user_id"): - if self.user_id != None: - return self.user_id - id = self._rcs_get_user_id() - if id == None: - name = self._u_get_fallback_username() - email = self._u_get_fallback_email() - id = self._u_create_id(name, email) - print >> sys.stderr, "Guessing id '%s'" % id - try: - self.set_user_id(id) - except SettingIDnotSupported: - pass - return id - def set_user_id(self, value): - """ - Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). - This is run if the RCS has not been configured with a usename, so - that commits will have a reasonable FROM value. - """ - self._rcs_set_user_id(value) - def add(self, path): - """ - Add the already created file at path to version control. - """ - self._rcs_add(self._u_rel_path(path)) - def remove(self, path): - """ - Remove a file from both version control and the filesystem. - """ - self._rcs_remove(self._u_rel_path(path)) - if os.path.exists(path): - os.remove(path) - def recursive_remove(self, dirname): - """ - Remove a file/directory and all its decendents from both - version control and the filesystem. - """ - if not os.path.exists(dirname): - raise NoSuchFile(dirname) - for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): - filenames.extend(dirnames) - for path in filenames: - fullpath = os.path.join(dirpath, path) - if os.path.exists(fullpath) == False: - continue - self._rcs_remove(self._u_rel_path(fullpath)) - if os.path.exists(dirname): - shutil.rmtree(dirname) - def update(self, path): - """ - Notify the versioning system of changes to the versioned file - at path. - """ - self._rcs_update(self._u_rel_path(path)) - def get_file_contents(self, path, revision=None, allow_no_rcs=False, binary=False): - """ - Get the file as it was in a given revision. - Revision==None specifies the current revision. - """ - if not os.path.exists(path): - raise NoSuchFile(path) - if self._use_rcs(path, allow_no_rcs): - relpath = self._u_rel_path(path) - contents = self._rcs_get_file_contents(relpath,revision,binary=binary) - else: - f = codecs.open(path, "r", self.encoding) - contents = f.read() - f.close() - return contents - def set_file_contents(self, path, contents, allow_no_rcs=False, binary=False): - """ - Set the file contents under version control. - """ - add = not os.path.exists(path) - if binary == False: - f = codecs.open(path, "w", self.encoding) - else: - f = open(path, "wb") - f.write(contents) - f.close() - - if self._use_rcs(path, allow_no_rcs): - if add: - self.add(path) - else: - self.update(path) - def mkdir(self, path, allow_no_rcs=False): - """ - Create (if neccessary) a directory at path under version - control. - """ - if not os.path.exists(path): - os.mkdir(path) - if self._use_rcs(path, allow_no_rcs): - self.add(path) - else: - assert os.path.isdir(path) - if self._use_rcs(path, allow_no_rcs): - self.update(path) - def duplicate_repo(self, revision=None): - """ - Get the repository as it was in a given revision. - revision==None specifies the current revision. - Return the path to the arbitrary directory at the base of the new repo. - """ - # Dirname in Baseir to protect against simlink attacks. - if self._duplicateBasedir == None: - self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs') - self._duplicateDirname = \ - os.path.join(self._duplicateBasedir, "duplicate") - self._rcs_duplicate_repo(directory=self._duplicateDirname, - revision=revision) - return self._duplicateDirname - def remove_duplicate_repo(self): - """ - Clean up a duplicate repo created with duplicate_repo(). - """ - if self._duplicateBasedir != None: - shutil.rmtree(self._duplicateBasedir) - self._duplicateBasedir = None - self._duplicateDirname = None - def commit(self, summary, body=None): - """ - Commit the current working directory, with a commit message - string summary and body. Return the name of the old revision - (or None if versioning is not supported). - """ - if body is not None: - summary += '\n' + body - descriptor, filename = tempfile.mkstemp() - revision = None - try: - temp_file = os.fdopen(descriptor, 'wb') - temp_file.write(summary) - temp_file.flush() - revision = self._rcs_commit(filename) - temp_file.close() - finally: - os.remove(filename) - return revision - def precommit(self, directory): - pass - def postcommit(self, directory): - pass - def _u_invoke(self, args, stdin=None, expect=(0,), cwd=None): - if cwd == None: - cwd = self.rootdir - if self.verboseInvoke == True: - print >> sys.stderr, "%s$ %s" % (cwd, " ".join(args)) - try : - if sys.platform != "win32": - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) - else: - # win32 don't have os.execvp() so have to run command in a shell - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, - shell=True, cwd=cwd) - except OSError, e : - strerror = "%s\nwhile executing %s" % (e.args[1], args) - raise CommandError(strerror, e.args[0]) - output, error = q.communicate(input=stdin) - status = q.wait() - if self.verboseInvoke == True: - print >> sys.stderr, "%d\n%s%s" % (status, output, error) - if status not in expect: - strerror = "%s\nwhile executing %s\n%s" % (args[1], args, error) - raise CommandError(strerror, status) - return status, output, error - def _u_invoke_client(self, *args, **kwargs): - directory = kwargs.get('directory',None) - expect = kwargs.get('expect', (0,)) - stdin = kwargs.get('stdin', None) - cl_args = [self.client] - cl_args.extend(args) - return self._u_invoke(cl_args, stdin=stdin,expect=expect,cwd=directory) - def _u_search_parent_directories(self, path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. - - e.g. - search_parent_directories("/a/b/c", ".be") - will return the path to the first existing file from - /a/b/c/.be - /a/b/.be - /a/.be - /.be - or None if none of those files exist. - """ - return search_parent_directories(path, filename) - def _use_rcs(self, path, allow_no_rcs): - """ - Try and decide if _rcs_add/update/mkdir/etc calls will - succeed. Returns True is we think the rcs_call would - succeeed, and False otherwise. - """ - use_rcs = True - exception = None - if self.rootdir != None: - if self.path_in_root(path) == False: - use_rcs = False - exception = PathNotInRoot(path, self.rootdir) - else: - use_rcs = False - exception = RCSnotRooted - if use_rcs == False and allow_no_rcs==False: - raise exception - return use_rcs - def path_in_root(self, path, root=None): - """ - Return the relative path to path from root. - >>> rcs = new() - >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c") - True - >>> rcs.path_in_root("/a.b/.be", "/a.b/c") - False - """ - if root == None: - if self.rootdir == None: - raise RCSnotRooted - root = self.rootdir - path = os.path.abspath(path) - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - return False - return True - def _u_rel_path(self, path, root=None): - """ - Return the relative path to path from root. - >>> rcs = new() - >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c") - '.be' - """ - if root == None: - if self.rootdir == None: - raise RCSnotRooted - root = self.rootdir - path = os.path.abspath(path) - absRoot = os.path.abspath(root) - absRootSlashedDir = os.path.join(absRoot,"") - if not path.startswith(absRootSlashedDir): - raise PathNotInRoot(path, absRootSlashedDir) - assert path != absRootSlashedDir, \ - "file %s == root directory %s" % (path, absRootSlashedDir) - relpath = path[len(absRootSlashedDir):] - return relpath - def _u_abspath(self, path, root=None): - """ - Return the absolute path from a path realtive to root. - >>> rcs = new() - >>> rcs._u_abspath(".be", "/a.b/c") - '/a.b/c/.be' - """ - if root == None: - assert self.rootdir != None, "RCS not rooted" - root = self.rootdir - return os.path.abspath(os.path.join(root, path)) - def _u_create_id(self, name, email=None): - """ - >>> rcs = new() - >>> rcs._u_create_id("John Doe", "jdoe@example.com") - 'John Doe <jdoe@example.com>' - >>> rcs._u_create_id("John Doe") - 'John Doe' - """ - assert len(name) > 0 - if email == None or len(email) == 0: - return name - else: - return "%s <%s>" % (name, email) - def _u_parse_id(self, value): - """ - >>> rcs = new() - >>> rcs._u_parse_id("John Doe <jdoe@example.com>") - ('John Doe', 'jdoe@example.com') - >>> rcs._u_parse_id("John Doe") - ('John Doe', None) - >>> try: - ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>") - ... except AssertionError: - ... print "Invalid match" - Invalid match - """ - emailexp = re.compile("(.*) <([^>]*)>(.*)") - match = emailexp.search(value) - if match == None: - email = None - name = value - else: - assert len(match.groups()) == 3 - assert match.groups()[2] == "", match.groups() - email = match.groups()[1] - name = match.groups()[0] - assert name != None - assert len(name) > 0 - return (name, email) - def _u_get_fallback_username(self): - name = None - for envariable in ["LOGNAME", "USERNAME"]: - if os.environ.has_key(envariable): - name = os.environ[envariable] - break - assert name != None - return name - def _u_get_fallback_email(self): - hostname = gethostname() - name = self._u_get_fallback_username() - return "%s@%s" % (name, hostname) - def _u_parse_commitfile(self, commitfile): - """ - Split the commitfile created in self.commit() back into - summary and header lines. - """ - f = codecs.open(commitfile, "r", self.encoding) - summary = f.readline() - body = f.read() - body.lstrip('\n') - if len(body) == 0: - body = None - f.close() - return (summary, body) - - -def setup_rcs_test_fixtures(testcase): - """Set up test fixtures for RCS test case.""" - testcase.rcs = testcase.Class() - testcase.dir = Dir() - testcase.dirname = testcase.dir.path - - rcs_not_supporting_uninitialized_user_id = [] - rcs_not_supporting_set_user_id = ["None", "hg"] - testcase.rcs_supports_uninitialized_user_id = ( - testcase.rcs.name not in rcs_not_supporting_uninitialized_user_id) - testcase.rcs_supports_set_user_id = ( - testcase.rcs.name not in rcs_not_supporting_set_user_id) - - if not testcase.rcs.installed(): - testcase.fail( - "%(name)s RCS not found" % vars(testcase.Class)) - - if testcase.Class.name != "None": - testcase.failIf( - testcase.rcs.detect(testcase.dirname), - "Detected %(name)s RCS before initialising" - % vars(testcase.Class)) - - testcase.rcs.init(testcase.dirname) - - -class RCSTestCase(unittest.TestCase): - """Test cases for base RCS class.""" - - Class = RCS - - def __init__(self, *args, **kwargs): - super(RCSTestCase, self).__init__(*args, **kwargs) - self.dirname = None - - def setUp(self): - super(RCSTestCase, self).setUp() - setup_rcs_test_fixtures(self) - - def tearDown(self): - del(self.rcs) - super(RCSTestCase, self).tearDown() - - def full_path(self, rel_path): - return os.path.join(self.dirname, rel_path) - - -class RCS_init_TestCase(RCSTestCase): - """Test cases for RCS.init method.""" - - def test_detect_should_succeed_after_init(self): - """Should detect RCS in directory after initialization.""" - self.failUnless( - self.rcs.detect(self.dirname), - "Did not detect %(name)s RCS after initialising" - % vars(self.Class)) - - def test_rcs_rootdir_in_specified_root_path(self): - """RCS root directory should be in specified root path.""" - rp = os.path.realpath(self.rcs.rootdir) - dp = os.path.realpath(self.dirname) - rcs_name = self.Class.name - self.failUnless( - dp == rp or rp == None, - "%(rcs_name)s RCS root in wrong dir (%(dp)s %(rp)s)" % vars()) - - -class RCS_get_user_id_TestCase(RCSTestCase): - """Test cases for RCS.get_user_id method.""" - - def test_gets_existing_user_id(self): - """Should get the existing user ID.""" - if not self.rcs_supports_uninitialized_user_id: - return - - user_id = self.rcs.get_user_id() - self.failUnless( - user_id is not None, - "unable to get a user id") - - -class RCS_set_user_id_TestCase(RCSTestCase): - """Test cases for RCS.set_user_id method.""" - - def setUp(self): - super(RCS_set_user_id_TestCase, self).setUp() - - if self.rcs_supports_uninitialized_user_id: - self.prev_user_id = self.rcs.get_user_id() - else: - self.prev_user_id = "Uninitialized identity <bogus@example.org>" - - if self.rcs_supports_set_user_id: - self.test_new_user_id = "John Doe <jdoe@example.com>" - self.rcs.set_user_id(self.test_new_user_id) - - def tearDown(self): - if self.rcs_supports_set_user_id: - self.rcs.set_user_id(self.prev_user_id) - super(RCS_set_user_id_TestCase, self).tearDown() - - def test_raises_error_in_unsupported_vcs(self): - """Should raise an error in a VCS that doesn't support it.""" - if self.rcs_supports_set_user_id: - return - self.assertRaises( - SettingIDnotSupported, - self.rcs.set_user_id, "foo") - - def test_updates_user_id_in_supporting_rcs(self): - """Should update the user ID in an RCS that supports it.""" - if not self.rcs_supports_set_user_id: - return - user_id = self.rcs.get_user_id() - self.failUnlessEqual( - self.test_new_user_id, user_id, - "user id not set correctly (expected %s, got %s)" - % (self.test_new_user_id, user_id)) - - -def setup_rcs_revision_test_fixtures(testcase): - """Set up revision test fixtures for RCS test case.""" - testcase.test_dirs = ['a', 'a/b', 'c'] - for path in testcase.test_dirs: - testcase.rcs.mkdir(testcase.full_path(path)) - - testcase.test_files = ['a/text', 'a/b/text'] - - testcase.test_contents = { - 'rev_1': "Lorem ipsum", - 'uncommitted': "dolor sit amet", - } - - -class RCS_mkdir_TestCase(RCSTestCase): - """Test cases for RCS.mkdir method.""" - - def setUp(self): - super(RCS_mkdir_TestCase, self).setUp() - setup_rcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.rcs.recursive_remove(self.full_path(path)) - super(RCS_mkdir_TestCase, self).tearDown() - - def test_mkdir_creates_directory(self): - """Should create specified directory in filesystem.""" - for path in self.test_dirs: - full_path = self.full_path(path) - self.failUnless( - os.path.exists(full_path), - "path %(full_path)s does not exist" % vars()) - - -class RCS_commit_TestCase(RCSTestCase): - """Test cases for RCS.commit method.""" - - def setUp(self): - super(RCS_commit_TestCase, self).setUp() - setup_rcs_revision_test_fixtures(self) - - def tearDown(self): - for path in reversed(sorted(self.test_dirs)): - self.rcs.recursive_remove(self.full_path(path)) - super(RCS_commit_TestCase, self).tearDown() - - def test_file_contents_as_specified(self): - """Should set file contents as specified.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents(full_path, test_contents) - current_contents = self.rcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_committed(self): - """Should have file contents as specified after commit.""" - test_contents = self.test_contents['rev_1'] - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents(full_path, test_contents) - revision = self.rcs.commit("Initial file contents.") - current_contents = self.rcs.get_file_contents(full_path) - self.failUnlessEqual(test_contents, current_contents) - - def test_file_contents_as_set_when_uncommitted(self): - """Should set file contents as specified after commit.""" - if not self.rcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.rcs.commit("Initial file contents.") - self.rcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - current_contents = self.rcs.get_file_contents(full_path) - self.failUnlessEqual( - self.test_contents['uncommitted'], current_contents) - - def test_revision_file_contents_as_committed(self): - """Should get file contents as committed to specified revision.""" - if not self.rcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.rcs.commit("Initial file contents.") - self.rcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - committed_contents = self.rcs.get_file_contents( - full_path, revision) - self.failUnlessEqual( - self.test_contents['rev_1'], committed_contents) - - -class RCS_duplicate_repo_TestCase(RCSTestCase): - """Test cases for RCS.duplicate_repo method.""" - - def setUp(self): - super(RCS_duplicate_repo_TestCase, self).setUp() - setup_rcs_revision_test_fixtures(self) - - def tearDown(self): - self.rcs.remove_duplicate_repo() - for path in reversed(sorted(self.test_dirs)): - self.rcs.recursive_remove(self.full_path(path)) - super(RCS_duplicate_repo_TestCase, self).tearDown() - - def test_revision_file_contents_as_committed(self): - """Should match file contents as committed to specified revision.""" - if not self.rcs.versioned: - return - for path in self.test_files: - full_path = self.full_path(path) - self.rcs.set_file_contents( - full_path, self.test_contents['rev_1']) - revision = self.rcs.commit("Commit current status") - self.rcs.set_file_contents( - full_path, self.test_contents['uncommitted']) - dup_repo_path = self.rcs.duplicate_repo(revision) - dup_file_path = os.path.join(dup_repo_path, path) - dup_file_contents = file(dup_file_path, 'rb').read() - self.failUnlessEqual( - self.test_contents['rev_1'], dup_file_contents) - self.rcs.remove_duplicate_repo() - - -def make_rcs_testcase_subclasses(rcs_class, namespace): - """Make RCSTestCase subclasses for rcs_class in the namespace.""" - rcs_testcase_classes = [ - c for c in ( - ob for ob in globals().values() if isinstance(ob, type)) - if issubclass(c, RCSTestCase)] - - for base_class in rcs_testcase_classes: - testcase_class_name = rcs_class.__name__ + base_class.__name__ - testcase_class_bases = (base_class,) - testcase_class_dict = dict(base_class.__dict__) - testcase_class_dict['Class'] = rcs_class - testcase_class = type( - testcase_class_name, testcase_class_bases, testcase_class_dict) - setattr(namespace, testcase_class_name, testcase_class) - - -unitsuite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/settings_object.py b/libbe/settings_object.py index 7326d1b..e69de29 100644 --- a/libbe/settings_object.py +++ b/libbe/settings_object.py @@ -1,411 +0,0 @@ -# Bugs Everywhere - a distributed bugtracker -# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -This module provides a base class implementing settings-dict based -property storage useful for BE objects with saved properties -(e.g. BugDir, Bug, Comment). For example usage, consider the -unittests at the end of the module. -""" - -import doctest -import unittest - -from properties import Property, doc_property, local_property, \ - defaulting_property, checked_property, fn_checked_property, \ - cached_property, primed_property, change_hook_property, \ - settings_property - - -class _Token (object): - """ - `Control' value class for properties. We want values that only - mean something to the settings_object module. - """ - pass - -class UNPRIMED (_Token): - "Property has not been primed." - pass - -class EMPTY (_Token): - """ - Property has been primed but has no user-set value, so use - default/generator value. - """ - pass - - -def prop_save_settings(self, old, new): - """ - The default action undertaken when a property changes. - """ - if self.sync_with_disk==True: - self.save_settings() - -def prop_load_settings(self): - """ - The default action undertaken when an UNPRIMED property is accessed. - """ - if self.sync_with_disk==True and self._settings_loaded==False: - self.load_settings() - else: - self._setup_saved_settings(flag_as_loaded=False) - -# Some name-mangling routines for pretty printing setting names -def setting_name_to_attr_name(self, name): - """ - Convert keys to the .settings dict into their associated - SavedSettingsObject attribute names. - >>> print setting_name_to_attr_name(None,"User-id") - user_id - """ - return name.lower().replace('-', '_') - -def attr_name_to_setting_name(self, name): - """ - The inverse of setting_name_to_attr_name. - >>> print attr_name_to_setting_name(None, "user_id") - User-id - """ - return name.capitalize().replace('_', '-') - - -def versioned_property(name, doc, - default=None, generator=None, - change_hook=prop_save_settings, - mutable=False, - primer=prop_load_settings, - allowed=None, check_fn=None, - settings_properties=[], - required_saved_properties=[], - require_save=False): - """ - Combine the common decorators in a single function. - - Use zero or one (but not both) of default or generator, since a - working default will keep the generator from functioning. Use the - default if you know what you want the default value to be at - 'coding time'. Use the generator if you can write a function to - determine a valid default at run time. If both default and - generator are None, then the property will be a defaulting - property which defaults to None. - - allowed and check_fn have a similar relationship, although you can - use both of these if you want. allowed compares the proposed - value against a list determined at 'coding time' and check_fn - allows more flexible comparisons to take place at run time. - - Set require_save to True if you want to save the default/generated - value for a property, to protect against future changes. E.g., we - currently expect all comments to be 'text/plain' but in the future - we may want to default to 'text/html'. If we don't want the old - comments to be interpreted as 'text/html', we would require that - the content type be saved. - - change_hook, primer, settings_properties, and - required_saved_properties are only options to get their defaults - into our local scope. Don't mess with them. - """ - settings_properties.append(name) - if require_save == True: - required_saved_properties.append(name) - def decorator(funcs): - fulldoc = doc - if default != None: - defaulting = defaulting_property(default=default, null=EMPTY, - default_mutable=mutable) - fulldoc += "\n\nThis property defaults to %s" % default - if generator != None: - cached = cached_property(generator=generator, initVal=EMPTY, - mutable=mutable) - fulldoc += "\n\nThis property is generated with %s" % generator - if check_fn != None: - fn_checked = fn_checked_property(value_allowed_fn=check_fn) - fulldoc += "\n\nThis property is checked with %s" % check_fn - if allowed != None: - checked = checked_property(allowed=allowed) - fulldoc += "\n\nThe allowed values for this property are: %s." \ - % (', '.join(allowed)) - hooked = change_hook_property(hook=change_hook, mutable=mutable) - primed = primed_property(primer=primer, initVal=UNPRIMED) - settings = settings_property(name=name, null=UNPRIMED) - docp = doc_property(doc=fulldoc) - deco = hooked(primed(settings(docp(funcs)))) - if default != None: - deco = defaulting(deco) - if generator != None: - deco = cached(deco) - if check_fn != None: - deco = fn_checked(deco) - if allowed != None: - deco = checked(deco) - return Property(deco) - return decorator - -class SavedSettingsObject(object): - - # Keep a list of properties that may be stored in the .settings dict. - #settings_properties = [] - - # A list of properties that we save to disk, even if they were - # never set (in which case we save the default value). This - # protects against future changes in default values. - #required_saved_properties = [] - - _setting_name_to_attr_name = setting_name_to_attr_name - _attr_name_to_setting_name = attr_name_to_setting_name - - def __init__(self): - self._settings_loaded = False - self.sync_with_disk = False - self.settings = {} - - def load_settings(self): - """Load the settings from disk.""" - # Override. Must call ._setup_saved_settings() after loading. - self.settings = {} - self._setup_saved_settings() - - def _setup_saved_settings(self, flag_as_loaded=True): - """ - To be run after setting self.settings up from disk. Marks all - settings as primed. - """ - for property in self.settings_properties: - if property not in self.settings: - self.settings[property] = EMPTY - elif self.settings[property] == UNPRIMED: - self.settings[property] = EMPTY - if flag_as_loaded == True: - self._settings_loaded = True - - def save_settings(self): - """Load the settings from disk.""" - # Override. Should save the dict output of ._get_saved_settings() - settings = self._get_saved_settings() - pass # write settings to disk.... - - def _get_saved_settings(self): - settings = {} - for k,v in self.settings.items(): - if v != None and v != EMPTY: - settings[k] = v - for k in self.required_saved_properties: - settings[k] = getattr(self, self._setting_name_to_attr_name(k)) - return settings - - def clear_cached_setting(self, setting=None): - "If setting=None, clear *all* cached settings" - if setting != None: - if hasattr(self, "_%s_cached_value" % setting): - delattr(self, "_%s_cached_value" % setting) - else: - for setting in settings_properties: - self.clear_cached_setting(setting) - - -class SavedSettingsObjectTests(unittest.TestCase): - def testSimpleProperty(self): - """Testing a minimal versioned property""" - class Test(SavedSettingsObject): - settings_properties = [] - required_saved_properties = [] - @versioned_property(name="Content-type", - doc="A test property", - settings_properties=settings_properties, - required_saved_properties=required_saved_properties) - def content_type(): return {} - def __init__(self): - SavedSettingsObject.__init__(self) - t = Test() - # access missing setting - self.failUnless(t._settings_loaded == False, t._settings_loaded) - self.failUnless(len(t.settings) == 0, len(t.settings)) - self.failUnless(t.content_type == EMPTY, t.content_type) - # accessing t.content_type triggers the priming, which runs - # t._setup_saved_settings, which fills out t.settings with - # EMPTY data. t._settings_loaded is still false though, since - # the default priming does not do any of the `official' loading - # that occurs in t.load_settings. - self.failUnless(len(t.settings) == 1, len(t.settings)) - self.failUnless(t.settings["Content-type"] == EMPTY, - t.settings["Content-type"]) - self.failUnless(t._settings_loaded == False, t._settings_loaded) - # load settings creates an EMPTY value in the settings array - t.load_settings() - self.failUnless(t._settings_loaded == True, t._settings_loaded) - self.failUnless(t.settings["Content-type"] == EMPTY, - t.settings["Content-type"]) - self.failUnless(t.content_type == EMPTY, t.content_type) - self.failUnless(len(t.settings) == 1, len(t.settings)) - self.failUnless(t.settings["Content-type"] == EMPTY, - t.settings["Content-type"]) - # now we set a value - t.content_type = None - self.failUnless(t.settings["Content-type"] == None, - t.settings["Content-type"]) - self.failUnless(t.content_type == None, t.content_type) - self.failUnless(t.settings["Content-type"] == None, - t.settings["Content-type"]) - # now we set another value - t.content_type = "text/plain" - self.failUnless(t.content_type == "text/plain", t.content_type) - self.failUnless(t.settings["Content-type"] == "text/plain", - t.settings["Content-type"]) - self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"}, - t._get_saved_settings()) - # now we clear to the post-primed value - t.content_type = EMPTY - self.failUnless(t._settings_loaded == True, t._settings_loaded) - self.failUnless(t.settings["Content-type"] == EMPTY, - t.settings["Content-type"]) - self.failUnless(t.content_type == EMPTY, t.content_type) - self.failUnless(len(t.settings) == 1, len(t.settings)) - self.failUnless(t.settings["Content-type"] == EMPTY, - t.settings["Content-type"]) - def testDefaultingProperty(self): - """Testing a defaulting versioned property""" - class Test(SavedSettingsObject): - settings_properties = [] - required_saved_properties = [] - @versioned_property(name="Content-type", - doc="A test property", - default="text/plain", - settings_properties=settings_properties, - required_saved_properties=required_saved_properties) - def content_type(): return {} - def __init__(self): - SavedSettingsObject.__init__(self) - t = Test() - self.failUnless(t._settings_loaded == False, t._settings_loaded) - self.failUnless(t.content_type == "text/plain", t.content_type) - self.failUnless(t._settings_loaded == False, t._settings_loaded) - t.load_settings() - self.failUnless(t._settings_loaded == True, t._settings_loaded) - self.failUnless(t.content_type == "text/plain", t.content_type) - self.failUnless(t.settings["Content-type"] == EMPTY, - t.settings["Content-type"]) - self.failUnless(t._get_saved_settings() == {}, t._get_saved_settings()) - t.content_type = "text/html" - self.failUnless(t.content_type == "text/html", - t.content_type) - self.failUnless(t.settings["Content-type"] == "text/html", - t.settings["Content-type"]) - self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"}, - t._get_saved_settings()) - def testRequiredDefaultingProperty(self): - """Testing a required defaulting versioned property""" - class Test(SavedSettingsObject): - settings_properties = [] - required_saved_properties = [] - @versioned_property(name="Content-type", - doc="A test property", - default="text/plain", - settings_properties=settings_properties, - required_saved_properties=required_saved_properties, - require_save=True) - def content_type(): return {} - def __init__(self): - SavedSettingsObject.__init__(self) - t = Test() - self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"}, - t._get_saved_settings()) - t.content_type = "text/html" - self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"}, - t._get_saved_settings()) - def testClassVersionedPropertyDefinition(self): - """Testing a class-specific _versioned property decorator""" - class Test(SavedSettingsObject): - settings_properties = [] - required_saved_properties = [] - def _versioned_property(settings_properties=settings_properties, - required_saved_properties=required_saved_properties, - **kwargs): - if "settings_properties" not in kwargs: - kwargs["settings_properties"] = settings_properties - if "required_saved_properties" not in kwargs: - kwargs["required_saved_properties"]=required_saved_properties - return versioned_property(**kwargs) - @_versioned_property(name="Content-type", - doc="A test property", - default="text/plain", - require_save=True) - def content_type(): return {} - def __init__(self): - SavedSettingsObject.__init__(self) - t = Test() - self.failUnless(t._get_saved_settings()=={"Content-type":"text/plain"}, - t._get_saved_settings()) - t.content_type = "text/html" - self.failUnless(t._get_saved_settings()=={"Content-type":"text/html"}, - t._get_saved_settings()) - def testMutableChangeHookedProperty(self): - """Testing a mutable change-hooked property""" - SAVES = [] - def prop_log_save_settings(self, old, new, saves=SAVES): - saves.append("'%s' -> '%s'" % (str(old), str(new))) - prop_save_settings(self, old, new) - class Test(SavedSettingsObject): - settings_properties = [] - required_saved_properties = [] - @versioned_property(name="List-type", - doc="A test property", - mutable=True, - change_hook=prop_log_save_settings, - settings_properties=settings_properties, - required_saved_properties=required_saved_properties) - def list_type(): return {} - def __init__(self): - SavedSettingsObject.__init__(self) - t = Test() - self.failUnless(t._settings_loaded == False, t._settings_loaded) - t.load_settings() - self.failUnless(SAVES == [], SAVES) - self.failUnless(t._settings_loaded == True, t._settings_loaded) - self.failUnless(t.list_type == EMPTY, t.list_type) - self.failUnless(SAVES == [ - "'None' -> '<class 'libbe.settings_object.EMPTY'>'" - ], SAVES) - self.failUnless(t.settings["List-type"]==EMPTY,t.settings["List-type"]) - t.list_type = [] - self.failUnless(t.settings["List-type"] == [], t.settings["List-type"]) - self.failUnless(SAVES == [ - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'" - ], SAVES) - t.list_type.append(5) - self.failUnless(SAVES == [ - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'" # <- TODO. Where did this come from? - ], SAVES) - self.failUnless(t.settings["List-type"] == [5],t.settings["List-type"]) - self.failUnless(SAVES == [ # the append(5) has not yet been saved - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", - ], SAVES) - self.failUnless(t.list_type == [5], t.list_type) # <-get triggers saved - self.failUnless(SAVES == [ # now the append(5) has been saved. - "'None' -> '<class 'libbe.settings_object.EMPTY'>'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", - "'<class 'libbe.settings_object.EMPTY'>' -> '[]'", - "'[]' -> '[5]'" - ], SAVES) - -unitsuite=unittest.TestLoader().loadTestsFromTestCase(SavedSettingsObjectTests) -suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/tree.py b/libbe/tree.py index 54b927e..e69de29 100644 --- a/libbe/tree.py +++ b/libbe/tree.py @@ -1,161 +0,0 @@ -# Bugs Everywhere, a distributed bugtracker -# Copyright (C) 2008-2009 W. Trevor King <wking@drexel.edu> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA -# 02110-1301, USA - -import doctest - -class Tree(list): - """ - Construct - +-b---d-g - a-+ +-e - +-c-+-f-h-i - with - >>> i = Tree(); i.n = "i" - >>> h = Tree([i]); h.n = "h" - >>> f = Tree([h]); f.n = "f" - >>> e = Tree(); e.n = "e" - >>> c = Tree([f,e]); c.n = "c" - >>> g = Tree(); g.n = "g" - >>> d = Tree([g]); d.n = "d" - >>> b = Tree([d]); b.n = "b" - >>> a = Tree(); a.n = "a" - >>> a.append(c) - >>> a.append(b) - - >>> a.branch_len() - 5 - >>> a.sort(key=lambda node : -node.branch_len()) - >>> "".join([node.n for node in a.traverse()]) - 'acfhiebdg' - >>> a.sort(key=lambda node : node.branch_len()) - >>> "".join([node.n for node in a.traverse()]) - 'abdgcefhi' - >>> "".join([node.n for node in a.traverse(depthFirst=False)]) - 'abcdefghi' - >>> for depth,node in a.thread(): - ... print "%*s" % (2*depth+1, node.n) - a - b - d - g - c - e - f - h - i - >>> for depth,node in a.thread(flatten=True): - ... print "%*s" % (2*depth+1, node.n) - a - b - d - g - c - e - f - h - i - """ - def branch_len(self): - """ - Exhaustive search every time == SLOW. - - Use only on small trees, or reimplement by overriding - child-addition methods to allow accurate caching. - - For the tree - +-b---d-g - a-+ +-e - +-c-+-f-h-i - this method returns 5. - """ - if len(self) == 0: - return 1 - else: - return 1 + max([child.branch_len() for child in self]) - - def sort(self, *args, **kwargs): - """ - This method can be slow, e.g. on a branch_len() sort, since a - node at depth N from the root has it's branch_len() method - called N times. - """ - list.sort(self, *args, **kwargs) - for child in self: - child.sort(*args, **kwargs) - - def traverse(self, depthFirst=True): - """ - Note: you might want to sort() your tree first. - """ - if depthFirst == True: - yield self - for child in self: - for descendant in child.traverse(): - yield descendant - else: # breadth first, Wikipedia algorithm - # http://en.wikipedia.org/wiki/Breadth-first_search - queue = [self] - while len(queue) > 0: - node = queue.pop(0) - yield node - queue.extend(node) - - def thread(self, flatten=False): - """ - When flatten==False, the depth of any node is one greater than - the depth of its parent. That way the inheritance is - explicit, but you can end up with highly indented threads. - - When flatten==True, the depth of any node is only greater than - the depth of its parent when there is a branch, and the node - is not the last child. This can lead to ancestry ambiguity, - but keeps the total indentation down. E.g. - +-b +-b-c - a-+-c and a-+ - +-d-e-f +-d-e-f - would both produce (after sorting by branch_len()) - (0, a) - (1, b) - (1, c) - (0, d) - (0, e) - (0, f) - """ - stack = [] # ancestry of the current node - if flatten == True: - depthDict = {} - - for node in self.traverse(depthFirst=True): - while len(stack) > 0 \ - and id(node) not in [id(c) for c in stack[-1]]: - stack.pop(-1) - if flatten == False: - depth = len(stack) - else: - if len(stack) == 0: - depth = 0 - else: - parent = stack[-1] - depth = depthDict[id(parent)] - if len(parent) > 1 and node != parent[-1]: - depth += 1 - depthDict[id(node)] = depth - yield (depth,node) - stack.append(node) - -suite = doctest.DocTestSuite() diff --git a/libbe/utility.py b/libbe/utility.py index 8a0f318..e69de29 100644 --- a/libbe/utility.py +++ b/libbe/utility.py @@ -1,93 +0,0 @@ -# Copyright (C) 2005-2009 Aaron Bentley and Panometrics, Inc. -# W. Trevor King <wking@drexel.edu> -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import calendar -import codecs -import os -import shutil -import tempfile -import time -import doctest - - -def search_parent_directories(path, filename): - """ - Find the file (or directory) named filename in path or in any - of path's parents. - - e.g. - search_parent_directories("/a/b/c", ".be") - will return the path to the first existing file from - /a/b/c/.be - /a/b/.be - /a/.be - /.be - or None if none of those files exist. - """ - path = os.path.realpath(path) - assert os.path.exists(path) - old_path = None - while True: - check_path = os.path.join(path, filename) - if os.path.exists(check_path): - return check_path - if path == old_path: - return None - old_path = path - path = os.path.dirname(path) - -class Dir (object): - "A temporary directory for testing use" - def __init__(self): - self.path = tempfile.mkdtemp(prefix="BEtest") - self.rmtree = shutil.rmtree # save local reference for __del__ - self.removed = False - def __del__(self): - self.cleanup() - def cleanup(self): - if self.removed == False: - self.rmtree(self.path) - self.removed = True - def __call__(self): - return self.path - -RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000" - - -def time_to_str(time_val): - """Convert a time value into an RFC 2822-formatted string. This format - lacks sub-second data. - >>> time_to_str(0) - 'Thu, 01 Jan 1970 00:00:00 +0000' - """ - return time.strftime(RFC_2822_TIME_FMT, time.gmtime(time_val)) - -def str_to_time(str_time): - """Convert an RFC 2822-fomatted string into a time falue. - >>> str_to_time("Thu, 01 Jan 1970 00:00:00 +0000") - 0 - >>> q = time.time() - >>> str_to_time(time_to_str(q)) == int(q) - True - """ - return calendar.timegm(time.strptime(str_time, RFC_2822_TIME_FMT)) - -def handy_time(time_val): - return time.strftime("%a, %d %b %Y %H:%M", time.localtime(time_val)) - - -suite = doctest.DocTestSuite() |