diff options
| Field | Value | Date |
|---|---|---|
| author | Matěj Cepl <mcepl@cepl.eu> | 2015-01-09 01:47:46 +0100 |
| committer | Matěj Cepl <mcepl@cepl.eu> | 2022-11-04 13:35:39 +0100 |
| commit | e1ae554998880beb4eae3d446f9c83e9924850ef (patch) | |
| tree | 11ba5ff3e993ff356d7cbd54ccfe3a7013499bfc | |
| parent | 3babbe21a5d1d3d4d45fc8c89251460cb2b781f9 (diff) | |
| download | pyg-e1ae554998880beb4eae3d446f9c83e9924850ef.tar.gz | |

Getting some kind of parsed tree ... pretty ugly, so far.

| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | parse_received.py | 41 |
| -rw-r--r-- | test/test_parse_received.py | 50 |

2 files changed, 78 insertions, 13 deletions

```diff
diff --git a/parse_received.py b/parse_received.py
index 90c8e81..ad28117 100644
--- a/parse_received.py
+++ b/parse_received.py
@@ -234,6 +234,7 @@ __lg.add('IDSEP', r"(?i)\bid\b")
 __lg.add('TCPSEP', r"(?i)\btcp\b")
 __lg.add('SMTPSEP', r"(?i)\b[e]?smtp\b")
 __lg.add('IPV4ADDRESS', r"\[?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\]?")
+__lg.add('SMTPVERSION', r"\d{1,2}\.\d{1,2}\.\d{1,2}/\d{1,2}\.\d{1,2}\.\d{1,2}")
 __lg.add('DOMAIN', r"(?i)[<(]?(([\w][\w\-\.]*)\.)?([\w][\w\-]+)*" +
          r"(\.([a-z][a-z]*))[>)]?")
 __weekday = r"(Mon|Tue|Wed|Thu|Fri|Sat|Sun)"
@@ -242,6 +243,7 @@ __lg.add('DATETIME', (r"%s\s*,\s+\d{1,2}\s+%s\s+\d{4} " +
          r"\d{1,2}:\d{2}:\d{2}\s+[-+]\d{4}(\s*\([A-Z]{3}\))?") %
          (__weekday, __month_name))
 __lg.add("EMAILADDR", r'<[a-zA-Z0-9_.+-]+@[a-zA-Z0-9._-]+>')
+__lg.add('IGNORABLESTR', r"may be forged")
 # __lg.add('ParentStr', r"\([^)]+\)")
 __lg.add('STRING', r"\S+")
 
@@ -250,6 +252,27 @@ lexer = __lg.build()
 __pg = rply.ParserGenerator([rule.name for rule in lexer.rules],
                             cache_id='received_parser')
 
+"""
+Result of:
+
+from server.mymailhost.com
+    (mail.mymailhost.com [126.43.75.123])
+    by pilot01.cl.msu.edu (8.10.2/8.10.2) with ESMTP id NAA23597;
+    Fri, 12 Jul 2002 16:11:20 -0400 (EDT)
+
+is
+
+[[Token('FROMSEP', 'from'),
+  [Token('DOMAIN', 'server.mymailhost.com'),
+   [[Token('DOMAIN', 'mail.mymailhost.com'),
+     Token('IPV4ADDRESS', '[126.43.75.123]')]]]],
+ [Token('BYSEP', 'by'),
+  [Token('DOMAIN', 'pilot01.cl.msu.edu'),
+   [[Token('STRING', '8.10.2/8.10.2)')]]]],
+ [[[], [[Token('SMTPSEP', 'ESMTP')]], [Token('STRING', 'NAA23597;')], []]],
+ Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)')]
+"""
+
 
 @__pg.production('main : stamp')
 def main(p):
@@ -276,22 +299,28 @@ def by_domain(p):
 
 @__pg.production('extended-domain : DOMAIN')
 def extended_domain(p):
-    logging.debug('by_domain p = %s', p)
+    logging.debug('extended_domain p = %s', p)
     return [p[0], p[1]]
 
 
 @__pg.production('extended-domain : DOMAIN tcp-info')
 def extended_domain_tcp(p):
-    logging.debug('by_domain p = %s', p)
+    logging.debug('extended_domain_tcp p = %s', p)
     return [p[0], p[1]]
 
 
 @__pg.production('extended-domain : IPV4ADDRESS tcp-info')
 def extended_domain_addr(p):
-    logging.debug('by_domain p = %s', p)
+    logging.debug('extended_domain_addr p = %s', p)
     return [p[0], p[1]]
 
 
+@__pg.production('extended-domain : extended-domain IGNORABLESTR')
+def extended_domain_ignorable(p):
+    logging.debug('extended_domain_ignorable p = %s', p)
+    return p[0]
+
+
 @__pg.production('tcp-info : IPV4ADDRESS')
 def tcp_info(p):
     logging.debug('optinfo p = %s', p)
@@ -304,10 +333,10 @@ def tcp_info_addr(p):
     return [p]
 
 
-@__pg.production('tcp-info : STRING')
+@__pg.production('tcp-info : SMTPVERSION')
 def tcp_info_string(p):
     logging.debug('optinfo p = %s', p)
-    return [p]
+    return p[0]
 
 
 @__pg.production('optinfo : via with id for')
@@ -376,7 +405,7 @@ def link(p):
 @__pg.production('protocol : SMTPSEP')
 def protocol(p):
     logging.debug('protocol p = %s', p)
-    return [p[1]]
+    return p
 
 
 parser = __pg.build()
diff --git a/test/test_parse_received.py b/test/test_parse_received.py
index 5f365c2..e2afdaa 100644
--- a/test/test_parse_received.py
+++ b/test/test_parse_received.py
@@ -1,11 +1,15 @@
 import logging
-logging.basicConfig(level=logging.DEBUG)
 import parse_received
+from pprint import pformat
 import unittest
 
 from rply import Token
 
+logging.basicConfig(level=logging.DEBUG)
+
+
+
 
 INPUT_01 = \
 """from server.mymailhost.com
     (mail.mymailhost.com [126.43.75.123])
@@ -28,7 +32,7 @@ class TestReceivedLexer(unittest.TestCase):
             Token('IPV4ADDRESS', '[126.43.75.123]'),
             Token('BYSEP', 'by'),
             Token('DOMAIN', 'pilot01.cl.msu.edu'),
-            Token('STRING', '8.10.2/8.10.2)'),
+            Token('SMTPVERSION', '8.10.2/8.10.2'),
             Token('WITHSEP', 'with'),
             Token('SMTPSEP', 'ESMTP'),
             Token('IDSEP', 'id'),
@@ -44,12 +48,10 @@ class TestReceivedLexer(unittest.TestCase):
             Token('DOMAIN', 'd1080.master.cz'),
             Token('DOMAIN', 'p-lab.cz'),
             Token('IPV4ADDRESS', '[89.185.245.149]'),
-            Token('STRING', 'may'),
-            Token('STRING', 'be'),
-            Token('STRING', 'forged))'),
+            Token('IGNORABLESTR', 'may be forged'),
             Token('BYSEP', 'by'),
             Token('DOMAIN', 'mx1.redhat.com'),
-            Token('STRING', '8.14.4/8.14.4)'),
+            Token('SMTPVERSION', '8.14.4/8.14.4'),
             Token('WITHSEP', 'with'),
             Token('SMTPSEP', 'ESMTP'),
             Token('IDSEP', 'id'),
@@ -66,18 +68,52 @@ class TestReceivedParser(unittest.TestCase):
 
     def test_simple_01(self):
         expected = [
+            [Token('FROMSEP', 'from'),
+             [Token('DOMAIN', 'server.mymailhost.com'),
+              [[Token('DOMAIN', 'mail.mymailhost.com'),
+                Token('IPV4ADDRESS', '[126.43.75.123]')]]]
+            ],
+            [Token('BYSEP', 'by'),
+             [Token('DOMAIN', 'pilot01.cl.msu.edu'),
+              Token('SMTPVERSION', '8.10.2/8.10.2')]
+            ],
+            [[[],
+              [[Token('SMTPSEP', 'ESMTP')]],
+              [Token('STRING', 'NAA23597;')], []]
+            ],
+            Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)')
         ]
+        logging.debug('\nINPUT_01:\n%s', INPUT_01)
         stream = parse_received.lexer.lex(INPUT_01)
         logging.debug('\nstream:\n%s', list(stream))
+        stream.idx = 0
         parsed = parse_received.parser.parse(stream)
+        logging.debug('\nparsed:\n%s', pformat(parsed))
         self.assertEqual(parsed, expected)
 
-    @unittest.skip('Not ready yet')
+    # @unittest.skip('Not ready yet')
     def test_simple_02(self):
         expected = [
+            [Token('FROMSEP', 'from'),
+             [Token('DOMAIN', 'd1080.master.cz'),
+              [[Token('DOMAIN', 'p-lab.cz'), Token('IPV4ADDRESS', '[89.185.245.149]')]]]
+            ],
+            [Token('BYSEP', 'by'),
+             [Token('DOMAIN', 'mx1.redhat.com'), Token('SMTPVERSION', '8.14.4/8.14.4')]
+            ],
+            [[[],
+              [[Token('SMTPSEP', 'ESMTP')]],
+              [Token('STRING', 't07GaC1j031854')],
+              [Token('FORSEP', 'for')]]
+            ],
+            Token('DATETIME', 'Wed, 7 Jan 2015 11:36:13 -0500')
         ]
+        logging.debug('\nINPUT_02:\n%s', INPUT_02)
         stream = parse_received.lexer.lex(INPUT_02)
+        logging.debug('\nstream:\n%s', list(stream))
+        stream.idx = 0
         parsed = parse_received.parser.parse(stream)
+        logging.debug('\nparsed:\n%s', pformat(parsed))
         self.assertEqual(parsed, expected)
 
 if __name__ == '__main__':
```