"""Unit tests for the ``parse_received`` lexer, parser and ``parse_header``."""
import datetime
import email
import unittest

from dateutil.tz import tzoffset
from rply import Token

import parse_received

# Debug aid (``logging`` would have to be imported first):
# logging.basicConfig(level=logging.DEBUG)

# Sample ``Received:`` header values fed to the lexer and parser below.
# NOTE(review): whitespace inside these literals is reproduced exactly as it
# appears in this view of the file -- confirm against the real samples.
INPUT_01 = (
    "from server.mymailhost.com (mail.mymailhost.com [126.43.75.123]) "
    "by pilot01.cl.msu.edu (8.10.2/8.10.2) with ESMTP id NAA23597; "
    "Fri, 12 Jul 2002 16:11:20 -0400 (EDT)"
)

# NOTE(review): nothing sits between "for" and ";" here, and the lexer test
# expects an empty EMAILADDR value -- confirm the address was not lost.
INPUT_02 = (
    "from d1080.master.cz (p-lab.cz [89.185.245.149] (may be forged)) "
    "by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id t07GaC1j031854 "
    "for ; Wed, 7 Jan 2015 11:36:13 -0500"
)


def _tokens(*pairs):
    """Return a list of ``Token(name, value)`` built from ``(name, value)`` pairs."""
    return [Token(name, value) for name, value in pairs]


class TestReceivedLexer(unittest.TestCase):
    """Check the flat token sequence produced by ``parse_received.lexer``."""

    maxDiff = None

    @staticmethod
    def _lex(header):
        # Drain the lexer stream into a list so it can be compared directly.
        return list(parse_received.lexer.lex(header))

    def test_simple_01(self):
        # Plain from/by/with/id header followed by a date.
        expected = _tokens(
            ('FROMSEP', 'from'),
            ('DOMAIN', 'server.mymailhost.com'),
            ('DOMAIN', 'mail.mymailhost.com'),
            ('IPV4ADDRESS', '[126.43.75.123]'),
            ('BYSEP', 'by'),
            ('DOMAIN', 'pilot01.cl.msu.edu'),
            ('SMTPVERSION', '8.10.2/8.10.2'),
            ('WITHSEP', 'with'),
            ('SMTPSEP', 'ESMTP'),
            ('IDSEP', 'id'),
            ('STRING', 'NAA23597;'),
            ('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)'),
        )
        parsed = self._lex(INPUT_01)
        self.assertEqual(parsed, expected)

    def test_simple_02(self):
        # Header with a "(may be forged)" remark and a "for" clause.
        expected = _tokens(
            ('FROMSEP', 'from'),
            ('DOMAIN', 'd1080.master.cz'),
            ('DOMAIN', 'p-lab.cz'),
            ('IPV4ADDRESS', '[89.185.245.149]'),
            ('IGNORABLESTR', 'may be forged'),
            ('BYSEP', 'by'),
            ('DOMAIN', 'mx1.redhat.com'),
            ('SMTPVERSION', '8.14.4/8.14.4'),
            ('WITHSEP', 'with'),
            ('SMTPSEP', 'ESMTP'),
            ('IDSEP', 'id'),
            ('STRING', 't07GaC1j031854'),
            ('FORSEP', 'for'),
            ('EMAILADDR', ''),
            ('DATETIME', 'Wed, 7 Jan 2015 11:36:13 -0500'),
        )
        parsed = self._lex(INPUT_02)
        self.assertEqual(parsed, expected)


class TestReceivedParser(unittest.TestCase):
    """Check the nested parse result and the ``parse_header`` dictionaries."""

    maxDiff = None

    @staticmethod
    def _parse(header):
        # Lex the header and run the parser over the resulting stream.
        stream = parse_received.lexer.lex(header)
        # Kept from the original test: the stream index is set to 0 before
        # parsing (the reason is not evident from this file).
        stream.idx = 0
        return parse_received.parser.parse(stream)

    def test_simple_01(self):
        # Expected tree: [from-part, by-part, with-part, date token].
        from_part = [
            Token('FROMSEP', 'from'),
            [
                Token('DOMAIN', 'server.mymailhost.com'),
                [
                    Token('DOMAIN', 'mail.mymailhost.com'),
                    Token('IPV4ADDRESS', '[126.43.75.123]'),
                ],
            ],
        ]
        by_part = [
            Token('BYSEP', 'by'),
            [
                Token('DOMAIN', 'pilot01.cl.msu.edu'),
                [Token('SMTPVERSION', '8.10.2/8.10.2')],
            ],
        ]
        with_part = [
            [
                [],
                [Token('SMTPSEP', 'ESMTP')],
                [Token('STRING', 'NAA23597;')],
                [],
            ],
        ]
        date_token = Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)')
        expected = [from_part, by_part, with_part, date_token]

        parsed = self._parse(INPUT_01)
        self.assertEqual(parsed, expected)

    def test_simple_02(self):
        # Same shape as test_simple_01, with a trailing "for" slot filled in.
        from_part = [
            Token('FROMSEP', 'from'),
            [
                Token('DOMAIN', 'd1080.master.cz'),
                [
                    Token('DOMAIN', 'p-lab.cz'),
                    Token('IPV4ADDRESS', '[89.185.245.149]'),
                ],
            ],
        ]
        by_part = [
            Token('BYSEP', 'by'),
            [
                Token('DOMAIN', 'mx1.redhat.com'),
                [Token('SMTPVERSION', '8.14.4/8.14.4')],
            ],
        ]
        with_part = [
            [
                [],
                [Token('SMTPSEP', 'ESMTP')],
                [Token('STRING', 't07GaC1j031854')],
                [Token('FORSEP', 'for')],
            ],
        ]
        date_token = Token('DATETIME', 'Wed, 7 Jan 2015 11:36:13 -0500')
        expected = [from_part, by_part, with_part, date_token]

        parsed = self._parse(INPUT_02)
        self.assertEqual(parsed, expected)

    def test_parse_01(self):
        # parse_header() result for INPUT_01.
        # NOTE(review): the 'reveresed' spelling is part of the asserted key;
        # presumably it mirrors parse_header's output -- do not correct it
        # here alone.
        received_at = datetime.datetime(
            2002, 7, 12, 16, 11, 20, tzinfo=tzoffset(u'EDT', -14400))
        sender = {
            'halo': 'server.mymailhost.com',
            'ipaddr': '[126.43.75.123]',
            'reveresed': 'mail.mymailhost.com',
        }
        expected = {
            'by': {'server': 'pilot01.cl.msu.edu'},
            'date': received_at,
            'from': sender,
        }
        observed = parse_received.parse_header(INPUT_01)
        self.assertEqual(observed, expected)

    def test_parse_02(self):
        # parse_header() result for INPUT_02 (offset-only timezone).
        received_at = datetime.datetime(
            2015, 1, 7, 11, 36, 13, tzinfo=tzoffset(None, -18000))
        sender = {
            'halo': 'd1080.master.cz',
            'ipaddr': '[89.185.245.149]',
            'reveresed': 'p-lab.cz',
        }
        expected = {
            'by': {'server': 'mx1.redhat.com'},
            'date': received_at,
            'from': sender,
        }
        observed = parse_received.parse_header(INPUT_02)
        self.assertEqual(observed, expected)

    def test_email_01(self):
        # Parse every Received header of the sample message on disk.
        # The path is relative, so it is resolved against the current
        # working directory.
        with open('examples/mail', 'r') as inmail:
            msg = email.message_from_file(inmail)

        received_hdrs = msg.get_all('Received')
        # NOTE(review): ``expected`` is an empty list, which looks like a
        # placeholder -- confirm the intended expectations for examples/mail.
        expected = []
        parsed_headers = [
            parse_received.parse_header(hdr) for hdr in received_hdrs
        ]
        self.assertEqual(parsed_headers, expected)


if __name__ == '__main__':
    unittest.main()