diff options
Diffstat (limited to 'libbe/util')
-rw-r--r-- | libbe/util/id.py | 286 |
1 files changed, 196 insertions, 90 deletions
diff --git a/libbe/util/id.py b/libbe/util/id.py index 6b6b51d..f229bef 100644 --- a/libbe/util/id.py +++ b/libbe/util/id.py @@ -67,33 +67,56 @@ HIERARCHY = ['bugdir', 'bug', 'comment'] class MultipleIDMatches (ValueError): - def __init__(self, id, matches): - msg = ("More than one id matches %s. " - "Please be more specific.\n%s" % (id, matches)) + def __init__(self, id, common, matches): + msg = ('More than one id matches %s. ' + 'Please be more specific (%s/*).\n%s' % (id, common, matches)) ValueError.__init__(self, msg) self.id = id + self.common = common self.matches = matches class NoIDMatches (KeyError): - def __init__(self, id, possible_ids): - msg = "No id matches %s.\n%s" % (id, possible_ids) - KeyError.__init__(self, msg) + def __init__(self, id, possible_ids, msg=None): + KeyError.__init__(self, id) self.id = id self.possible_ids = possible_ids + self.msg = msg + def __str__(self): + if self.msg == None: + return 'No id matches %s.\n%s' % (self.id, self.possible_ids) + return self.msg + +class InvalidIDStructure (KeyError): + def __init__(self, id, msg=None): + KeyError.__init__(self, id) + self.id = id + self.msg = msg + def __str__(self): + if self.msg == None: + return 'Invalid id structure "%s"' % self.id + return self.msg - -def _assemble(*args): +def _assemble(args, check_length=False): args = list(args) for i,arg in enumerate(args): if arg == None: args[i] = '' - return '/'.join(args) - -def _split(id): + id = '/'.join(args) + if check_length == True: + assert len(args) > 0, args + if len(args) > 3: + raise InvalidIDStructure(id, '%d > 3 levels in "%s"' % (len(args), id)) + return id + +def _split(id, check_length=False): args = id.split('/') for i,arg in enumerate(args): if arg == '': args[i] = None + if check_length == True: + assert len(args) > 0, args + if len(args) > 3: + raise InvalidIDStructure(id, '%d > 3 levels in "%s"' % (len(args), id)) return args def _truncate(uuid, other_uuids, min_length=3): @@ -105,14 +128,21 @@ def _truncate(uuid, other_uuids, min_length=3): chars+=1 return uuid[:chars] -def _expand(truncated_id, other_ids): +def _expand(truncated_id, common, other_ids): + other_ids = list(other_ids) + if len(other_ids) == 0: + raise NoIDMatches(truncated_id, other_ids) + if truncated_id == None: + if len(other_ids) == 1: + return other_ids[0] + raise MultipleIDMatches(truncated_id, common, other_ids) matches = [] other_ids = list(other_ids) for id in other_ids: if id.startswith(truncated_id): matches.append(id) if len(matches) > 1: - raise MultipleIDMatches(truncated_id, matches) + raise MultipleIDMatches(truncated_id, common, matches) if len(matches) == 0: raise NoIDMatches(truncated_id, other_ids) return matches[0] @@ -172,7 +202,7 @@ class ID (object): assert self._type in HIERARCHY, self._type def storage(self, *args): - return _assemble(self._object.uuid, *args) + return _assemble([self._object.uuid]+list(args)) def _ancestors(self): ret = [self._object] @@ -187,7 +217,8 @@ class ID (object): return ret def long_user(self): - return _assemble(*[o.uuid for o in self._ancestors()]) + return _assemble([o.uuid for o in self._ancestors()], + check_length=True) def user(self): ids = [] @@ -196,7 +227,7 @@ class ID (object): ids.append(None) else: ids.append(_truncate(o.uuid, o.sibling_uuids())) - return _assemble(*ids) + return _assemble(ids, check_length=True) def child_uuids(child_storage_ids): """ @@ -210,54 +241,74 @@ def child_uuids(child_storage_ids): if len(fields) == 1: yield fields[0] +def long_to_short_user(bugdirs, id): + ids = _split(id, check_length=True) + bugdir = [bd for bd in bugdirs if bd.uuid == ids[0]][0] + objects = [bugdir] + if len(ids) >= 2: + bug = bugdir.bug_from_uuid(ids[1]) + objects.append(bug) + if len(ids) >= 3: + comment = bug.comment_from_uuid(ids[2]) + objects.append(comment) + for i,obj in enumerate(objects): + ids[i] = _truncate(ids[i], obj.sibling_uuids()) + return _assemble(ids) + +def short_to_long_user(bugdirs, id): + ids = _split(id, check_length=True) + ids[0] = _expand(ids[0], common=None, + other_ids=[bd.uuid for bd in bugdirs]) + if len(ids) == 1: + return _assemble(ids) + bugdir = [bd for bd in bugdirs if bd.uuid == ids[0]][0] + ids[1] = _expand(ids[1], common=bugdir.id.user(), + other_ids=bugdir.uuids()) + if len(ids) == 2: + return _assemble(ids) + bug = bugdir.bug_from_uuid(ids[1]) + ids[2] = _expand(ids[2], common=bug.id.user(), + other_ids=bug.uuids()) + return _assemble(ids) + REGEXP = '#([-a-f0-9]*)(/[-a-g0-9]*)?(/[-a-g0-9]*)?#' class IDreplacer (object): - def __init__(self, bugdirs, direction): + def __init__(self, bugdirs, replace_fn): self.bugdirs = bugdirs - self.direction = direction + self.replace_fn = replace_fn def __call__(self, match): - ids = [m.lstrip('/') for m in match.groups() if m != None] - ids = self.switch_ids(ids) - return '#' + '/'.join(ids) + '#' - def switch_id(self, id, sibling_uuids): - if id == None: - return None - if self.direction == 'long_to_short': - return _truncate(id, sibling_uuids) - return _expand(id, sibling_uuids) - def switch_ids(self, ids): - assert ids[0] != None, ids - if self.direction == 'long_to_short': - bugdir = [bd for bd in self.bugdirs if bd.uuid == ids[0]][0] - objects = [bugdir] - if len(ids) >= 2: - bug = bugdir.bug_from_uuid(ids[1]) - objects.append(bug) - if len(ids) >= 3: - comment = bug.comment_from_uuid(ids[2]) - objects.append(comment) - for i,obj in enumerate(objects): - ids[i] = self.switch_id(ids[i], obj.sibling_uuids()) - else: - ids[0] = self.switch_id(ids[0], [bd.uuid for bd in self.bugdirs]) - if len(ids) == 1: - return ids - bugdir = [bd for bd in self.bugdirs if bd.uuid == ids[0]][0] - ids[1] = self.switch_id(ids[1], bugdir.uuids()) - if len(ids) == 2: - return ids - bug = bugdir.bug_from_uuid(ids[1]) - ids[2] = self.switch_id(ids[2], bug.uuids()) - return ids - -def short_to_long_user(bugdirs, text): - return re.sub(REGEXP, IDreplacer(bugdirs, 'short_to_long'), text) - -def long_to_short_user(bugdirs, text): - return re.sub(REGEXP, IDreplacer(bugdirs, 'long_to_short'), text) + ids = [] + for m in match.groups(): + if m == None: + m = '' + ids.append(m) + return '#' + self.replace_fn(self.bugdirs, ''.join(ids)) + '#' + +def short_to_long_text(bugdirs, text): + return re.sub(REGEXP, IDreplacer(bugdirs, short_to_long_user), text) +def long_to_short_text(bugdirs, text): + return re.sub(REGEXP, IDreplacer(bugdirs, long_to_short_user), text) + +def residual(base, fragment): + """ + >>> residual('ABC/DEF/', '//GHI') + ('//', 'GHI') + >>> residual('ABC/DEF/', '/D/GHI') + ('/D/', 'GHI') + >>> residual('ABC/DEF', 'A/D/GHI') + ('A/D/', 'GHI') + >>> residual('ABC/DEF', 'A/D/GHI/JKL') + ('A/D/', 'GHI/JKL') + """ + base = base.rstrip('/') + '/' + ids = fragment.split('/') + base_count = base.count('/') + root_ids = ids[:base_count] + [''] + residual_ids = ids[base_count:] + return ('/'.join(root_ids), '/'.join(residual_ids)) def _parse_user(id): """ @@ -270,21 +321,34 @@ def _parse_user(id): >>> _parse_user('ABC') == \\ ... {'bugdir':'ABC', 'type':'bugdir'} True + >>> _parse_user('') == \\ + ... {'bugdir':None, 'type':'bugdir'} + True + >>> _parse_user('/') == \\ + ... {'bugdir':None, 'bug':None, 'type':'bug'} + True + >>> _parse_user('/DEF/') == \\ + ... {'bugdir':None, 'bug':'DEF', 'comment':None, 'type':'comment'} + True + >>> _parse_user('a/b/c/d') + Traceback (most recent call last): + ... + InvalidIDStructure: 4 > 3 levels in "a/b/c/d" """ ret = {} - args = _split(id) - assert len(args) > 0 and len(args) < 4, 'Invalid id "%s"' % id - for type,arg in zip(HIERARCHY, args): - assert len(arg) > 0, 'Invalid part "%s" of id "%s"' % (arg, id) + args = _split(id, check_length=True) + for i,(type,arg) in enumerate(zip(HIERARCHY, args)): + if arg != None and len(arg) == 0: + raise InvalidIDStructure( + id, 'Invalid %s part %d "%s" of id "%s"' % (type, i, arg, id)) ret['type'] = type ret[type] = arg return ret def parse_user(bugdir, id): - long_id = short_to_long_user([bugdir], '#%s#' % id).strip('#') + long_id = short_to_long_user([bugdir], id) return _parse_user(long_id) - if libbe.TESTING == True: class UUIDtestCase(unittest.TestCase): def testUUID_gen(self): @@ -292,22 +356,28 @@ if libbe.TESTING == True: self.failUnless(len(id) == 36, 'invalid UUID "%s"' % id) class DummyObject (object): - def __init__(self, uuid, siblings=[]): + def __init__(self, uuid, parent=None, siblings=[]): self.uuid = uuid self._siblings = siblings + if parent == None: + type_i = 0 + else: + assert parent.type in HIERARCHY, parent + setattr(self, parent.type, parent) + type_i = HIERARCHY.index(parent.type) + 1 + self.type = HIERARCHY[type_i] + self.id = ID(self, self.type) def sibling_uuids(self): return self._siblings class IDtestCase(unittest.TestCase): def setUp(self): self.bugdir = DummyObject('1234abcd') - self.bug = DummyObject('abcdef', ['a1234', 'ab9876']) - self.bug.bugdir = self.bugdir - self.comment = DummyObject('12345678', ['1234abcd', '1234cdef']) - self.comment.bug = self.bug - self.bd_id = ID(self.bugdir, 'bugdir') - self.b_id = ID(self.bug, 'bug') - self.c_id = ID(self.comment, 'comment') + self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876']) + self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef']) + self.bd_id = self.bugdir.id + self.b_id = self.bug.id + self.c_id = self.comment.id def test_storage(self): self.failUnless(self.bd_id.storage() == self.bugdir.uuid, self.bd_id.storage()) @@ -315,8 +385,9 @@ if libbe.TESTING == True: self.b_id.storage()) self.failUnless(self.c_id.storage() == self.comment.uuid, self.c_id.storage()) - self.failUnless(self.bd_id.storage('x','y','z') == \ - '1234abcd/x/y/z', self.bd_id.storage()) + self.failUnless(self.bd_id.storage('x', 'y', 'z') == \ + '1234abcd/x/y/z', + self.bd_id.storage('x', 'y', 'z')) def test_long_user(self): self.failUnless(self.bd_id.long_user() == self.bugdir.uuid, self.bd_id.long_user()) @@ -338,30 +409,65 @@ if libbe.TESTING == True: class ShortLongParseTestCase(unittest.TestCase): def setUp(self): self.bugdir = DummyObject('1234abcd') - self.bug = DummyObject('abcdef', ['a1234', 'ab9876']) - self.bug.bugdir = self.bugdir + self.bug = DummyObject('abcdef', self.bugdir, ['a1234', 'ab9876']) + self.comment = DummyObject('12345678', self.bug, ['1234abcd', '1234cdef']) + self.bd_id = self.bugdir.id + self.b_id = self.bug.id + self.c_id = self.comment.id self.bugdir.bug_from_uuid = lambda uuid: self.bug self.bugdir.uuids = lambda : self.bug.sibling_uuids() + [self.bug.uuid] - self.comment = DummyObject('12345678', ['1234abcd', '1234cdef']) - self.comment.bug = self.bug self.bug.comment_from_uuid = lambda uuid: self.comment self.bug.uuids = lambda : self.comment.sibling_uuids() + [self.comment.uuid] - self.bd_id = ID(self.bugdir, 'bugdir') - self.b_id = ID(self.bug, 'bug') - self.c_id = ID(self.comment, 'comment') self.short = 'bla bla #123/abc# bla bla #123/abc/12345# bla bla' self.long = 'bla bla #1234abcd/abcdef# bla bla #1234abcd/abcdef/12345678# bla bla' - self.short_id = '123/abc' - def test_short_to_long(self): - self.failUnless(short_to_long_user([self.bugdir], self.short) == self.long, - '\n' + self.short + '\n' + short_to_long_user([self.bugdir], self.short) + '\n' + self.long) - def test_long_to_short(self): - self.failUnless(long_to_short_user([self.bugdir], self.long) == self.short, - '\n' + long_to_short_user([self.bugdir], self.long) + '\n' + self.short) + self.short_id_parse_pairs = [ + ('', {'bugdir':'1234abcd', 'type':'bugdir'}), + ('123/abc', {'bugdir':'1234abcd', 'bug':'abcdef', + 'type':'bug'}), + ('123/abc/12345', {'bugdir':'1234abcd', 'bug':'abcdef', + 'comment':'12345678', 'type':'comment'}), + ] + self.short_id_exception_pairs = [ + ('z', NoIDMatches('z', ['1234abcd'])), + ('///', InvalidIDStructure( + '///', msg='4 > 3 levels in "///"')), + ('/', MultipleIDMatches( + None, '123', ['a1234', 'ab9876', 'abcdef'])), + ('123/', MultipleIDMatches( + None, '123', ['a1234', 'ab9876', 'abcdef'])), + ('123/abc/', MultipleIDMatches( + None, '123/abc', ['1234abcd','1234cdef','12345678'])), + ] + def test_short_to_long_text(self): + self.failUnless(short_to_long_text([self.bugdir], self.short) == self.long, + '\n' + self.short + '\n' + short_to_long_text([self.bugdir], self.short) + '\n' + self.long) + def test_long_to_short_text(self): + self.failUnless(long_to_short_text([self.bugdir], self.long) == self.short, + '\n' + long_to_short_text([self.bugdir], self.long) + '\n' + self.short) def test_parse_user(self): - self.failUnless(parse_user(self.bugdir, self.short_id) == \ - {'bugdir':'1234abcd', 'bug':'abcdef', 'type':'bug'}, - parse_user(self.bugdir, self.short_id)) + for short_id,parsed in self.short_id_parse_pairs: + ret = parse_user(self.bugdir, short_id) + self.failUnless(ret == parsed, + 'got %s\nexpected %s' % (ret, parsed)) + def test_parse_user_exceptions(self): + for short_id,exception in self.short_id_exception_pairs: + try: + ret = parse_user(self.bugdir, short_id) + self.fail('Expected parse_user(bugdir, "%s") to raise %s,' + '\n but it returned %s' + % (short_id, exception.__class__.__name__, ret)) + except exception.__class__, e: + for attr in dir(e): + if attr.startswith('_') or attr == 'args': + continue + value = getattr(e, attr) + expected = getattr(exception, attr) + self.failUnless( + value == expected, + 'Expected parse_user(bugdir, "%s") %s.%s' + '\n to be %s, but it is %s\n\n%s' + % (short_id, exception.__class__.__name__, + attr, expected, value, e)) unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) |