# -*- coding: utf-8 -*-
# Recovered from a git patch (cgit patch view):
#   commit:  3bb1971fb39dd4d2d8cdd1509b51571ccf3070b2
#   author:  Matěj Cepl
#   date:    Tue, 6 Jan 2015 14:35:12 +0100
#   subject: Groundwork for parsing Received: header
#            (and generating Path: not to be forgotten)
#   change:  new file test/test_parse_received.py (157 insertions)
"""Tests for the ``parse_received`` lexer, parser and ``parse_header``.

Two sample ``Received:`` header values are pushed through three layers:

* ``parse_received.lexer``  -- expected flat sequence of ``rply`` tokens,
* ``parse_received.parser`` -- expected nested list of tokens,
* ``parse_received.parse_header`` -- expected plain dictionary.
"""
import datetime
import email
import unittest

from dateutil.tz import tzoffset
from rply import Token

import parse_received


# Debugging aid; ``logging`` has to be imported before enabling this.
# logging.basicConfig(level=logging.DEBUG)


# NOTE(review): the exact whitespace inside these two literals (line breaks
# and continuation indent) was reconstructed -- confirm against the
# repository copy of this file.
INPUT_01 = \
    """from server.mymailhost.com
    (mail.mymailhost.com [126.43.75.123])
    by pilot01.cl.msu.edu (8.10.2/8.10.2) with ESMTP id NAA23597;
    Fri, 12 Jul 2002 16:11:20 -0400 (EDT)"""
# NOTE(review): nothing stands between "for" and ";" below, and the lexer
# test expects an empty EMAILADDR value.  Possibly an angle-bracketed address
# was lost when this text was extracted -- confirm against the repository.
INPUT_02 = \
    """from d1080.master.cz (p-lab.cz [89.185.245.149] (may be forged))
    by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id t07GaC1j031854
    for ; Wed, 7 Jan 2015 11:36:13 -0500"""


class TestReceivedLexer(unittest.TestCase):
    """Check the token stream produced by ``parse_received.lexer``."""

    # Show complete diffs when an assertEqual on these long lists fails.
    maxDiff = None

    def test_simple_01(self):
        """Lex INPUT_01 and compare against the expected flat token list."""
        expected = [
            Token('FROMSEP', 'from'),
            Token('DOMAIN', 'server.mymailhost.com'),
            Token('DOMAIN', 'mail.mymailhost.com'),
            Token('IPV4ADDRESS', '[126.43.75.123]'),
            Token('BYSEP', 'by'),
            Token('DOMAIN', 'pilot01.cl.msu.edu'),
            Token('SMTPVERSION', '8.10.2/8.10.2'),
            Token('WITHSEP', 'with'),
            Token('SMTPSEP', 'ESMTP'),
            Token('IDSEP', 'id'),
            # The trailing ';' is part of the expected STRING value.
            Token('STRING', 'NAA23597;'),
            Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)'),
        ]
        parsed = list(parse_received.lexer.lex(INPUT_01))
        self.assertEqual(parsed, expected)

    def test_simple_02(self):
        """Lex INPUT_02, which adds a "(may be forged)" note and a for-clause."""
        expected = [
            Token('FROMSEP', 'from'),
            Token('DOMAIN', 'd1080.master.cz'),
            Token('DOMAIN', 'p-lab.cz'),
            Token('IPV4ADDRESS', '[89.185.245.149]'),
            Token('IGNORABLESTR', 'may be forged'),
            Token('BYSEP', 'by'),
            Token('DOMAIN', 'mx1.redhat.com'),
            Token('SMTPVERSION', '8.14.4/8.14.4'),
            Token('WITHSEP', 'with'),
            Token('SMTPSEP', 'ESMTP'),
            Token('IDSEP', 'id'),
            Token('STRING', 't07GaC1j031854'),
            Token('FORSEP', 'for'),
            Token('EMAILADDR', ''),
            Token('DATETIME', 'Wed, 7 Jan 2015 11:36:13 -0500'),
        ]
        parsed = list(parse_received.lexer.lex(INPUT_02))
        self.assertEqual(parsed, expected)


class TestReceivedParser(unittest.TestCase):
    """Check ``parse_received.parser`` output and ``parse_header`` results."""

    # Show complete diffs when an assertEqual on these nested values fails.
    maxDiff = None

    def test_simple_01(self):
        """Parse the INPUT_01 token stream into the expected nested list."""
        expected = [
            [Token('FROMSEP', 'from'),
             [Token('DOMAIN', 'server.mymailhost.com'),
              [Token('DOMAIN', 'mail.mymailhost.com'),
               Token('IPV4ADDRESS', '[126.43.75.123]')]]],
            [Token('BYSEP', 'by'),
             [Token('DOMAIN', 'pilot01.cl.msu.edu'),
              [Token('SMTPVERSION', '8.10.2/8.10.2')]]],
            [[[],
              [Token('SMTPSEP', 'ESMTP')],
              [Token('STRING', 'NAA23597;')],
              []]],
            Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)'),
        ]
        stream = parse_received.lexer.lex(INPUT_01)
        # NOTE(review): presumably makes sure the stream starts at the first
        # character before it is handed to the parser -- confirm it is needed.
        stream.idx = 0
        parsed = parse_received.parser.parse(stream)
        self.assertEqual(parsed, expected)

    def test_simple_02(self):
        """Parse the INPUT_02 token stream into the expected nested list."""
        expected = [
            [Token('FROMSEP', 'from'),
             [Token('DOMAIN', 'd1080.master.cz'),
              [Token('DOMAIN', 'p-lab.cz'),
               Token('IPV4ADDRESS', '[89.185.245.149]')]]],
            [Token('BYSEP', 'by'),
             [Token('DOMAIN', 'mx1.redhat.com'),
              [Token('SMTPVERSION', '8.14.4/8.14.4')]]],
            [[[],
              [Token('SMTPSEP', 'ESMTP')],
              [Token('STRING', 't07GaC1j031854')],
              [Token('FORSEP', 'for')]]],
            Token('DATETIME', 'Wed, 7 Jan 2015 11:36:13 -0500'),
        ]
        stream = parse_received.lexer.lex(INPUT_02)
        # NOTE(review): presumably makes sure the stream starts at the first
        # character before it is handed to the parser -- confirm it is needed.
        stream.idx = 0
        parsed = parse_received.parser.parse(stream)
        self.assertEqual(parsed, expected)

    def test_parse_01(self):
        """``parse_header(INPUT_01)`` yields 'by', 'date' and 'from' entries."""
        # NOTE(review): 'halo' and 'reveresed' look like misspellings, but
        # they are compared against parse_header()'s output, so they must
        # only ever be renamed together with the parser.
        expected = {
            'by': {'server': 'pilot01.cl.msu.edu'},
            'date': datetime.datetime(
                2002, 7, 12, 16, 11, 20, tzinfo=tzoffset(u'EDT', -14400)),
            'from': {
                'halo': 'server.mymailhost.com',
                'ipaddr': '[126.43.75.123]',
                'reveresed': 'mail.mymailhost.com'
            }
        }

        observed = parse_received.parse_header(INPUT_01)
        self.assertEqual(observed, expected)

    def test_parse_02(self):
        """``parse_header(INPUT_02)`` yields 'by', 'date' and 'from' entries."""
        # NOTE(review): 'halo' and 'reveresed' look like misspellings, but
        # they are compared against parse_header()'s output, so they must
        # only ever be renamed together with the parser.
        expected = {
            'by': {'server': 'mx1.redhat.com'},
            'date': datetime.datetime(
                2015, 1, 7, 11, 36, 13, tzinfo=tzoffset(None, -18000)),
            'from': {
                'halo': 'd1080.master.cz',
                'ipaddr': '[89.185.245.149]',
                'reveresed': 'p-lab.cz'
            }
        }

        observed = parse_received.parse_header(INPUT_02)
        self.assertEqual(observed, expected)

    def test_email_01(self):
        """Run ``parse_header`` over every Received: header of a sample mail.

        The message is read from ``examples/mail``, a path relative to the
        current working directory.
        """
        # The file is only needed while the message is being read.
        with open('examples/mail', 'r') as inmail:
            msg = email.message_from_file(inmail)

        # get_all() hands back its ``failobj`` (None by default) when the
        # header is absent; asking for [] keeps a message without Received:
        # headers from blowing up with TypeError in the loop below.
        received_hdrs = msg.get_all('Received', [])

        # NOTE(review): the expectation is an empty list, which looks like a
        # placeholder until real expected values are filled in -- confirm.
        expected = []
        parsed_headers = [
            parse_received.parse_header(hdr) for hdr in received_hdrs
        ]
        self.assertEqual(parsed_headers, expected)


if __name__ == '__main__':
    unittest.main()