aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2008-11-24 07:09:03 -0500
committerW. Trevor King <wking@drexel.edu>2008-11-24 07:09:03 -0500
commit63a7726eba738fe2ed340027039ba655ff91898a (patch)
tree5fab6648ee4bcbe14c6413f46bfcc683cb9f34f2 /libbe
parentd80720fcad22215cdbd1f2b39434945364ba11d5 (diff)
downloadbugseverywhere-63a7726eba738fe2ed340027039ba655ff91898a.tar.gz
Added 'allow_no_rcs' flag to RCS file system access methods.
Now mapfile access has fewer special cases, and there is less redundant rcs.add/update code.
Diffstat (limited to 'libbe')
-rw-r--r--libbe/bug.py2
-rw-r--r--libbe/bugdir.py132
-rw-r--r--libbe/comment.py2
-rw-r--r--libbe/mapfile.py84
-rw-r--r--libbe/rcs.py109
5 files changed, 204 insertions, 125 deletions
diff --git a/libbe/bug.py b/libbe/bug.py
index ef1629c..09a2c1d 100644
--- a/libbe/bug.py
+++ b/libbe/bug.py
@@ -188,7 +188,7 @@ class Bug(object):
return os.path.join(my_dir, name)
def load(self, load_comments=False):
- map = mapfile.map_load(self.get_path("values"))
+ map = mapfile.map_load(self.rcs, self.get_path("values"))
self.summary = map.get("summary")
self.creator = map.get("creator")
self.target = map.get("target")
diff --git a/libbe/bugdir.py b/libbe/bugdir.py
index e134a2c..175f518 100644
--- a/libbe/bugdir.py
+++ b/libbe/bugdir.py
@@ -25,7 +25,8 @@ import doctest
import mapfile
import bug
import utility
-from rcs import rcs_by_name, detect_rcs, installed_rcs, PathNotInRoot
+import rcs
+
class NoBugDir(Exception):
def __init__(self, path):
@@ -67,18 +68,16 @@ def setting_property(name, valid=None, doc=None):
def getter(self):
value = self.settings.get(name)
if valid is not None:
- if value not in valid:
+ if value not in valid and value != None:
raise InvalidValue(name, value)
return value
def setter(self, value):
- if valid is not None:
- if value not in valid and value is not None:
- raise InvalidValue(name, value)
- if value is None:
- del self.settings[name]
- else:
- self.settings[name] = value
+ if value != getter(self):
+ if value is None:
+ del self.settings[name]
+ else:
+ self.settings[name] = value
self._save_settings(self.get_path("settings"), self.settings)
return property(getter, setter, doc=doc)
@@ -151,27 +150,36 @@ class BugDir (list):
return tree_version
def set_version(self):
- self.rcs.set_file_contents(self.get_path("version"), TREE_VERSION_STRING)
+ self.rcs.set_file_contents(self.get_path("version"),
+ TREE_VERSION_STRING)
rcs_name = setting_property("rcs_name",
("None", "bzr", "git", "Arch", "hg"),
- doc="The name of the current RCS. Kept seperate to make saving/loading settings easy. Don't set this attribute. Set .rcs instead, and .rcs_name will be automatically adjusted.")
+ doc=
+"""The name of the current RCS. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .rcs instead, and
+.rcs_name will be automatically adjusted.""")
_rcs = None
def _get_rcs(self):
return self._rcs
- def _set_rcs(self, rcs):
- if rcs == None:
- rcs = rcs_by_name("None")
- self._rcs = rcs
- rcs.root(self.root)
- self.rcs_name = rcs.name
+ def _set_rcs(self, new_rcs):
+ if new_rcs == None:
+ new_rcs = rcs.rcs_by_name("None")
+ self._rcs = new_rcs
+ new_rcs.root(self.root)
+ self.rcs_name = new_rcs.name
- rcs = property(_get_rcs, _set_rcs, doc="A revision control system (RCS) instance")
+ rcs = property(_get_rcs, _set_rcs,
+ doc="A revision control system (RCS) instance")
- _user_id = setting_property("user-id", doc="The user's prefered name. Kept seperate to make saving/loading settings easy. Don't set this attribute. Set .user_id instead, and ._user_id will be automatically adjusted. This setting is only saved if ._save_user_id == True")
+ _user_id = setting_property("user-id", doc=
+"""The user's prefered name. Kept seperate to make saving/loading
+settings easy. Don't set this attribute. Set .user_id instead,
+and ._user_id will be automatically adjusted. This setting is
+only saved if ._save_user_id == True""")
def _get_user_id(self):
if self._user_id == None and self.rcs != None:
@@ -183,9 +191,12 @@ class BugDir (list):
self.rcs.user_id = user_id
self._user_id = user_id
- user_id = property(_get_user_id, _set_user_id, doc="The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Not that the Arch RCS backend *enforces* ids with this format.")
+ user_id = property(_get_user_id, _set_user_id, doc=
+"""The user's prefered name, e.g 'John Doe <jdoe@example.com>'. Note
+that the Arch RCS backend *enforces* ids with this format.""")
- target = setting_property("target", doc="The current project development target")
+ target = setting_property("target",
+ doc="The current project development target")
def save_user_id(self, user_id=None):
if user_id == None:
@@ -204,25 +215,26 @@ class BugDir (list):
deepdir = self.get_path()
if not os.path.exists(deepdir):
deepdir = os.path.dirname(deepdir)
- rcs = detect_rcs(deepdir)
+ new_rcs = rcs.detect_rcs(deepdir)
install = False
- if rcs.name == "None":
+ if new_rcs.name == "None":
if allow_rcs_init == True:
- rcs = installed_rcs()
- rcs.init(self.root)
- self.rcs = rcs
- return rcs
+ new_rcs = rcs.installed_rcs()
+ new_rcs.init(self.root)
+ self.rcs = new_rcs
+ return new_rcs
def load(self):
version = self.get_version()
if version != TREE_VERSION_STRING:
- raise NotImplementedError, "BugDir cannot handle version '%s' yet." % version
+ raise NotImplementedError, \
+ "BugDir cannot handle version '%s' yet." % version
else:
if not os.path.exists(self.get_path()):
raise NoBugDir(self.get_path())
self.settings = self._get_settings(self.get_path("settings"))
- self.rcs = rcs_by_name(self.rcs_name)
+ self.rcs = rcs.rcs_by_name(self.rcs_name) # set real RCS
if self._user_id != None: # was a user name in the settings file
self.save_user_id()
@@ -243,9 +255,20 @@ class BugDir (list):
bug.save()
def _get_settings(self, settings_path):
+ if self.rcs_name == None:
+ # Use a temporary RCS to loading settings the first time
+ RCS = rcs.rcs_by_name("None")
+ RCS.root(self.root)
+ else:
+ RCS = self.rcs
+
+ allow_no_rcs = not RCS.path_in_root(settings_path)
+ # allow_no_rcs=True should only be for the special case of
+ # configuring duplicate bugdir settings
+
try:
- settings = mapfile.map_load(settings_path)
- except mapfile.NoSuchFile:
+ settings = mapfile.map_load(RCS, settings_path, allow_no_rcs)
+ except rcs.NoSuchFile:
settings = {"rcs_name": "None"}
return settings
@@ -263,19 +286,18 @@ class BugDir (list):
if "user-id" in settings:
settings = copy.copy(settings)
del settings["user-id"]
- try:
- mapfile.map_save(self.rcs, settings_path, settings)
- except PathNotInRoot, e:
- # Special case for configuring duplicate bugdir settings
- none_rcs = rcs_by_name("None")
- none_rcs.root(settings_path)
- mapfile.map_save(none_rcs, settings_path, settings)
+ allow_no_rcs = not self.rcs.path_in_root(settings_path)
+ # allow_no_rcs=True should only be for the special case of
+ # configuring duplicate bugdir settings
+ mapfile.map_save(self.rcs, settings_path, settings, allow_no_rcs)
def duplicate_bugdir(self, revision):
duplicate_path = self.rcs.duplicate_repo(revision)
- # setup revision RCS as None, since the duplicate may not be initialized for versioning
- duplicate_settings_path = os.path.join(duplicate_path, ".be", "settings")
+ # setup revision RCS as None, since the duplicate may not be
+ # initialized for versioning
+ duplicate_settings_path = os.path.join(duplicate_path,
+ ".be", "settings")
duplicate_settings = self._get_settings(duplicate_settings_path)
if "rcs_name" in duplicate_settings:
duplicate_settings["rcs_name"] = "None"
@@ -372,7 +394,8 @@ class BugDir (list):
if uuid not in self.bug_map:
self._bug_map_gen()
if uuid not in self.bug_map:
- raise KeyError("No bug matches %s" % uuid +str(self.bug_map)+str(self))
+ raise KeyError("No bug matches %s\n bug map: %s\n root: %s" \
+ % (uuid, self.bug_map, self.root))
if self.bug_map[uuid] == None:
self._load_bug(uuid)
return self.bug_map[uuid]
@@ -407,7 +430,8 @@ class BugDirTestCase(unittest.TestCase):
unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
self.dir = utility.Dir()
- self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False, allow_rcs_init=True)
+ self.bugdir = BugDir(self.dir.path, sink_to_existing_root=False,
+ allow_rcs_init=True)
self.rcs = self.bugdir.rcs
def tearDown(self):
self.rcs.cleanup()
@@ -429,16 +453,24 @@ class BugDirTestCase(unittest.TestCase):
self.bugdir.save()
new = self.rcs.commit("Fixed bug a")
dupdir = self.bugdir.duplicate_bugdir(original)
- self.failUnless(dupdir.root != self.bugdir.root, "%s, %s" % (dupdir.root, self.bugdir.root))
+ self.failUnless(dupdir.root != self.bugdir.root,
+ "%s, %s" % (dupdir.root, self.bugdir.root))
bugAorig = dupdir.bug_from_uuid("a")
- self.failUnless(bugA != bugAorig, "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ self.failUnless(bugA != bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
bugAorig.status = "fixed"
- self.failUnless(bug.cmp_status(bugA, bugAorig)==0, "%s, %s" % (bugA.status, bugAorig.status))
- self.failUnless(bug.cmp_severity(bugA, bugAorig)==0, "%s, %s" % (bugA.severity, bugAorig.severity))
- self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0, "%s, %s" % (bugA.assigned, bugAorig.assigned))
- self.failUnless(bug.cmp_time(bugA, bugAorig)==0, "%s, %s" % (bugA.time, bugAorig.time))
- self.failUnless(bug.cmp_creator(bugA, bugAorig)==0, "%s, %s" % (bugA.creator, bugAorig.creator))
- self.failUnless(bugA == bugAorig, "\n%s\n%s" % (bugA.string(), bugAorig.string()))
+ self.failUnless(bug.cmp_status(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.status, bugAorig.status))
+ self.failUnless(bug.cmp_severity(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.severity, bugAorig.severity))
+ self.failUnless(bug.cmp_assigned(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.assigned, bugAorig.assigned))
+ self.failUnless(bug.cmp_time(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.time, bugAorig.time))
+ self.failUnless(bug.cmp_creator(bugA, bugAorig)==0,
+ "%s, %s" % (bugA.creator, bugAorig.creator))
+ self.failUnless(bugA == bugAorig,
+ "\n%s\n%s" % (bugA.string(), bugAorig.string()))
self.bugdir.remove_duplicate_bugdir()
self.failUnless(os.path.exists(dupdir.root)==False, str(dupdir.root))
def testRun(self):
diff --git a/libbe/comment.py b/libbe/comment.py
index 9b87f18..710c773 100644
--- a/libbe/comment.py
+++ b/libbe/comment.py
@@ -181,7 +181,7 @@ class Comment(Tree):
return os.path.join(my_dir, name)
def load(self):
- map = mapfile.map_load(self.get_path("values"))
+ map = mapfile.map_load(self.rcs, self.get_path("values"))
self.time = utility.str_to_time(map["Date"])
self.From = map["From"]
self.in_reply_to = map.get("In-reply-to")
diff --git a/libbe/mapfile.py b/libbe/mapfile.py
index 9a7fa8b..559d713 100644
--- a/libbe/mapfile.py
+++ b/libbe/mapfile.py
@@ -29,28 +29,27 @@ class IllegalValue(Exception):
Exception.__init__(self, 'Illegal value "%s"' % value)
self.value = value
-def generate(f, map, context=3):
- """Generate a format-2 mapfile. This is a simpler format, but should merge
- better, because there's no chance of confusion for appends, and lines
- are unique for both key and value.
+def generate(map, context=3):
+ """Generate a format-2 mapfile content string. This is a simpler
+ format, but should merge better, because there's no chance of
+ confusion for appends, and lines are unique for both key and
+ value.
- >>> f = utility.FileString()
- >>> generate(f, {"q":"p"})
- >>> f.str
+ >>> generate({"q":"p"})
'\\n\\n\\nq=p\\n\\n\\n\\n'
- >>> generate(f, {"q=":"p"})
+ >>> generate({"q=":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key "q="
- >>> generate(f, {"q\\n":"p"})
+ >>> generate({"q\\n":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key "q\\n"
- >>> generate(f, {"":"p"})
+ >>> generate({"":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key ""
- >>> generate(f, {">q":"p"})
+ >>> generate({">q":"p"})
Traceback (most recent call last):
IllegalKey: Illegal key ">q"
- >>> generate(f, {"q":"p\\n"})
+ >>> generate({"q":"p\\n"})
Traceback (most recent call last):
IllegalValue: Illegal value "p\\n"
"""
@@ -68,63 +67,48 @@ def generate(f, map, context=3):
if "\n" in map[key]:
raise IllegalValue(map[key].encode('string_escape'))
+ lines = []
for key in keys:
for i in range(context):
- f.write("\n")
- f.write("%s=%s\n" % (key.encode("utf-8"), map[key].encode("utf-8")))
+ lines.append("")
+ lines.append("%s=%s" % (key, map[key]))
for i in range(context):
- f.write("\n")
+ lines.append("")
+ return '\n'.join(lines) + '\n'
-def parse(f):
+def parse(contents):
"""
- Parse a format-2 mapfile.
+ Parse a format-2 mapfile string.
>>> parse('\\n\\n\\nq=p\\n\\n\\n\\n')['q']
- u'p'
+ 'p'
>>> parse('\\n\\nq=\\'p\\'\\n\\n\\n\\n')['q']
- u"\'p\'"
- >>> f = utility.FileString()
- >>> generate(f, {"a":"b", "c":"d", "e":"f"})
- >>> dict = parse(f)
+ "\'p\'"
+ >>> contents = generate({"a":"b", "c":"d", "e":"f"})
+ >>> dict = parse(contents)
>>> dict["a"]
- u'b'
+ 'b'
>>> dict["c"]
- u'd'
+ 'd'
>>> dict["e"]
- u'f'
+ 'f'
"""
- f = utility.get_file(f)
result = {}
- for line in f:
- line = line.decode("utf-8").rstrip('\n')
+ for line in contents.splitlines():
+ line = line.rstrip('\n')
if len(line) == 0:
continue
- name,value = [f for f in line.split('=', 1)]
+ name,value = [field for field in line.split('=', 1)]
assert not result.has_key(name)
result[name] = value
return result
-def map_save(rcs, path, map):
+def map_save(rcs, path, map, allow_no_rcs=False):
"""Save the map as a mapfile to the specified path"""
- add = not os.path.exists(path)
- output = file(path, "wb")
- generate(output, map)
- output.close()
- if add:
- rcs.add(path)
- else:
- rcs.update(path)
+ contents = generate(map)
+ rcs.set_file_contents(path, contents, allow_no_rcs)
-class NoSuchFile(Exception):
- def __init__(self, pathname):
- Exception.__init__(self, "No such file: %s" % pathname)
-
-
-def map_load(path):
- try:
- return parse(file(path, "rb"))
- except IOError, e:
- if e.errno != errno.ENOENT:
- raise e
- raise NoSuchFile(path)
+def map_load(rcs, path, allow_no_rcs=False):
+ contents = rcs.get_file_contents(path, allow_no_rcs=allow_no_rcs)
+ return parse(contents)
suite = doctest.DocTestSuite()
diff --git a/libbe/rcs.py b/libbe/rcs.py
index f222795..3519c3d 100644
--- a/libbe/rcs.py
+++ b/libbe/rcs.py
@@ -64,8 +64,22 @@ class CommandError(Exception):
class SettingIDnotSupported(NotImplementedError):
pass
+class RCSnotRooted(Exception):
+ def __init__(self):
+ msg = "RCS not rooted"
+ Exception.__init__(self, msg)
+
class PathNotInRoot(Exception):
- pass
+ def __init__(self, path, root):
+ msg = "Path '%s' not in root '%s'" % (path, root)
+ Exception.__init__(self, msg)
+ self.path = path
+ self.root = root
+
+class NoSuchFile(Exception):
+ def __init__(self, pathname):
+ Exception.__init__(self, "No such file: %s" % pathname)
+
def new():
return RCS()
@@ -255,6 +269,8 @@ class RCS(object):
Remove a file/directory and all its decendents from both
version control and the filesystem.
"""
+ if not os.path.exists(dirname):
+ raise NoSuchFile(dirname)
for dirpath,dirnames,filenames in os.walk(dirname, topdown=False):
filenames.extend(dirnames)
for path in filenames:
@@ -270,34 +286,44 @@ class RCS(object):
at path.
"""
self._rcs_update(self._u_rel_path(path))
- def get_file_contents(self, path, revision=None):
+ def get_file_contents(self, path, revision=None, allow_no_rcs=False):
"""
Get the file as it was in a given revision.
Revision==None specifies the current revision.
"""
- relpath = self._u_rel_path(path)
- return self._rcs_get_file_contents(relpath, revision).decode("utf-8")
- def set_file_contents(self, path, contents):
+ if not os.path.exists(path):
+ raise NoSuchFile(path)
+ if self._use_rcs(path, allow_no_rcs):
+ relpath = self._u_rel_path(path)
+ contents = self._rcs_get_file_contents(relpath,revision)
+ else:
+ contents = file(path, "rb").read()
+ return contents.decode("utf-8")
+ def set_file_contents(self, path, contents, allow_no_rcs=False):
"""
Set the file contents under version control.
"""
add = not os.path.exists(path)
file(path, "wb").write(contents.encode("utf-8"))
- if add:
- self.add(path)
- else:
- self.update(path)
- def mkdir(self, path):
+
+ if self._use_rcs(path, allow_no_rcs):
+ if add:
+ self.add(path)
+ else:
+ self.update(path)
+ def mkdir(self, path, allow_no_rcs=False):
"""
Create (if neccessary) a directory at path under version
control.
"""
if not os.path.exists(path):
os.mkdir(path)
- self.add(path)
+ if self._use_rcs(path, allow_no_rcs):
+ self.add(path)
else:
assert os.path.isdir(path)
- self.update(path)
+ if self._use_rcs(path, allow_no_rcs):
+ self.update(path)
def duplicate_repo(self, revision=None):
"""
Get the repository as it was in a given revision.
@@ -387,6 +413,43 @@ class RCS(object):
or None if none of those files exist.
"""
return search_parent_directories(path, filename)
+ def _use_rcs(self, path, allow_no_rcs):
+ """
+ Try and decide if _rcs_add/update/mkdir/etc calls will
+ succeed. Returns True is we think the rcs_call would
+ succeeed, and False otherwise.
+ """
+ use_rcs = True
+ exception = None
+ if self.rootdir != None:
+ if self.path_in_root(path) == False:
+ use_rcs = False
+ exception = PathNotInRoot(path, self.rootdir)
+ else:
+ use_rcs = False
+ exception = RCSnotRooted
+ if use_rcs == False and allow_no_rcs==False:
+ raise exception
+ return use_rcs
+ def path_in_root(self, path, root=None):
+ """
+ Return the relative path to path from root.
+ >>> rcs = new()
+ >>> rcs.path_in_root("/a.b/c/.be", "/a.b/c")
+ True
+ >>> rcs.path_in_root("/a.b/.be", "/a.b/c")
+ False
+ """
+ if root == None:
+ if self.rootdir == None:
+ raise RCSnotRooted
+ root = self.rootdir
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ return False
+ return True
def _u_rel_path(self, path, root=None):
"""
Return the relative path to path from root.
@@ -395,18 +458,18 @@ class RCS(object):
'.be'
"""
if root == None:
- assert self.rootdir != None, "RCS not rooted"
+ if self.rootdir == None:
+ raise RCSnotRooted
root = self.rootdir
- if os.path.isabs(path):
- absRoot = os.path.abspath(root)
- absRootSlashedDir = os.path.join(absRoot,"")
- if not path.startswith(absRootSlashedDir):
- raise PathNotInRoot, \
- "file %s not in root %s" % (path, absRootSlashedDir)
- assert path != absRootSlashedDir, \
- "file %s == root directory %s" % (path, absRootSlashedDir)
- path = path[len(absRootSlashedDir):]
- return path
+ path = os.path.abspath(path)
+ absRoot = os.path.abspath(root)
+ absRootSlashedDir = os.path.join(absRoot,"")
+ if not path.startswith(absRootSlashedDir):
+ raise PathNotInRoot(path, absRootSlashedDir)
+ assert path != absRootSlashedDir, \
+ "file %s == root directory %s" % (path, absRootSlashedDir)
+ relpath = path[len(absRootSlashedDir):]
+ return relpath
def _u_abspath(self, path, root=None):
"""
Return the absolute path from a path realtive to root.