diff options
author | W. Trevor King <wking@drexel.edu> | 2008-11-18 20:42:50 -0500 |
---|---|---|
committer | W. Trevor King <wking@drexel.edu> | 2008-11-18 20:42:50 -0500 |
commit | 19b153b9a86377a2b30cc80fa3f475fed892e2fe (patch) | |
tree | 8f5688707ab1b34ffec2bc4372d087580ff21709 /libbe | |
parent | e4018dfe8cfa553adbd20898c5b42c3462ca1733 (diff) | |
download | bugseverywhere-19b153b9a86377a2b30cc80fa3f475fed892e2fe.tar.gz |
Major rewrite of RCS backends. RCS now represented as a class.
Lots of changes and just one commit. This started with bug
dac91856-cb6a-4f69-8c03-38ff0b29aab2, when I noticed that new bugs
were not being added appropriately with the Git backend. I'd been
working with Git trouble before with bug
0cad2ac6-76ef-4a88-abdf-b2e02de76f5c, and decided things would be
better off if I just scrapped the current RCS architecture and went to
a more object oriented setup. So I did. It's not clear how to add
support for an RCS backend:
* Create a new module that
- defines an inheritor of rsc.RCS, overriding the _rcs_*() methods
- provide a new() function for instantizating the new class
- defines an inheritor of rcs.RCStestCase, overiding the Class attribute
- defines 'suite' a unittest.TestSuite testing the module
* Add your new module to the rest in rcs._get_matching_rcs()
* Add your new module to the rest in libbe/tests.py
Although I'm not sure libbe/tests.py is still usefull.
The new framework clears out a bunch of hackery that used to be
involved with supporting becommands/diff.py. There's still room for
progress though. While implementing the new verision, I moved the
testing framework over from doctest to a doctest/unittest combination.
Longer tests that don't demonstrate a function's usage should be moved
to unittests at the end of the module, since unittest has better
support for setup/teardown, etc.
The new framework also revealed some underimplented backends, most
notably arch. These backends have now been fixed.
I also tweaked the test_usage.sh script to run through all the backends
if it is called with no arguments.
The fix for the dac bug turned out to be an unflushed file write :p.
Diffstat (limited to 'libbe')
-rw-r--r-- | libbe/arch.py | 357 | ||||
-rw-r--r-- | libbe/bug.py | 53 | ||||
-rw-r--r-- | libbe/bugdir.py | 97 | ||||
-rw-r--r-- | libbe/bzr.py | 186 | ||||
-rw-r--r-- | libbe/cmdutil.py | 15 | ||||
-rw-r--r-- | libbe/config.py | 4 | ||||
-rw-r--r-- | libbe/diff.py | 11 | ||||
-rw-r--r-- | libbe/git.py | 218 | ||||
-rw-r--r-- | libbe/hg.py | 167 | ||||
-rw-r--r-- | libbe/mapfile.py | 8 | ||||
-rw-r--r-- | libbe/names.py | 4 | ||||
-rw-r--r-- | libbe/no_rcs.py | 51 | ||||
-rw-r--r-- | libbe/plugin.py | 4 | ||||
-rw-r--r-- | libbe/rcs.py | 578 | ||||
-rw-r--r-- | libbe/restconvert.py | 4 | ||||
-rw-r--r-- | libbe/tests.py | 46 | ||||
-rw-r--r-- | libbe/utility.py | 12 |
17 files changed, 1123 insertions, 692 deletions
diff --git a/libbe/arch.py b/libbe/arch.py index 001f852..8e7390d 100644 --- a/libbe/arch.py +++ b/libbe/arch.py @@ -15,184 +15,209 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os -import config -import errno +import shutil +import time +import re +import unittest +import doctest -from rcs import invoke +import config +from rcs import RCS, RCStestCase, CommandError client = config.get_val("arch_client") if client is None: client = "tla" config.set_val("arch_client", client) - -def invoke_client(*args, **kwargs): - cl_args = [client] - cl_args.extend(args) - status,output,error = invoke(cl_args) - if status not in (0,): - raise Exception("Command failed: %s" % error) - return output - -def get_user_id(): - try: - return invoke_client('my-id') - except Exception, e: - if 'no arch user id set' in e.args[0]: - return None +def new(): + return Arch() + +class Arch(RCS): + name = "Arch" + client = client + versioned = True + _archive_name = None + _archive_dir = None + _tmp_archive = False + _project_name = None + _tmp_project = False + _arch_paramdir = os.path.expanduser("~/.arch-params") + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + """Detect whether a directory is revision-controlled using Arch""" + if self._u_search_parent_directories(path, "{arch}") != None : + return True + return False + def _rcs_root(self, path): + if not os.path.isdir(path): + dirname = os.path.dirname(path) else: - raise - - -def set_user_id(value): - invoke_client('my-id', value) - - -def ensure_user_id(): - if get_user_id() is None: - set_user_id('nobody <nobody@example.com>') - - -def write_tree_settings(contents, path): - file(os.path.join(path, "{arch}", "=tagging-method"), "wb").write(contents) - -def init_tree(path): - invoke_client("init-tree", "-d", path) - -def temp_arch_tree(type="easy"): - import tempfile - ensure_user_id() - path = tempfile.mkdtemp() - init_tree(path) - if type=="easy": - write_tree_settings("source ^.*$\n", path) - elif type=="tricky": - write_tree_settings("source ^$\n", path) - else: - assert (type=="impossible") - add_dir_rule("precious ^\.boo$", path, path) - return path - -def list_added(root): - assert os.path.exists(root) - assert os.access(root, os.X_OK) - root = os.path.realpath(root) - inv_str = invoke_client("inventory", "--source", '--both', '--all', root) - return [os.path.join(root, p) for p in inv_str.split('\n')] - -def tree_root(filename): - assert os.path.exists(filename) - if not os.path.isdir(filename): - dirname = os.path.dirname(filename) - else: - dirname = filename - return invoke_client("tree-root", dirname).rstrip('\n') - -def rel_filename(filename, root): - filename = os.path.realpath(filename) - root = os.path.realpath(root) - assert(filename.startswith(root)) - return filename[len(root)+1:] + dirname = path + status,output,error = self._u_invoke_client("tree-root", dirname) + # get archive name... + return output.rstrip('\n') + def _rcs_init(self, path): + self._create_archive(path) + self._create_project(path) + self._add_project_code(path) + def _create_archive(self, path): + # Create a new archive + # http://regexps.srparish.net/tutorial-tla/new-archive.html#Creating_a_New_Archive + assert self._archive_name == None + id = self.get_user_id() + name, email = self._u_parse_id(id) + if email == None: + email = "%s@example.com" % name + trailer = "%s-%s" % ("bugs-everywhere-auto", + time.strftime("%Y.%H.%M.%S")) + self._archive_name = "%s--%s" % (email, trailer) + self._archive_dir = "/tmp/%s" % trailer + self._tmp_archive = True + self._u_invoke_client("make-archive", self._archive_name, + self._archive_dir, directory=path) + def _invoke_client(self, *args, **kwargs): + """ + Invoke the client on our archive. + """ + assert self._archive_name != None + command = args[0] + if len(args) > 1: + tailargs = args[1:] + else: + tailargs = [] + arglist = [command, "-A", self._archive_name] + arglist.extend(tailargs) + args = tuple(arglist) + return self._u_invoke_client(*args, **kwargs) + def _remove_archive(self): + assert self._tmp_archive == True + assert self._archive_dir != None + assert self._archive_name != None + os.remove(os.path.join(self._arch_paramdir, + "=locations", self._archive_name)) + shutil.rmtree(self._archive_dir) + self._tmp_archive = False + self._archive_dir = False + self._archive_name = False + def _create_project(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/new-project.html#Starting_a_New_Project + category = "bugs-everywhere" + branch = "mainline" + version = "0.1" + self._project_name = "%s--%s--%s" % (category, branch, version) + self._invoke_client("archive-setup", self._project_name, + directory=path) + def _remove_project(self): + assert self._tmp_project == True + assert self._project_name != None + assert self._archive_dir != None + shutil.rmtree(os.path.join(self._archive_dir, self._project_name)) + self._tmp_project = False + self._project_name = False + def _archive_project_name(self): + assert self._archive_name != None + assert self._project_name != None + return "%s/%s" % (self._archive_name, self._project_name) + def _add_project_code(self, path): + # http://mwolson.org/projects/GettingStartedWithArch.html + # http://regexps.srparish.net/tutorial-tla/importing-first.html#Importing_the_First_Revision + self._u_invoke_client("init-tree", self._archive_project_name(), + directory=path) + self._invoke_client("import", "--summary", "Began versioning", + directory=path) + def _rcs_cleanup(self): + if self._tmp_project == True: + self._remove_project() + if self._tmp_archive == True: + self._remove_archive() + def _rcs_get_user_id(self): + try: + status,output,error = self._u_invoke_client('my-id') + return output.rstrip('\n') + except Exception, e: + if 'no arch user id set' in e.args[0]: + return None + else: + raise + def _rcs_set_user_id(self, value): + self._u_invoke_client('my-id', value) + def _rcs_add(self, path): + self._u_invoke_client("add-id", path) + realpath = os.path.realpath(self._u_abspath(path)) + pathAdded = realpath in self._list_added(self.rootdir) + if self.paranoid and not pathAdded: + self._force_source(path) + def _list_added(self, root): + assert os.path.exists(root) + assert os.access(root, os.X_OK) + root = os.path.realpath(root) + status,output,error = self._u_invoke_client("inventory", "--source", + "--both", "--all", root) + inv_str = output.rstrip('\n') + return [os.path.join(root, p) for p in inv_str.split('\n')] + def _add_dir_rule(self, rule, dirname, root): + inv_path = os.path.join(dirname, '.arch-inventory') + file(inv_path, "ab").write(rule) + if os.path.realpath(inv_path) not in self._list_added(root): + paranoid = self.paranoid + self.paranoid = False + self.add(inv_path) + self.paranoid = paranoid + def _force_source(self, path): + rule = "source %s\n" % self._u_rel_path(path) + self._add_dir_rule(rule, os.path.dirname(path), self.rootdir) + if os.path.realpath(path) not in self._list_added(self.rootdir): + raise CantAddFile(path) + def _rcs_remove(self, path): + if not '.arch-ids' in path: + self._u_invoke_client("delete-id", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None): + if revision == None: + return file(self._u_abspath(path), "rb").read() + else: + status,output,error = \ + self._invoke_client("file-find", path, revision) + path = output.rstrip('\n') + return file(self._u_abspath(path), "rb").read() + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + status,output,error = \ + self._u_invoke_client("get", revision,directory) + def _rcs_commit(self, commitfile): + summary,body = self._u_parse_commitfile(commitfile) + #status,output,error = self._invoke_client("make-log") + if body == None: + status,output,error \ + = self._invoke_client("commit","--summary",summary) + else: + status,output,error \ + = self._invoke_client("commit","--summary",summary, + "--log-message",body) + revision = None + revline = re.compile("[*] committed (.*)") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revpath = match.groups()[0] + assert not " " in revpath, revpath + assert revpath.startswith(self._archive_project_name()+'--') + revision = revpath[len(self._archive_project_name()+'--'):] + return revpath class CantAddFile(Exception): def __init__(self, file): self.file = file Exception.__init__(self, "Can't automatically add file %s" % file) +class ArchTestCase(RCStestCase): + Class = Arch -def add_dir_rule(rule, dirname, root): - inv_filename = os.path.join(dirname, '.arch-inventory') - file(inv_filename, "ab").write(rule) - if os.path.realpath(inv_filename) not in list_added(root): - add_id(inv_filename, paranoid=False) - -def force_source(filename, root): - rule = "source %s\n" % rel_filename(filename, root) - add_dir_rule(rule, os.path.dirname(filename), root) - if os.path.realpath(filename) not in list_added(root): - raise CantAddFile(filename) - -def add_id(filename, paranoid=False): - invoke_client("add-id", filename) - root = tree_root(filename) - if paranoid and os.path.realpath(filename) not in list_added(root): - force_source(filename, root) - - -def delete_id(filename): - invoke_client("delete-id", filename) - -def test_helper(type): - t = temp_arch_tree(type) - dirname = os.path.join(t, ".boo") - return dirname, t - -def mkdir(path, paranoid=False): - """ - >>> import shutil - >>> dirname,t = test_helper("easy") - >>> mkdir(dirname, paranoid=False) - >>> assert os.path.realpath(dirname) in list_added(t) - >>> assert not os.path.exists(os.path.join(t, ".arch-inventory")) - >>> shutil.rmtree(t) - >>> dirname,t = test_helper("tricky") - >>> mkdir(dirname, paranoid=True) - >>> assert os.path.realpath(dirname) in list_added(t) - >>> assert os.path.exists(os.path.join(t, ".arch-inventory")) - >>> shutil.rmtree(t) - >>> dirname,t = test_helper("impossible") - >>> try: - ... mkdir(dirname, paranoid=True) - ... except CantAddFile, e: - ... print "Can't add file" - Can't add file - >>> shutil.rmtree(t) - """ - os.mkdir(path) - add_id(path, paranoid=paranoid) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - - -def path_in_reference(bug_dir, spec): - if spec is not None: - return invoke_client("file-find", bug_dir, spec).rstrip('\n') - return invoke_client("file-find", bug_dir).rstrip('\n') - - -def unlink(path): - try: - os.unlink(path) - delete_id(path) - except OSError, e: - if e.errno != 2: - raise - - -def detect(path): - """Detect whether a directory is revision-controlled using Arch""" - path = os.path.realpath(path) - old_path = None - while True: - if os.path.exists(os.path.join(path, "{arch}")): - return True - if path == old_path: - return False - old_path = path - path = os.path.join('..', path) - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - pass - -def postcommit(directory): - pass - - -name = "Arch" +unitsuite = unittest.TestLoader().loadTestsFromTestCase(ArchTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/bug.py b/libbe/bug.py index f973cf0..a14f7fd 100644 --- a/libbe/bug.py +++ b/libbe/bug.py @@ -16,14 +16,12 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os import os.path -import shutil import errno import names import mapfile import time import utility -from rcs import rcs_by_name - +import doctest ### Define and describe valid bug categories # Use a tuple of (category, description) tuples since we don't have @@ -89,7 +87,7 @@ class Bug(object): severity = checked_property("severity", severity_values) status = checked_property("status", status_values) - def __init__(self, path, uuid, rcs_name, bugdir): + def __init__(self, path, uuid, rcs, bugdir): self.path = path self.uuid = uuid if uuid is not None: @@ -97,7 +95,7 @@ class Bug(object): else: dict = {} - self.rcs_name = rcs_name + self.rcs = rcs self.bugdir = bugdir self.summary = dict.get("summary") @@ -110,6 +108,17 @@ class Bug(object): if self.time is not None: self.time = utility.str_to_time(self.time) + def get_path(self, file=None): + if file == None: + return os.path.join(self.path, self.uuid) + else: + return os.path.join(self.path, self.uuid, file) + + def _get_active(self): + return self.status in active_status_values + + active = property(_get_active) + def __repr__(self): return "Bug(uuid=%r)" % self.uuid @@ -146,19 +155,13 @@ class Bug(object): statuschar = self.status[0] severitychar = self.severity[0] chars = "%c%c" % (statuschar, severitychar) - return "%s:%s: %s\n" % (short_name, chars, self.summary) + return "%s:%s: %s" % (short_name, chars, self.summary) + def __str__(self): return self.string(shortlist=True) - def get_path(self, file=None): - if file == None: - return os.path.join(self.path, self.uuid) - else: - return os.path.join(self.path, self.uuid, file) - def _get_active(self): - return self.status in active_status_values - - active = property(_get_active) + def __cmp__(self, other): + return cmp_full(self, other) def add_attr(self, map, name): value = getattr(self, name) @@ -166,6 +169,7 @@ class Bug(object): map[name] = value def save(self): + assert self.summary != None, "Can't save blank bug" map = {} self.add_attr(map, "assigned") self.add_attr(map, "summary") @@ -176,22 +180,18 @@ class Bug(object): if self.time is not None: map["time"] = utility.time_to_str(self.time) path = self.get_path("values") - mapfile.map_save(rcs_by_name(self.rcs_name), path, map) - + mapfile.map_save(self.rcs, path, map) + def remove(self): path = self.get_path() - shutil.rmtree(path) + self.rcs.recursive_remove(path) - def _get_rcs(self): - return rcs_by_name(self.rcs_name) - - rcs = property(_get_rcs) - def new_comment(self): if not os.path.exists(self.get_path("comments")): self.rcs.mkdir(self.get_path("comments")) comm = Comment(None, self) comm.uuid = names.uuid() + comm.rcs = self.rcs return comm def get_comment(self, uuid): @@ -218,7 +218,7 @@ class Bug(object): def new_bug(dir, uuid=None): bug = dir.new_bug(uuid) - bug.creator = names.creator() + bug.creator = bug.rcs.get_user_id() bug.severity = "minor" bug.status = "open" bug.time = time.time() @@ -226,7 +226,7 @@ def new_bug(dir, uuid=None): def new_comment(bug, body=None): comm = bug.new_comment() - comm.From = names.creator() + comm.From = comm.rcs.get_user_id() comm.time = time.time() comm.body = body return comm @@ -276,7 +276,6 @@ class Comment(object): mapfile.map_save(self.bug.rcs, self.get_path("values"), map_file) self.bug.rcs.set_file_contents(self.get_path("body"), self.body.encode('utf-8')) - def get_path(self, name=None): my_dir = os.path.join(self.bug.get_path("comments"), self.uuid) @@ -387,3 +386,5 @@ class InvalidValue(ValueError): Exception.__init__(self, msg) self.name = name self.value = value + +suite = doctest.DocTestSuite() diff --git a/libbe/bugdir.py b/libbe/bugdir.py index f8f45b8..cf8cba5 100644 --- a/libbe/bugdir.py +++ b/libbe/bugdir.py @@ -18,11 +18,13 @@ import os import os.path import cmdutil import errno +import unittest +import doctest import names import mapfile import time import utility -from rcs import rcs_by_name +from rcs import rcs_by_name, installed_rcs from bug import Bug class NoBugDir(Exception): @@ -84,22 +86,20 @@ class AlreadyInitialized(Exception): Exception.__init__(self, "Specified root is already initialized: %s" % path) +def bugdir_root(versioning_root): + return os.path.join(versioning_root, ".be") + def create_bug_dir(path, rcs): """ - >>> import no_rcs, tests - >>> create_bug_dir('/highly-unlikely-to-exist', no_rcs) + >>> import tests + >>> rcs = rcs_by_name("None") + >>> create_bug_dir('/highly-unlikely-to-exist', rcs) Traceback (most recent call last): NoRootEntry: Specified root does not exist: /highly-unlikely-to-exist - >>> test_dir = os.path.dirname(tests.bug_arch_dir().dir) - >>> try: - ... create_bug_dir(test_dir, no_rcs) - ... except AlreadyInitialized, e: - ... print "Already Initialized" - Already Initialized """ root = os.path.join(path, ".be") try: - rcs.mkdir(root, paranoid=True) + rcs.mkdir(root) except OSError, e: if e.errno == errno.ENOENT: raise NoRootEntry(path) @@ -111,7 +111,7 @@ def create_bug_dir(path, rcs): set_version(root, rcs) mapfile.map_save(rcs, os.path.join(root, "settings"), {"rcs_name": rcs.name}) - return BugDir(os.path.join(path, ".be")) + return BugDir(bugdir_root(path)) def setting_property(name, valid=None): @@ -139,11 +139,12 @@ class BugDir: self.dir = dir self.bugs_path = os.path.join(self.dir, "bugs") try: - self.settings = mapfile.map_load(os.path.join(self.dir, "settings")) + self.settings = mapfile.map_load(os.path.join(self.dir,"settings")) except mapfile.NoSuchFile: self.settings = {"rcs_name": "None"} - rcs_name = setting_property("rcs_name", ("None", "bzr", "git", "Arch", "hg")) + rcs_name = setting_property("rcs_name", + ("None", "bzr", "git", "Arch", "hg")) _rcs = None target = setting_property("target") @@ -152,16 +153,21 @@ class BugDir: mapfile.map_save(self.rcs, os.path.join(self.dir, "settings"), self.settings) - def get_rcs(self): - if self._rcs is not None and self.rcs_name == self._rcs.name: - return self._rcs + def _get_rcs(self): + if self._rcs is not None: + if self.rcs_name == self._rcs.name: + return self._rcs self._rcs = rcs_by_name(self.rcs_name) + self._rcs.root(self.dir) return self._rcs - rcs = property(get_rcs) + rcs = property(_get_rcs) + + def duplicate_bugdir(self, revision): + return BugDir(bugdir_root(self.rcs.duplicate_repo(revision))) - def get_reference_bugdir(self, spec): - return BugDir(self.rcs.path_in_reference(self.dir, spec)) + def remove_duplicate_bugdir(self): + self.rcs.remove_duplicate_repo() def list(self): for uuid in self.list_uuids(): @@ -174,7 +180,7 @@ class BugDir: return bugs def get_bug(self, uuid): - return Bug(self.bugs_path, uuid, self.rcs_name, self) + return Bug(self.bugs_path, uuid, self.rcs, self) def list_uuids(self): for uuid in os.listdir(self.bugs_path): @@ -187,7 +193,7 @@ class BugDir: uuid = names.uuid() path = os.path.join(self.bugs_path, uuid) self.rcs.mkdir(path) - bug = Bug(self.bugs_path, None, self.rcs_name, self) + bug = Bug(self.bugs_path, None, self.rcs, self) bug.uuid = uuid return bug @@ -197,3 +203,52 @@ class InvalidValue(ValueError): Exception.__init__(self, msg) self.name = name self.value = value + +def simple_bug_dir(): + """ + For testing + >>> bugdir = simple_bug_dir() + >>> ls = list(bugdir.list_uuids()) + >>> ls.sort() + >>> print ls + ['a', 'b'] + """ + dir = utility.Dir() + rcs = installed_rcs() + rcs.init(dir.path) + assert os.path.exists(dir.path) + bugdir = create_bug_dir(dir.path, rcs) + bugdir._dir_ref = dir # postpone cleanup since dir.__del__() removes dir. + bug_a = bugdir.new_bug("a") + bug_a.summary = "Bug A" + bug_a.save() + bug_b = bugdir.new_bug("b") + bug_b.status = "closed" + bug_b.summary = "Bug B" + bug_b.save() + return bugdir + + +class BugDirTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + def setUp(self): + self.dir = utility.Dir() + self.rcs = installed_rcs() + self.rcs.init(self.dir.path) + self.bugdir = create_bug_dir(self.dir.path, self.rcs) + def tearDown(self): + del(self.rcs) + del(self.dir) + def fullPath(self, path): + return os.path.join(self.dir.path, path) + def assertPathExists(self, path): + fullpath = self.fullPath(path) + self.failUnless(os.path.exists(fullpath)==True, + "path %s does not exist" % fullpath) + def testBugDirDuplicate(self): + self.assertRaises(AlreadyInitialized, create_bug_dir, + self.dir.path, self.rcs) + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(BugDirTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/bzr.py b/libbe/bzr.py index ddda334..a0ae715 100644 --- a/libbe/bzr.py +++ b/libbe/bzr.py @@ -15,114 +15,84 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os -import tempfile - -import config -from rcs import invoke, CommandError - -def invoke_client(*args, **kwargs): - directory = kwargs['directory'] - expect = kwargs.get('expect', (0, 1)) - cl_args = ["bzr"] - cl_args.extend(args) - status,output,error = invoke(cl_args, expect, cwd=directory) - return status, output - -def add_id(filename, paranoid=False): - invoke_client("add", filename, directory='.') - -def delete_id(filename): - invoke_client("remove", filename, directory='.') - -def mkdir(path, paranoid=False): - os.mkdir(path) - add_id(path) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - -def lookup_revision(revno, directory): - return invoke_client("lookup-revision", str(revno), - directory=directory)[1].rstrip('\n') - -def export(revno, directory, revision_dir): - invoke_client("export", "-r", str(revno), revision_dir, directory=directory) - -def find_or_make_export(revno, directory): - revision_id = lookup_revision(revno, directory) - home = os.path.expanduser("~") - revision_root = os.path.join(home, ".bzrrevs") - if not os.path.exists(revision_root): - os.mkdir(revision_root) - revision_dir = os.path.join(revision_root, revision_id) - if not os.path.exists(revision_dir): - export(revno, directory, revision_dir) - return revision_dir - -def bzr_root(path): - return invoke_client("root", path, directory=None)[1].rstrip('\r') - -def path_in_reference(bug_dir, spec): - if spec is None: - spec = int(invoke_client("revno", directory=bug_dir)[1]) - rel_bug_dir = bug_dir[len(bzr_root(bug_dir)):] - export_root = find_or_make_export(spec, directory=bug_dir) - return os.path.join(export_root, rel_bug_dir) - - -def unlink(path): - try: - os.unlink(path) - delete_id(path) - except OSError, e: - if e.errno != 2: - raise - - -def detect(path): - """Detect whether a directory is revision-controlled using bzr""" - path = os.path.realpath(path) - old_path = None - while True: - if os.path.exists(os.path.join(path, ".bzr")): +import re +import unittest +import doctest + +from rcs import RCS, RCStestCase, CommandError + +def new(): + return Bzr() + +class Bzr(RCS): + name = "bzr" + client = "bzr" + versioned = True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + if self._u_search_parent_directories(path, ".bzr") != None : return True - if path == old_path: - return False - old_path = path - path = os.path.dirname(path) - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - if body is not None: - summary += '\n' + body - descriptor, filename = tempfile.mkstemp() - try: - temp_file = os.fdopen(descriptor, 'wb') - temp_file.write(summary) - temp_file.close() - invoke_client('commit', '--unchanged', '--file', filename, - directory=directory) - finally: - os.unlink(filename) - -def postcommit(directory): - try: - invoke_client('merge', directory=directory) - except CommandError, e: - if ('No merge branch known or specified' in e.err_str or - 'No merge location known or specified' in e.err_str): - pass + return False + def _rcs_root(self, path): + """Find the root of the deepest repository containing path.""" + status,output,error = self._u_invoke_client("root", path) + return output.rstrip('\n') + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + status,output,error = self._u_invoke_client("whoami") + return output.rstrip('\n') + def _rcs_set_user_id(self, value): + self._u_invoke_client("whoami", value) + def _rcs_add(self, path): + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + # --force to also remove unversioned files. + self._u_invoke_client("remove", "--force", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None): + if revision == None: + return file(os.path.join(self.rootdir, path), "rb").read() + else: + status,output,error = \ + self._u_invoke_client("cat","-r",revision,path) + return output + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) else: - status = invoke_client('revert', '--no-backup', + self._u_invoke_client("branch", "--revision", revision, + ".", directory) + def _rcs_commit(self, commitfile): + status,output,error = self._u_invoke_client("commit", "--unchanged", + "--file", commitfile) + revision = None + revline = re.compile("Committed revision (.*)[.]") + match = revline.search(error) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + def postcommit(self): + try: + self._u_invoke_client('merge') + except CommandError, e: + if ('No merge branch known or specified' in e.err_str or + 'No merge location known or specified' in e.err_str): + pass + else: + self._u_invoke_client('revert', '--no-backup', directory=directory) - status = invoke_client('resolve', '--all', directory=directory) - raise - if len(invoke_client('status', directory=directory)[1]) > 0: - commit(directory, 'Merge from upstream') - -name = "bzr" + self._u_invoke_client('resolve', '--all', directory=directory) + raise + if len(self._u_invoke_client('status', directory=directory)[1]) > 0: + self.commit('Merge from upstream') + +class BzrTestCase(RCStestCase): + Class = Bzr + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(BzrTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/cmdutil.py b/libbe/cmdutil.py index ace2d81..62a0c7c 100644 --- a/libbe/cmdutil.py +++ b/libbe/cmdutil.py @@ -22,6 +22,7 @@ import optparse from textwrap import TextWrapper from StringIO import StringIO import utility +import doctest class UserError(Exception): def __init__(self, msg): @@ -33,6 +34,18 @@ class UserErrorWrap(UserError): self.exception = exception def get_bug(spec, bug_dir=None): + """ + >>> bd = bugdir.simple_bug_dir() + >>> bug_a = get_bug('a', bd) + >>> print type(bug_a) + <class 'libbe.bug.Bug'> + >>> print bug_a + a:om: Bug A + >>> print bd.get_bug('a') + a:om: Bug A + >>> bug_a == bd.get_bug('a') + True + """ matches = [] try: if bug_dir is None: @@ -206,3 +219,5 @@ def _test(): if __name__ == "__main__": _test() + +suite = doctest.DocTestSuite() diff --git a/libbe/config.py b/libbe/config.py index ecc40ce..79c0d6f 100644 --- a/libbe/config.py +++ b/libbe/config.py @@ -16,6 +16,8 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import ConfigParser import os.path +import doctest + def path(): """Return the path to the per-user config file""" return os.path.expanduser("~/.bugs_everywhere") @@ -58,3 +60,5 @@ def get_val(name, section="DEFAULT"): return config.get(section, name) except ConfigParser.NoOptionError: return None + +suite = doctest.DocTestSuite() diff --git a/libbe/diff.py b/libbe/diff.py index 7a1dbcc..9fa3816 100644 --- a/libbe/diff.py +++ b/libbe/diff.py @@ -18,6 +18,7 @@ from libbe import cmdutil, bugdir from libbe.utility import time_to_str from libbe.bug import cmp_severity +import doctest def diff(old_tree, new_tree): old_bug_map = old_tree.bug_map() @@ -38,9 +39,11 @@ def diff(old_tree, new_tree): return (removed, modified, added) -def reference_diff(bugdir, spec=None): - return diff(bugdir.get_reference_bugdir(spec), bugdir) - +def reference_diff(bugdir, revision=None): + d = diff(bugdir.duplicate_bugdir(revision), bugdir) + bugdir.remove_duplicate_bugdir() + return d + def diff_report(diff_data, bug_dir): (removed, modified, added) = diff_data bugs = list(bug_dir.list()) @@ -109,3 +112,5 @@ def bug_changes(old, new, bugs): def comment_summary(comment, status): return "%8s comment from %s on %s" % (status, comment.From, time_to_str(comment.time)) + +suite = doctest.DocTestSuite() diff --git a/libbe/git.py b/libbe/git.py index e15d773..046e72e 100644 --- a/libbe/git.py +++ b/libbe/git.py @@ -14,140 +14,86 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os -import tempfile - -from rcs import invoke - -def strip_git(filename): - # Find the base path of the GIT tree, in order to strip that leading - # path from arguments to git -- it doesn't like absolute paths. - if os.path.isabs(filename): - absRepoDir = os.path.abspath(git_repo_for_path('.')) - absRepoSlashedDir = os.path.join(absRepoDir,"") - assert filename.startswith(absRepoSlashedDir), \ - "file %s not in git repo %s" % (filename, absRepoSlashedDir) - filename = filename[len(absRepoSlashedDir):] - return filename - -def invoke_client(*args, **kwargs): - directory = kwargs['directory'] - expect = kwargs.get('expect', (0, 1)) - cl_args = ["git"] - cl_args.extend(args) - status,output,error = invoke(cl_args, expect, cwd=directory) - return status, output - -def add_id(filename, paranoid=False): - filename = strip_git(filename) - invoke_client("add", filename, directory=git_repo_for_path('.')) - -def delete_id(filename): - filename = strip_git(filename) - invoke_client("rm", filename, directory=git_repo_for_path('.')) - -def mkdir(path, paranoid=False): - os.mkdir(path) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - -def detect(path): - """Detect whether a directory is revision-controlled using GIT""" - path = os.path.realpath(path) - old_path = None - while True: - if os.path.exists(os.path.join(path, ".git")): +import re +import unittest +import doctest + +from rcs import RCS, RCStestCase, CommandError + +def new(): + return Git() + +class Git(RCS): + name="git" + client="git" + versioned=True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + if self._u_search_parent_directories(path, ".git") != None : return True - if path == old_path: - return False - old_path = path - path = os.path.dirname(path) - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - if body is not None: - summary += '\n' + body - descriptor, filename = tempfile.mkstemp() - try: - temp_file = os.fdopen(descriptor, 'wb') - temp_file.write(summary) - temp_file.close() - invoke_client('commit', '-a', '-F', filename, directory=directory) - finally: - os.unlink(filename) - -def postcommit(directory): - pass - - -# In order to diff the bug database, you need a way to check out arbitrary -# previous revisions and a mechanism for locating the bug_dir in the revision -# you've checked out. -# -# Copying the Mercurial implementation, this feature is implemented by four -# functions: -# -# git_dir_for_path : find '.git' for a git tree. -# -# export : check out a commit 'spec' from git-repo 'bug_dir' into a dir -# 'revision_dir' -# -# find_or_make_export : check out a commit 'spec' from git repo 'directory' to -# any location you please and return the path to the checkout -# -# path_in_reference : return a path to the bug_dir of the commit 'spec' - -def git_repo_for_path(path): - """Find the root of the deepest repository containing path.""" - # Assume that nothing funny is going on; in particular, that we aren't - # dealing with a bare repo. - dirname = os.path.dirname(git_dir_for_path(path)) - if dirname == '' : # os.path.dirname('filename') == '' - dirname = '.' - return dirname - -def git_dir_for_path(path): - """Find the git-dir of the deepest repo containing path.""" - return invoke_client("rev-parse", "--git-dir", directory=path)[1].rstrip() - -def export(spec, bug_dir, revision_dir): - """Check out commit 'spec' from the git repo containing bug_dir into - 'revision_dir'.""" - if not os.path.exists(revision_dir): - os.makedirs(revision_dir) - invoke_client("init", directory=revision_dir) - invoke_client("pull", git_dir_for_path(bug_dir), directory=revision_dir) - invoke_client("checkout", '-f', spec, directory=revision_dir) - -def find_or_make_export(spec, directory): - """Checkout 'spec' from the repo at 'directory' by hook or by crook and - return the path to the working copy.""" - home = os.path.expanduser("~") - revision_root = os.path.join(home, ".be_revs") - if not os.path.exists(revision_root): - os.mkdir(revision_root) - revision_dir = os.path.join(revision_root, spec) - if not os.path.exists(revision_dir): - export(spec, directory, revision_dir) - return revision_dir - -def path_in_reference(bug_dir, spec): - """Check out 'spec' and return the path to its bug_dir.""" - spec = spec or 'HEAD' - spec = invoke_client('rev-parse', spec, directory=bug_dir)[1].rstrip() - # This is a really hairy computation. - # The theory is that we can't possibly be working out of a bare repo; - # hence, we get the rel_bug_dir by chopping off dirname(git_dir_for_path(bug_dir)) - # + '/'. - rel_bug_dir = strip_git(bug_dir) - export_root = find_or_make_export(spec, directory=bug_dir) - return os.path.join(export_root, rel_bug_dir) - - -name = "git" - + return False + def _rcs_root(self, path): + """Find the root of the deepest repository containing path.""" + # Assume that nothing funny is going on; in particular, that we aren't + # dealing with a bare repo. + if os.path.isdir(path) != True: + path = os.path.dirname(path) + status,output,error = self._u_invoke_client("rev-parse", "--git-dir", + directory=path) + gitdir = os.path.join(path, output.rstrip('\n')) + dirname = os.path.abspath(os.path.dirname(gitdir)) + return dirname + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + status,output,error = self._u_invoke_client("config", "user.name") + name = output.rstrip('\n') + status,output,error = self._u_invoke_client("config", "user.email") + email = output.rstrip('\n') + return self._u_create_id(name, email) + def _rcs_set_user_id(self, value): + name,email = self._u_parse_id(value) + if email != None: + self._u_invoke_client("config", "user.email", email) + self._u_invoke_client("config", "user.name", name) + def _rcs_add(self, path): + if os.path.isdir(path): + return + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + if not os.path.isdir(self._u_abspath(path)): + self._u_invoke_client("rm", "-f", path) + def _rcs_update(self, path): + self._rcs_add(path) + def _rcs_get_file_contents(self, path, revision=None): + if revision == None: + return file(self._u_abspath(path), "rb").read() + else: + arg = "%s:%s" % (revision,path) + status,output,error = self._u_invoke_client("show", arg) + return output + def _rcs_duplicate_repo(self, directory, revision=None): + if revision==None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + #self._u_invoke_client("archive", revision, directory) # makes tarball + self._u_invoke_client("clone", "--no-checkout",".",directory) + self._u_invoke_client("checkout", revision, directory=directory) + def _rcs_commit(self, commitfile): + status,output,error = self._u_invoke_client('commit', '-a', + '-F', commitfile) + revision = None + revline = re.compile("Created (.*)commit (.*):(.*)") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 3 + revision = match.groups()[1] + return revision + +class GitTestCase(RCStestCase): + Class = Git + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(GitTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/hg.py b/libbe/hg.py index 35de8e0..27cbb79 100644 --- a/libbe/hg.py +++ b/libbe/hg.py @@ -14,102 +14,73 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os -import tempfile - -import config -from rcs import invoke, CommandError - -def invoke_client(*args, **kwargs): - directory = kwargs['directory'] - expect = kwargs.get('expect', (0, 1)) - cl_args = ["hg"] - cl_args.extend(args) - status,output,error = invoke(cl_args, expect, cwd=directory) - return status, output - -def add_id(filename, paranoid=False): - invoke_client("add", filename, directory='.') - -def delete_id(filename): - invoke_client("rm", filename, directory='.') - -def mkdir(path, paranoid=False): - os.mkdir(path) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - -def lookup_revision(revno, directory): - return invoke_client('log', '--rev', str(revno), '--template={node}', - directory=directory)[1].rstrip('\n') - -def export(revno, directory, revision_dir): - invoke_client("archive", "--rev", str(revno), revision_dir, - directory=directory) - -def find_or_make_export(revno, directory): - revision_id = lookup_revision(revno, directory) - home = os.path.expanduser("~") - revision_root = os.path.join(home, ".be_revs") - if not os.path.exists(revision_root): - os.mkdir(revision_root) - revision_dir = os.path.join(revision_root, revision_id) - if not os.path.exists(revision_dir): - export(revno, directory, revision_dir) - return revision_dir - -def hg_root(path): - return invoke_client("root", "-R", path, directory=None)[1].rstrip('\r') - -def path_in_reference(bug_dir, spec): - if spec is None: - spec = int(invoke_client('tip', '--template="{rev}"', - directory=bug_dir)[1]) - rel_bug_dir = bug_dir[len(hg_root(bug_dir)):] - export_root = find_or_make_export(spec, directory=bug_dir) - return os.path.join(export_root, rel_bug_dir) - - -def unlink(path): - try: - os.unlink(path) - delete_id(path) - except OSError, e: - if e.errno != 2: - raise - - -def detect(path): - """Detect whether a directory is revision-controlled using Mercurial""" - path = os.path.realpath(path) - old_path = None - while True: - if os.path.exists(os.path.join(path, ".hg")): +import re +import unittest +import doctest + +from rcs import RCS, RCStestCase, CommandError, SettingIDnotSupported + +def new(): + return Hg() + +class Hg(RCS): + name="hg" + client="hg" + versioned=True + def _rcs_help(self): + status,output,error = self._u_invoke_client("--help") + return output + def _rcs_detect(self, path): + """Detect whether a directory is revision-controlled using Mercurial""" + if self._u_search_parent_directories(path, ".hg") != None: return True - if path == old_path: - return False - old_path = path - path = os.path.dirname(path) - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - if body is not None: - summary += '\n' + body - descriptor, filename = tempfile.mkstemp() - try: - temp_file = os.fdopen(descriptor, 'wb') - temp_file.write(summary) - temp_file.close() - invoke_client('commit', '--logfile', filename, directory=directory) - finally: - os.unlink(filename) - -def postcommit(directory): - pass - -name = "hg" + return False + def _rcs_root(self, path): + status,output,error = self._u_invoke_client("root", directory=path) + return output.rstrip('\n') + def _rcs_init(self, path): + self._u_invoke_client("init", directory=path) + def _rcs_get_user_id(self): + status,output,error = self._u_invoke_client("showconfig","ui.username") + return output.rstrip('\n') + def _rcs_set_user_id(self, value): + """ + Supported by the Config Extension, but that is not part of + standard Mercurial. + http://www.selenic.com/mercurial/wiki/index.cgi/ConfigExtension + """ + raise SettingIDnotSupported + def _rcs_add(self, path): + self._u_invoke_client("add", path) + def _rcs_remove(self, path): + self._u_invoke_client("rm", path) + def _rcs_update(self, path): + pass + def _rcs_get_file_contents(self, path, revision=None): + if revision == None: + return file(os.path.join(self.rootdir, path), "rb").read() + else: + status,output,error = \ + self._u_invoke_client("cat","-r",revision,path) + return output + def _rcs_duplicate_repo(self, directory, revision=None): + if revision == None: + RCS._rcs_duplicate_repo(self, directory, revision) + else: + self._u_invoke_client("archive", "--rev", revision, directory) + def _rcs_commit(self, commitfile): + self._u_invoke_client('commit', '--logfile', commitfile) + status,output,error = self._u_invoke_client('identify') + revision = None + revline = re.compile("(.*) tip") + match = revline.search(output) + assert match != None, output+error + assert len(match.groups()) == 1 + revision = match.groups()[0] + return revision + +class HgTestCase(RCStestCase): + Class = Hg + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(HgTestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/mapfile.py b/libbe/mapfile.py index 3f09edd..8f69554 100644 --- a/libbe/mapfile.py +++ b/libbe/mapfile.py @@ -17,6 +17,7 @@ import os.path import errno import utility +import doctest class IllegalKey(Exception): def __init__(self, key): @@ -107,8 +108,11 @@ def map_save(rcs, path, map): add = not os.path.exists(path) output = file(path, "wb") generate(output, map) + output.close() if add: - rcs.add_id(path) + rcs.add(path) + else: + rcs.update(path) class NoSuchFile(Exception): def __init__(self, pathname): @@ -122,3 +126,5 @@ def map_load(path): if e.errno != errno.ENOENT: raise e raise NoSuchFile(path) + +suite = doctest.DocTestSuite() diff --git a/libbe/names.py b/libbe/names.py index c86063d..b866f75 100644 --- a/libbe/names.py +++ b/libbe/names.py @@ -17,7 +17,7 @@ import os import sys - +import doctest def uuid(): # this code borrowed from standard commands module @@ -53,3 +53,5 @@ def unique_name(bug, bugs): while (bug.uuid[:chars] == some_bug.uuid[:chars]): chars+=1 return bug.uuid[:chars] + +suite = doctest.DocTestSuite() diff --git a/libbe/no_rcs.py b/libbe/no_rcs.py deleted file mode 100644 index 1b3b005..0000000 --- a/libbe/no_rcs.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright (C) 2005 Aaron Bentley and Panometrics, Inc. -# <abentley@panoramicfeedback.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -import os -import config -from os import unlink - -def add_id(filename, paranoid=False): - """Compatibility function""" - pass - -def delete_id(filename): - """Compatibility function""" - pass - -def mkdir(path, paranoid=False): - os.mkdir(path) - -def set_file_contents(path, contents): - add = not os.path.exists(path) - file(path, "wb").write(contents) - if add: - add_id(path) - -def detect(path): - """Compatibility function""" - return True - -def precommit(directory): - pass - -def commit(directory, summary, body=None): - pass - -def postcommit(directory): - pass - -name = "None" diff --git a/libbe/plugin.py b/libbe/plugin.py index 9254986..05a4398 100644 --- a/libbe/plugin.py +++ b/libbe/plugin.py @@ -17,6 +17,8 @@ import os import os.path import sys +import doctest + def my_import(mod_name): module = __import__(mod_name) components = mod_name.split('.') @@ -56,6 +58,8 @@ plugin_path = os.path.realpath(os.path.dirname(os.path.dirname(__file__))) if plugin_path not in sys.path: sys.path.append(plugin_path) +suite = doctest.DocTestSuite() + def _test(): import doctest doctest.testmod() diff --git a/libbe/rcs.py b/libbe/rcs.py index 4487fba..2993a80 100644 --- a/libbe/rcs.py +++ b/libbe/rcs.py @@ -15,42 +15,43 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA from subprocess import Popen, PIPE +import os +import os.path +from socket import gethostname +import re import sys +import tempfile +import shutil +import unittest +import doctest +from utility import Dir -def rcs_by_name(rcs_name): - """Return the module for the RCS with the given name""" - if rcs_name == "Arch": - import arch - return arch - elif rcs_name == "bzr": - import bzr - return bzr - elif rcs_name == "hg": - import hg - return hg - elif rcs_name == "git": - import git - return git - elif rcs_name == "None": - import no_rcs - return no_rcs - -def detect(dir): - """Return the module for the rcs being used in this directory""" +def _get_matching_rcs(matchfn): + """Return the first module for which matchfn(RCS_instance) is true""" import arch import bzr import hg import git - if arch.detect(dir): - return arch - elif bzr.detect(dir): - return bzr - elif hg.detect(dir): - return hg - elif git.detect(dir): - return git - import no_rcs - return no_rcs + for module in [arch, bzr, hg, git]: + rcs = module.new() + if matchfn(rcs): + return rcs + else: + del(rcs) + return RCS() + +def rcs_by_name(rcs_name): + """Return the module for the RCS with the given name""" + return _get_matching_rcs(lambda rcs: rcs.name == rcs_name) + +def detect_rcs(dir): + """Return an RCS instance for the rcs being used in this directory""" + return _get_matching_rcs(lambda rcs: rcs.detect(dir)) + +def installed_rcs(): + """Return an instance of an installed RCS""" + return _get_matching_rcs(lambda rcs: rcs.installed()) + class CommandError(Exception): def __init__(self, err_str, status): @@ -58,19 +59,506 @@ class CommandError(Exception): self.err_str = err_str self.status = status -def invoke(args, expect=(0,), cwd=None): - try : - if sys.platform != "win32": - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) +class SettingIDnotSupported(NotImplementedError): + pass + +def new(): + return RCS() + +class RCS(object): + """ + Implement the 'no-rcs' interface. + + Support for other RCSs can be added by subclassing this class, and + overriding methods _rcs_*() with code appropriate for your RCS. + + The methods _u_*() are utility methods available to the _rcs_*() + methods. + """ + name = "None" + client = "" # command-line tool for _u_invoke_client + versioned = False + def __init__(self, paranoid=False): + self.paranoid = paranoid + self.verboseInvoke = False + self.rootdir = None + self._duplicateBasedir = None + self._duplicateDirname = None + def __del__(self): + self.cleanup() + + def _rcs_help(self): + """ + Return the command help string. + (Allows a simple test to see if the client is installed.) + """ + pass + def _rcs_detect(self, path=None): + """ + Detect whether a directory is revision controlled with this RCS. + """ + return True + def _rcs_root(self, path): + """ + Get the RCS root. This is the default working directory for + future invocations. You would normally set this to the root + directory for your RCS. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + if path == "": + path = os.path.abspath(".") + return path + def _rcs_init(self, path): + """ + Begin versioning the tree based at path. + """ + pass + def _rcs_cleanup(self): + """ + Remove any cruft that _rcs_init() created outside of the + versioned tree. + """ + pass + def _rcs_get_user_id(self): + """ + Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the RCS has not been configured with a username, return None. + """ + return None + def _rcs_set_user_id(self, value): + """ + Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). + This is run if the RCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + raise SettingIDnotSupported + def _rcs_add(self, path): + """ + Add the already created file at path to version control. + """ + pass + def _rcs_remove(self, path): + """ + Remove the file at path from version control. Optionally + remove the file from the filesystem as well. + """ + pass + def _rcs_update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + pass + def _rcs_get_file_contents(self, path, revision=None): + """ + Get the file as it was in a given revision. + Revision==None specifies the current revision. + """ + assert revision == None, \ + "The %s RCS does not support revision specifiers" % self.name + return file(os.path.join(self.rootdir, path), "rb").read() + def _rcs_duplicate_repo(self, directory, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + dir specifies a directory to create the duplicate in. + """ + shutil.copytree(self.rootdir, directory, True) + def _rcs_commit(self, commitfile): + """ + Commit the current working directory, using the contents of + commitfile as the comment. Return the name of the old + revision. + """ + return None + def installed(self): + try: + self._rcs_help() + return True + except OSError, e: + if e.errno == errno.ENOENT: + return False + raise e + def detect(self, path=None): + """ + Detect whether a directory is revision controlled with this RCS. + """ + return self._rcs_detect(path) + def root(self, path): + """ + Set the root directory to the path's RCS root. This is the + default working directory for future invocations. + """ + self.rootdir = self._rcs_root(path) + def init(self, path): + """ + Begin versioning the tree based at path. + Also roots the rcs at path. + """ + if os.path.isdir(path)==False: + path = os.path.dirname(path) + self._rcs_init(path) + self.root(path) + def cleanup(self): + self._rcs_cleanup() + def get_user_id(self): + """ + Get the RCS's suggested user id (e.g. "John Doe <jdoe@example.com>"). + If the RCS has not been configured with a username, return the user's + id. + """ + id = self._rcs_get_user_id() + if id == None: + name = self._u_get_fallback_username() + email = self._u_get_fallback_email() + id = self._u_create_id(name, email) + print >> sys.stderr, "Guessing id '%s'" % id + try: + self.set_user_id(id) + except SettingIDnotSupported: + pass + return id + def set_user_id(self, value): + """ + Set the RCS's suggested user id (e.g "John Doe <jdoe@example.com>"). + This is run if the RCS has not been configured with a usename, so + that commits will have a reasonable FROM value. + """ + self._rcs_set_user_id(value) + def add(self, path): + """ + Add the already created file at path to version control. + """ + self._rcs_add(self._u_rel_path(path)) + def remove(self, path): + """ + Remove a file from both version control and the filesystem. + """ + self._rcs_remove(self._u_rel_path(path)) + if os.path.exists(path): + os.remove(path) + def recursive_remove(self, dirname): + """ + Remove a file/directory and all its decendents from both + version control and the filesystem. + """ + for dirpath,dirnames,filenames in os.walk(dirname, topdown=False): + filenames.extend(dirnames) + for path in filenames: + fullpath = os.path.join(dirpath, path) + if os.path.exists(fullpath) == False: + continue + self._rcs_remove(self._u_rel_path(fullpath)) + if os.path.exists(dirname): + shutil.rmtree(dirname) + def update(self, path): + """ + Notify the versioning system of changes to the versioned file + at path. + """ + self._rcs_update(self._u_rel_path(path)) + def get_file_contents(self, path, revision=None): + """ + Get the file as it was in a given revision. + Revision==None specifies the current revision. + """ + relpath = self._u_rel_path(path) + return self._rcs_get_file_contents(relpath, revision) + def set_file_contents(self, path, contents): + """ + Set the file contents under version control. + """ + add = not os.path.exists(path) + file(path, "wb").write(contents) + if add: + self.add(path) + else: + self.update(path) + def mkdir(self, path): + """ + Created directory at path under version control. + """ + os.mkdir(path) + self.add(path) + def duplicate_repo(self, revision=None): + """ + Get the repository as it was in a given revision. + revision==None specifies the current revision. + Return the path to the arbitrary directory at the base of the new repo. + """ + # Dirname in Baseir to protect against simlink attacks. + if self._duplicateBasedir == None: + self._duplicateBasedir = tempfile.mkdtemp(prefix='BErcs') + self._duplicateDirname = \ + os.path.join(self._duplicateBasedir, "duplicate") + self._rcs_duplicate_repo(directory=self._duplicateDirname, + revision=revision) + return self._duplicateDirname + def remove_duplicate_repo(self): + """ + Clean up a duplicate repo created with duplicate_repo(). + """ + if self._duplicateBasedir != None: + shutil.rmtree(self._duplicateBasedir) + self._duplicateBasedir = None + self._duplicateDirname = None + def commit(self, summary, body=None): + """ + Commit the current working directory, with a commit message + string summary and body. Return the name of the old revision + (or None if versioning is not supported). + """ + if body is not None: + summary += '\n' + body + descriptor, filename = tempfile.mkstemp() + revision = None + try: + temp_file = os.fdopen(descriptor, 'wb') + temp_file.write(summary) + temp_file.flush() + revision = self._rcs_commit(filename) + temp_file.close() + finally: + os.remove(filename) + return revision + def precommit(self, directory): + pass + def postcommit(self, directory): + pass + def _u_invoke(self, args, expect=(0,), cwd=None): + if cwd == None: + cwd = self.rootdir + try : + if self.verboseInvoke == True: + print "%s$ %s" % (cwd, " ".join(args)) + if sys.platform != "win32": + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd) + else: + # win32 don't have os.execvp() so have to run command in a shell + q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, + shell=True, cwd=cwd) + except OSError, e : + strerror = "%s\nwhile executing %s" % (e.args[1], args) + raise CommandError(strerror, e.args[0]) + output, error = q.communicate() + status = q.wait() + if status not in expect: + raise CommandError(error, status) + return status, output, error + def _u_invoke_client(self, *args, **kwargs): + directory = kwargs.get('directory',None) + expect = kwargs.get('expect', (0,)) + cl_args = [self.client] + cl_args.extend(args) + return self._u_invoke(cl_args, expect, cwd=directory) + def _u_search_parent_directories(self, path, filename): + """ + Find the file (or directory) named filename in path or in any + of path's parents. + + e.g. + search_parent_directories("/a/b/c", ".be") + will return the path to the first existing file from + /a/b/c/.be + /a/b/.be + /a/.be + /.be + or None if none of those files exist. + """ + path = os.path.realpath(path) + assert os.path.exists(path) + old_path = None + while True: + if os.path.exists(os.path.join(path, filename)): + return os.path.join(path, filename) + if path == old_path: + return None + old_path = path + path = os.path.dirname(path) + def _u_rel_path(self, path, root=None): + """ + Return the relative path to path from root. + >>> rcs = new() + >>> rcs._u_rel_path("/a.b/c/.be", "/a.b/c") + '.be' + """ + if root == None: + assert self.rootdir != None, "RCS not rooted" + root = self.rootdir + if os.path.isabs(path): + absRoot = os.path.abspath(root) + absRootSlashedDir = os.path.join(absRoot,"") + assert path.startswith(absRootSlashedDir), \ + "file %s not in root %s" % (path, absRootSlashedDir) + assert path != absRootSlashedDir, \ + "file %s == root directory %s" % (path, absRootSlashedDir) + path = path[len(absRootSlashedDir):] + return path + def _u_abspath(self, path, root=None): + """ + Return the absolute path from a path realtive to root. + >>> rcs = new() + >>> rcs._u_abspath(".be", "/a.b/c") + '/a.b/c/.be' + """ + if root == None: + assert self.rootdir != None, "RCS not rooted" + root = self.rootdir + return os.path.abspath(os.path.join(root, path)) + def _u_create_id(self, name, email=None): + """ + >>> rcs = new() + >>> rcs._u_create_id("John Doe", "jdoe@example.com") + 'John Doe <jdoe@example.com>' + >>> rcs._u_create_id("John Doe") + 'John Doe' + """ + assert len(name) > 0 + if email == None or len(email) == 0: + return name else: - # win32 don't have os.execvp() so have to run command in a shell - q = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True, - cwd=cwd) - except OSError, e : - strerror = "%s\nwhile executing %s" % (e.args[1], args) - raise CommandError(strerror, e.args[0]) - output, error = q.communicate() - status = q.wait() - if status not in expect: - raise CommandError(error, status) - return status, output, error + return "%s <%s>" % (name, email) + def _u_parse_id(self, value): + """ + >>> rcs = new() + >>> rcs._u_parse_id("John Doe <jdoe@example.com>") + ('John Doe', 'jdoe@example.com') + >>> rcs._u_parse_id("John Doe") + ('John Doe', None) + >>> try: + ... rcs._u_parse_id("John Doe <jdoe@example.com><what?>") + ... except AssertionError: + ... print "Invalid match" + Invalid match + """ + emailexp = re.compile("(.*) <([^>]*)>(.*)") + match = emailexp.search(value) + if match == None: + email = None + name = value + else: + assert len(match.groups()) == 3 + assert match.groups()[2] == "", match.groups() + email = match.groups()[1] + name = match.groups()[0] + assert name != None + assert len(name) > 0 + return (name, email) + def _u_get_fallback_username(self): + name = None + for envariable in ["LOGNAME", "USERNAME"]: + if os.environ.has_key(envariable): + name = os.environ[envariable] + break + assert name != None + return name + def _u_get_fallback_email(self): + hostname = gethostname() + name = self._u_get_fallback_username() + return "%s@%s" % (name, hostname) + def _u_parse_commitfile(self, commitfile): + """ + Split the commitfile created in self.commit() back into + summary and header lines. + """ + f = file(commitfile, "rb") + summary = f.readline() + body = f.read() + body.lstrip('\n') + if len(body) == 0: + body = None + f.close + return (summary, body) + + +class RCStestCase(unittest.TestCase): + Class = RCS + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + self.dirname = None + def instantiateRCS(self): + return self.Class() + def setUp(self): + self.dir = Dir() + self.dirname = self.dir.path + self.rcs = self.instantiateRCS() + def tearDown(self): + del(self.rcs) + del(self.dirname) + def fullPath(self, path): + return os.path.join(self.dirname, path) + def assertPathExists(self, path): + fullpath = self.fullPath(path) + self.failUnless(os.path.exists(fullpath)==True, + "path %s does not exist" % fullpath) + def uidTest(self): + user_id = self.rcs.get_user_id() + self.failUnless(user_id != None, + "unable to get a user id") + user_idB = "John Doe <jdoe@example.com>" + if self.rcs.name in ["None", "hg"]: + self.assertRaises(SettingIDnotSupported, self.rcs.set_user_id, + user_idB) + else: + self.rcs.set_user_id(user_idB) + self.failUnless(self.rcs.get_user_id() == user_idB, + "user id not set correctly (was %s, is %s)" \ + % (user_id, self.rcs.get_user_id())) + self.failUnless(self.rcs.set_user_id(user_id) == None, + "unable to restore user id %s" % user_id) + self.failUnless(self.rcs.get_user_id() == user_id, + "unable to restore user id %s" % user_id) + def versionTest(self, path): + origpath = path + path = self.fullPath(path) + contentsA = "Lorem ipsum" + contentsB = "dolor sit amet" + self.rcs.set_file_contents(path,contentsA) + self.failUnless(self.rcs.get_file_contents(path)==contentsA, + "File contents not set or read correctly") + revision = self.rcs.commit("Commit current status") + self.failUnless(self.rcs.get_file_contents(path)==contentsA, + "Committing File contents not set or read correctly") + if self.rcs.versioned == True: + self.rcs.set_file_contents(path,contentsB) + self.failUnless(self.rcs.get_file_contents(path)==contentsB, + "File contents not set correctly after commit") + contentsArev = self.rcs.get_file_contents(path, revision) + self.failUnless(contentsArev==contentsA, \ + "Original file contents not saved in revision %s\n%s\n%s\n" \ + % (revision, contentsA, contentsArev)) + dup = self.rcs.duplicate_repo(revision) + duppath = os.path.join(dup, origpath) + dupcont = file(duppath, "rb").read() + self.failUnless(dupcont == contentsA) + self.rcs.remove_duplicate_repo() + def testRun(self): + self.failUnless(self.rcs.installed() == True, + "%s RCS not found" % self.Class.name) + if self.Class.name != "None": + self.failUnless(self.rcs.detect(self.dirname)==False, + "Detected %s RCS before initializing" \ + % self.Class.name) + self.rcs.init(self.dirname) + self.failUnless(self.rcs.detect(self.dirname)==True, + "Did not detect %s RCS after initializing" \ + % self.Class.name) + rp = os.path.realpath(self.rcs.rootdir) + dp = os.path.realpath(self.dirname) + self.failUnless(dp == rp or rp == None, + "%s RCS root in wrong dir (%s %s)" \ + % (self.Class.name, dp, rp)) + self.uidTest() + self.rcs.mkdir(self.fullPath('a')) + self.rcs.mkdir(self.fullPath('a/b')) + self.rcs.mkdir(self.fullPath('c')) + self.assertPathExists('a') + self.assertPathExists('a/b') + self.assertPathExists('c') + self.versionTest('a/text') + self.versionTest('a/b/text') + self.rcs.recursive_remove(self.fullPath('a')) + +unitsuite = unittest.TestLoader().loadTestsFromTestCase(RCStestCase) +suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) diff --git a/libbe/restconvert.py b/libbe/restconvert.py index cc7f866..57148e4 100644 --- a/libbe/restconvert.py +++ b/libbe/restconvert.py @@ -27,7 +27,7 @@ try : from xml.etree import ElementTree # Python 2.5 (and greater?) except ImportError : from elementtree import ElementTree - +import doctest def rest_xml(rest): warnings = StringIO() @@ -126,3 +126,5 @@ def foldout(name, arguments, options, content, lineno, content_offset, foldout += foldout_body foldout.set_class('foldout') return [foldout] + +suite = doctest.DocTestSuite() diff --git a/libbe/tests.py b/libbe/tests.py index 461e6e8..18277d7 100644 --- a/libbe/tests.py +++ b/libbe/tests.py @@ -16,40 +16,16 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import tempfile import shutil -import os -import os.path -from libbe import bugdir, bug, arch -cleanable = [] -def clean_up(): - global cleanable - tmp = cleanable - tmp.reverse() - for obj in tmp: - obj.clean_up() - cleanable = [] +from libbe import utility, names, restconvert, mapfile, config, diff, rcs, \ + arch, bzr, git, hg, bug, bugdir, plugin, cmdutil +import unittest -class Dir: - def __init__(self): - self.name = tempfile.mkdtemp(prefix="testdir") - cleanable.append(self) - def clean_up(self): - shutil.rmtree(self.name) +# can not use 'suite' or the base test.py file will include these suites twice. +testsuite = unittest.TestSuite([utility.suite, names.suite, restconvert.suite, + mapfile.suite, config.suite, diff.suite, + rcs.suite, arch.suite, bzr.suite, git.suite, + hg.suite, bug.suite, bugdir.suite, + plugin.suite, cmdutil.suite]) -def arch_dir(): - arch.ensure_user_id() - dir = Dir() - arch.init_tree(dir.name) - return dir - -def bug_arch_dir(): - dir = arch_dir() - return bugdir.create_bug_dir(dir.name, arch) - -def simple_bug_dir(): - dir = bug_arch_dir() - bug_a = bug.new_bug(dir, "a") - bug_b = bug.new_bug(dir, "b") - bug_b.status = "closed" - bug_a.save() - bug_b.save() - return dir +if __name__ == "__main__": + unittest.TextTestRunner(verbosity=2).run(testsuite) diff --git a/libbe/utility.py b/libbe/utility.py index 1fd83da..f595bdb 100644 --- a/libbe/utility.py +++ b/libbe/utility.py @@ -18,6 +18,8 @@ import calendar import time import os import tempfile +import shutil +import doctest class FileString(object): """Bare-bones pseudo-file class @@ -69,6 +71,14 @@ def get_file(f): else: return f +class Dir: + "A temporary directory for testing use" + def __init__(self): + self.path = tempfile.mkdtemp(prefix="BEtest") + def __del__(self): + shutil.rmtree(self.path) + def __call__(self): + return self.path RFC_2822_TIME_FMT = "%a, %d %b %Y %H:%M:%S +0000" @@ -162,3 +172,5 @@ def trimmed_string(instring): break out.append(line) return ''.join(out) + +suite = doctest.DocTestSuite() |