aboutsummaryrefslogtreecommitdiffstats
path: root/libbe
diff options
context:
space:
mode:
authorW. Trevor King <wking@drexel.edu>2009-12-09 07:23:54 -0500
committerW. Trevor King <wking@drexel.edu>2009-12-09 07:23:54 -0500
commitf52fc3a243edf5ccef2dcdfd0c4b4cded4357e13 (patch)
treeefb9a0219064402a929a41cde649e535454ac8ef /libbe
parent3e6096fb5bcb9c9e8a50faa76461da96d145ca8f (diff)
downloadbugseverywhere-f52fc3a243edf5ccef2dcdfd0c4b4cded4357e13.tar.gz
Rethought libbe.util.id module
Diffstat (limited to 'libbe')
-rw-r--r--libbe/util/id.py294
1 files changed, 254 insertions, 40 deletions
diff --git a/libbe/util/id.py b/libbe/util/id.py
index d57205f..d443706 100644
--- a/libbe/util/id.py
+++ b/libbe/util/id.py
@@ -20,10 +20,13 @@ Handle ID creation and parsing.
"""
import os.path
+import re
import libbe
if libbe.TESTING == True:
+ import doctest
+ import sys
import unittest
try:
@@ -60,6 +63,25 @@ except ImportError:
return output.rstrip('\n')
+HIERARCHY = ['bugdir', 'bug', 'comment']
+
+
+class MultipleIDMatches (ValueError):
+ def __init__(self, id, matches):
+ msg = ("More than one id matches %s. "
+ "Please be more specific.\n%s" % (id, matches))
+ ValueError.__init__(self, msg)
+ self.id = id
+ self.matches = matches
+
+class NoIDMatches (KeyError):
+ def __init__(self, id, possible_ids):
+ msg = "No id matches %s.\n%s" % (id, possible_ids)
+ KeyError.__init__(self, msg)
+ self.id = id
+ self.possible_ids = possible_ids
+
+
def _assemble(*args):
args = list(args)
for i,arg in enumerate(args):
@@ -74,50 +96,242 @@ def _split(id):
args[i] = None
return args
-def _is_a_uuid(id):
- if id.startswith('uuid:'):
- return True
- return False
-
-def _uuid_to_id(id):
- return 'uuid:' + id
-
-def _id_to_uuid(id):
- return id[len('uuid:'):]
-
-def bugdir_id(bugdir, *args):
- return _assemble(_uuid_to_id(bugdir.uuid), *args)
-
-def bug_id(bug, *args):
- if bug.bugdir == None:
- bdid = None
- else:
- bdid = bugdir_id(bug.bugdir)
- return _assemble(bdid, _uuid_to_id(bug.uuid), *args)
-
-def comment_id(comment, *args):
- if comment.bug == None:
- bid = None
- else:
- bid = bug_id(comment.bug)
- return _assemble(bid, _uuid_to_id(comment.uuid), *args)
-
-def parse_id(id):
- args = _split(id)
- ret = {'bugdir':_id_to_uuid(args.pop(0))}
- type = 'bugdir'
- for child_name in ['bug', 'comment']:
- if len(args) > 0 and _is_a_uuid(args[0]):
- ret[child_name] = _id_to_uuid(args.pop(0))
- type = child_name
- ret['type'] = type
- ret['remaining'] = os.path.join(args)
+def _truncate(uuid, other_uuids, min_length=3):
+ chars = min_length
+ for id in other_uuids:
+ if id == uuid:
+ continue
+ while (id[:chars] == uuid[:chars]):
+ chars+=1
+ return uuid[:chars]
+
+def _expand(truncated_id, other_ids):
+ matches = []
+ for id in other_ids:
+ if id.startswith(truncated_id):
+ matches.append(id)
+ if len(matches) > 1:
+ raise MultipleIDMatches(truncated_id, matches)
+ if len(matches) == 0:
+ raise NoIDMatches(truncated_id, other_ids)
+ return matches[0]
+
+
+class ID (object):
+ """
+ IDs have several formats specialized for different uses.
+
+ In storage, all objects are represented by their uuid alone,
+ because that is the simplest globally unique identifier. You can
+ generate ids of this sort with the .storage() method. Because an
+ object's storage may be distributed across several chunks, and the
+ chunks may not have their own uuid, we generate chunk ids by
+ prepending the objects uuid to the chunk name. The user id types
+ do not support this chunk extension feature.
+
+ For users, the full uuids are a bit overwhelming, so we truncate
+ them while retaining local uniqueness (with regards to the other
+ objects currently in storage). We also prepend truncated parent
+ ids for two reasons:
+ (1) so that a user can locate the repository containing the
+ referenced object. It would be hard to find bug 'XYZ' if
+ that's all you knew. Much easier with 'ABC/XYZ', where ABC
+ is the bugdir. Each project can publish a list of bugdir-id
+x - to - location mappings, e.g.
+ ABC...(full uuid)...DEF https://server.com/projectX/be/
+ which is easier than publishing all-object-ids-to-location
+ mappings.
+ (2) because it's easier to generate and parse truncated ids if
+ you don't have to fetch all the ids in the storage
+ repository, but can restrict yourself to a specific branch.
+ You can generate ids of this sort with the .user() method,
+ although in order to preform the truncation, your object (and its
+ parents must define a .sibling_uuids() method.
+
+
+ While users can use the convenient short user ids in the short
+ term, the truncation will inevitably lead to name collision. To
+ avoid that, we provide a non-truncated form of the short user ids
+ via the .long_user() method. These long user ids should be
+ converted to short user ids by intelligent user interfaces.
+
+ Related tools:
+ * get uuids back out of the user ids:
+ parse_user()
+ * scan text for user ids & convert to long user ids:
+ short_to_long_user()
+ * scan text for long user ids & convert to short user ids:
+ long_to_short_user()
+
+ Supported types: 'bugdir', 'bug', 'comment'
+ """
+ def __init__(self, object, type):
+ self._object = object
+ self._type = type
+ assert self._type in HIERARCHY, self._type
+ self.uuid = self._object.uuid
+
+ def storage(self, *args):
+ return _assemble(self._object.uuid, *args)
+
+ def _ancestors(self):
+ ret = [self._object]
+ index = HIERARCHY.index(self._type)
+ if index == 0:
+ return ret
+ o = self._object
+ for i in range(index, 0, -1):
+ parent_name = HIERARCHY[i-1]
+ o = getattr(o, parent_name)
+ ret.insert(0, o)
+ return ret
+
+ def long_user(self):
+ return _assemble(*[o.uuid for o in self._ancestors()])
+
+ def user(self):
+ return _assemble(*[_truncate(o.uuid, o.sibling_uuids())
+ for o in self._ancestors()])
+
+def parse_user(id):
+ """
+ >>> parse_user('ABC/DEF/GHI') == \\
+ ... {'bugdir':'ABC', 'bug':'DEF', 'comment':'GHI', 'type':'comment'}
+ True
+ >>> parse_user('ABC/DEF') == \\
+ ... {'bugdir':'ABC', 'bug':'DEF', 'type':'bug'}
+ True
+ >>> parse_user('ABC') == \\
+ ... {'bugdir':'ABC', 'type':'bugdir'}
+ True
+ """
+ ret = {}
+ args = _split(id)
+ assert len(args) > 0 and len(args) < 4, 'Invalid id "%s"' % id
+ for type,arg in zip(HIERARCHY, args):
+ assert len(arg) > 0, 'Invalid part "%s" of id "%s"' % (arg, id)
+ ret['type'] = type
+ ret[type] = arg
return ret
+REGEXP = '#([-a-f0-9]*)(/[-a-g0-9]*)?(/[-a-g0-9]*)?#'
+
+class IDreplacer (object):
+ def __init__(self, bugdirs, direction):
+ self.bugdirs = bugdirs
+ self.direction = direction
+ def __call__(self, match):
+ ids = [m.lstrip('/') for m in match.groups() if m != None]
+ ids = self.switch_ids(ids)
+ return '#' + '/'.join(ids) + '#'
+ def switch_id(self, id, sibling_uuids):
+ if id == None:
+ return None
+ if self.direction == 'long_to_short':
+ return _truncate(id, sibling_uuids)
+ return _expand(id, sibling_uuids)
+ def switch_ids(self, ids):
+ assert ids[0] != None, ids
+ if self.direction == 'long_to_short':
+ bugdir = [bd for bd in self.bugdirs if bd.uuid == ids[0]][0]
+ objects = [bugdir]
+ if len(ids) >= 2:
+ bug = bugdir.bug_from_uuid(ids[1])
+ objects.append(bug)
+ if len(ids) >= 3:
+ comment = bug.comment_from_uuid(ids[2])
+ objects.append(comment)
+ for i,obj in enumerate(objects):
+ ids[i] = self.switch_id(ids[i], obj.sibling_uuids())
+ else:
+ ids[0] = self.switch_id(ids[0], [bd.uuid for bd in self.bugdirs])
+ if len(ids) == 1:
+ return ids
+ bugdir = [bd for bd in self.bugdirs if bd.uuid == ids[0]][0]
+ ids[1] = self.switch_id(ids[1], bugdir.uuids())
+ if len(ids) == 2:
+ return ids
+ bug = bugdir.bug_from_uuid(ids[1])
+ ids[2] = self.switch_id(ids[2], bug.uuids())
+ return ids
+
+def short_to_long_user(bugdirs, text):
+ return re.sub(REGEXP, IDreplacer(bugdirs, 'short_to_long'), text)
+def long_to_short_user(bugdirs, text):
+ return re.sub(REGEXP, IDreplacer(bugdirs, 'long_to_short'), text)
+
if libbe.TESTING == True:
class UUIDtestCase(unittest.TestCase):
def testUUID_gen(self):
id = uuid_gen()
- self.failUnless(len(id) == 36, "invalid UUID '%s'" % id)
+ self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id)
+
+ class DummyObject (object):
+ def __init__(self, uuid, siblings=[]):
+ self.uuid = uuid
+ self._siblings = siblings
+ def sibling_uuids(self):
+ return self._siblings
+
+ class IDtestCase(unittest.TestCase):
+ def setUp(self):
+ self.bugdir = DummyObject('1234abcd')
+ self.bug = DummyObject('abcdef', ['a1234', 'ab9876'])
+ self.bug.bugdir = self.bugdir
+ self.comment = DummyObject('12345678', ['1234abcd', '1234cdef'])
+ self.comment.bug = self.bug
+ self.bd_id = ID(self.bugdir, 'bugdir')
+ self.b_id = ID(self.bug, 'bug')
+ self.c_id = ID(self.comment, 'comment')
+ def test_storage(self):
+ self.failUnless(self.bd_id.storage() == self.bugdir.uuid,
+ self.bd_id.storage())
+ self.failUnless(self.b_id.storage() == self.bug.uuid,
+ self.b_id.storage())
+ self.failUnless(self.c_id.storage() == self.comment.uuid,
+ self.c_id.storage())
+ self.failUnless(self.bd_id.storage('x','y','z') == \
+ '1234abcd/x/y/z', self.bd_id.storage())
+ def test_long_user(self):
+ self.failUnless(self.bd_id.long_user() == self.bugdir.uuid,
+ self.bd_id.long_user())
+ self.failUnless(self.b_id.long_user() == \
+ '/'.join([self.bugdir.uuid, self.bug.uuid]),
+ self.b_id.long_user())
+ self.failUnless(self.c_id.long_user() ==
+ '/'.join([self.bugdir.uuid, self.bug.uuid,
+ self.comment.uuid]),
+ self.c_id.long_user)
+ def test_user(self):
+ self.failUnless(self.bd_id.user() == '123',
+ self.bd_id.user())
+ self.failUnless(self.b_id.user() == '123/abc',
+ self.b_id.user())
+ self.failUnless(self.c_id.user() == '123/abc/12345',
+ self.c_id.user())
+
+ class IDtestCase(unittest.TestCase):
+ def setUp(self):
+ self.bugdir = DummyObject('1234abcd')
+ self.bug = DummyObject('abcdef', ['a1234', 'ab9876'])
+ self.bug.bugdir = self.bugdir
+ self.bugdir.bug_from_uuid = lambda uuid: self.bug
+ self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid]
+ self.comment = DummyObject('12345678', ['1234abcd', '1234cdef'])
+ self.comment.bug = self.bug
+ self.bug.comment_from_uuid = lambda uuid: self.comment
+ self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid]
+ self.bd_id = ID(self.bugdir, 'bugdir')
+ self.b_id = ID(self.bug, 'bug')
+ self.c_id = ID(self.comment, 'comment')
+ self.short = 'bla bla #123/abc# bla bla #123/abc/12345# bla bla'
+ self.long = 'bla bla #1234abcd/abcdef# bla bla #1234abcd/abcdef/12345678# bla bla'
+ def test_short_to_long(self):
+ self.failUnless(short_to_long_user([self.bugdir], self.short) == self.long,
+ '\n' + self.short + '\n' + short_to_long_user([self.bugdir], self.short) + '\n' + self.long)
+ def test_long_to_short(self):
+ self.failUnless(long_to_short_user([self.bugdir], self.long) == self.short,
+ '\n' + long_to_short_user([self.bugdir], self.long) + '\n' + self.short)
- suite = unittest.TestLoader().loadTestsFromTestCase(UUIDtestCase)
+ unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
+ suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])