about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Matěj Cepl <mcepl@cepl.eu> 2015-01-09 01:47:46 +0100
committer Matěj Cepl <mcepl@cepl.eu> 2022-11-04 13:35:39 +0100
commit e1ae554998880beb4eae3d446f9c83e9924850ef (patch)
tree 11ba5ff3e993ff356d7cbd54ccfe3a7013499bfc
parent 3babbe21a5d1d3d4d45fc8c89251460cb2b781f9 (diff)
download pyg-e1ae554998880beb4eae3d446f9c83e9924850ef.tar.gz
Getting some kind of parsed tree ... pretty ugly, so far.
-rw-r--r-- parse_received.py | 41
-rw-r--r-- test/test_parse_received.py | 50
2 files changed, 78 insertions, 13 deletions
diff --git a/parse_received.py b/parse_received.py
index 90c8e81..ad28117 100644
--- a/parse_received.py
+++ b/parse_received.py
@@ -234,6 +234,7 @@ __lg.add('IDSEP', r"(?i)\bid\b")
__lg.add('TCPSEP', r"(?i)\btcp\b")
__lg.add('SMTPSEP', r"(?i)\b[e]?smtp\b")
__lg.add('IPV4ADDRESS', r"\[?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\]?")
+__lg.add('SMTPVERSION', r"\d{1,2}\.\d{1,2}\.\d{1,2}/\d{1,2}\.\d{1,2}\.\d{1,2}")
__lg.add('DOMAIN', r"(?i)[<(]?(([\w][\w\-\.]*)\.)?([\w][\w\-]+)*" +
r"(\.([a-z][a-z]*))[>)]?")
__weekday = r"(Mon|Tue|Wed|Thu|Fri|Sat|Sun)"
@@ -242,6 +243,7 @@ __lg.add('DATETIME', (r"%s\s*,\s+\d{1,2}\s+%s\s+\d{4} " +
r"\d{1,2}:\d{2}:\d{2}\s+[-+]\d{4}(\s*\([A-Z]{3}\))?") %
(__weekday, __month_name))
__lg.add("EMAILADDR", r'<[a-zA-Z0-9_.+-]+@[a-zA-Z0-9._-]+>')
+__lg.add('IGNORABLESTR', r"may be forged")
# __lg.add('ParentStr', r"\([^)]+\)")
__lg.add('STRING', r"\S+")
@@ -250,6 +252,27 @@ lexer = __lg.build()
__pg = rply.ParserGenerator([rule.name for rule in lexer.rules],
cache_id='received_parser')
+"""
+Result of:
+
+from server.mymailhost.com
+ (mail.mymailhost.com [126.43.75.123])
+ by pilot01.cl.msu.edu (8.10.2/8.10.2) with ESMTP id NAA23597;
+ Fri, 12 Jul 2002 16:11:20 -0400 (EDT)
+
+is
+
+[[Token('FROMSEP', 'from'),
+ [Token('DOMAIN', 'server.mymailhost.com'),
+ [[Token('DOMAIN', 'mail.mymailhost.com'),
+ Token('IPV4ADDRESS', '[126.43.75.123]')]]]],
+ [Token('BYSEP', 'by'),
+ [Token('DOMAIN', 'pilot01.cl.msu.edu'),
+ [[Token('STRING', '8.10.2/8.10.2)')]]]],
+ [[[], [[Token('SMTPSEP', 'ESMTP')]], [Token('STRING', 'NAA23597;')], []]],
+ Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)')]
+"""
+
@__pg.production('main : stamp')
def main(p):
@@ -276,22 +299,28 @@ def by_domain(p):
@__pg.production('extended-domain : DOMAIN')
def extended_domain(p):
- logging.debug('by_domain p = %s', p)
+ logging.debug('extended_domain p = %s', p)
return [p[0], p[1]]
@__pg.production('extended-domain : DOMAIN tcp-info')
def extended_domain_tcp(p):
- logging.debug('by_domain p = %s', p)
+ logging.debug('extended_domain_tcp p = %s', p)
return [p[0], p[1]]
@__pg.production('extended-domain : IPV4ADDRESS tcp-info')
def extended_domain_addr(p):
- logging.debug('by_domain p = %s', p)
+ logging.debug('extended_domain_addr p = %s', p)
return [p[0], p[1]]
+@__pg.production('extended-domain : extended-domain IGNORABLESTR')
+def extended_domain_ignorable(p):
+ logging.debug('extended_domain_ignorable p = %s', p)
+ return p[0]
+
+
@__pg.production('tcp-info : IPV4ADDRESS')
def tcp_info(p):
logging.debug('optinfo p = %s', p)
@@ -304,10 +333,10 @@ def tcp_info_addr(p):
return [p]
-@__pg.production('tcp-info : STRING')
+@__pg.production('tcp-info : SMTPVERSION')
def tcp_info_string(p):
logging.debug('optinfo p = %s', p)
- return [p]
+ return p[0]
@__pg.production('optinfo : via with id for')
@@ -376,7 +405,7 @@ def link(p):
@__pg.production('protocol : SMTPSEP')
def protocol(p):
logging.debug('protocol p = %s', p)
- return [p[1]]
+ return p
parser = __pg.build()
diff --git a/test/test_parse_received.py b/test/test_parse_received.py
index 5f365c2..e2afdaa 100644
--- a/test/test_parse_received.py
+++ b/test/test_parse_received.py
@@ -1,11 +1,15 @@
import logging
-logging.basicConfig(level=logging.DEBUG)
import parse_received
+from pprint import pformat
import unittest
from rply import Token
+logging.basicConfig(level=logging.DEBUG)
+
+
+
INPUT_01 = \
"""from server.mymailhost.com
(mail.mymailhost.com [126.43.75.123])
@@ -28,7 +32,7 @@ class TestReceivedLexer(unittest.TestCase):
Token('IPV4ADDRESS', '[126.43.75.123]'),
Token('BYSEP', 'by'),
Token('DOMAIN', 'pilot01.cl.msu.edu'),
- Token('STRING', '8.10.2/8.10.2)'),
+ Token('SMTPVERSION', '8.10.2/8.10.2'),
Token('WITHSEP', 'with'),
Token('SMTPSEP', 'ESMTP'),
Token('IDSEP', 'id'),
@@ -44,12 +48,10 @@ class TestReceivedLexer(unittest.TestCase):
Token('DOMAIN', 'd1080.master.cz'),
Token('DOMAIN', 'p-lab.cz'),
Token('IPV4ADDRESS', '[89.185.245.149]'),
- Token('STRING', 'may'),
- Token('STRING', 'be'),
- Token('STRING', 'forged))'),
+ Token('IGNORABLESTR', 'may be forged'),
Token('BYSEP', 'by'),
Token('DOMAIN', 'mx1.redhat.com'),
- Token('STRING', '8.14.4/8.14.4)'),
+ Token('SMTPVERSION', '8.14.4/8.14.4'),
Token('WITHSEP', 'with'),
Token('SMTPSEP', 'ESMTP'),
Token('IDSEP', 'id'),
@@ -66,18 +68,52 @@ class TestReceivedParser(unittest.TestCase):
def test_simple_01(self):
expected = [
+ [Token('FROMSEP', 'from'),
+ [Token('DOMAIN', 'server.mymailhost.com'),
+ [[Token('DOMAIN', 'mail.mymailhost.com'),
+ Token('IPV4ADDRESS', '[126.43.75.123]')]]]
+ ],
+ [Token('BYSEP', 'by'),
+ [Token('DOMAIN', 'pilot01.cl.msu.edu'),
+ Token('SMTPVERSION', '8.10.2/8.10.2')]
+ ],
+ [[[],
+ [[Token('SMTPSEP', 'ESMTP')]],
+ [Token('STRING', 'NAA23597;')], []]
+ ],
+ Token('DATETIME', 'Fri, 12 Jul 2002 16:11:20 -0400 (EDT)')
]
+ logging.debug('\nINPUT_01:\n%s', INPUT_01)
stream = parse_received.lexer.lex(INPUT_01)
logging.debug('\nstream:\n%s', list(stream))
+ stream.idx = 0
parsed = parse_received.parser.parse(stream)
+ logging.debug('\nparsed:\n%s', pformat(parsed))
self.assertEqual(parsed, expected)
- @unittest.skip('Not ready yet')
+ # @unittest.skip('Not ready yet')
def test_simple_02(self):
expected = [
+ [Token('FROMSEP', 'from'),
+ [Token('DOMAIN', 'd1080.master.cz'),
+ [[Token('DOMAIN', 'p-lab.cz'), Token('IPV4ADDRESS', '[89.185.245.149]')]]]
+ ],
+ [Token('BYSEP', 'by'),
+ [Token('DOMAIN', 'mx1.redhat.com'), Token('SMTPVERSION', '8.14.4/8.14.4')]
+ ],
+ [[[],
+ [[Token('SMTPSEP', 'ESMTP')]],
+ [Token('STRING', 't07GaC1j031854')],
+ [Token('FORSEP', 'for')]]
+ ],
+ Token('DATETIME', 'Wed, 7 Jan 2015 11:36:13 -0500')
]
+ logging.debug('\nINPUT_02:\n%s', INPUT_02)
stream = parse_received.lexer.lex(INPUT_02)
+ logging.debug('\nstream:\n%s', list(stream))
+ stream.idx = 0
parsed = parse_received.parser.parse(stream)
+ logging.debug('\nparsed:\n%s', pformat(parsed))
self.assertEqual(parsed, expected)
if __name__ == '__main__':