aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/mail2news.py27
-rw-r--r--src/news2mail.py21
-rw-r--r--src/whitelist.py24
-rwxr-xr-xtest/test_pyg.py34
4 files changed, 46 insertions, 60 deletions
diff --git a/src/mail2news.py b/src/mail2news.py
index dc05c1d..087ad7c 100644
--- a/src/mail2news.py
+++ b/src/mail2news.py
@@ -62,8 +62,7 @@ class mail2news(object):
with open(options.input, 'r') as inp_stream:
self.message = self.__readfile(options, inp_stream)
- self.message['X-Gateway'] = 'pyg {0} {1}'.format(__version__,
- __description__)
+ self.message['X-Gateway']=f'pyg {__version__} {__description__}'
def __add_header(self, header, value, msg=None):
if msg is None:
@@ -72,14 +71,16 @@ class mail2news(object):
msg[header] = value.strip()
def __readfile(self, opt, input_stream):
- message = email.message_from_file(input_stream, policy=email.policy.SMTP)
+ message = email.message_from_file(input_stream,
+ policy=email.policy.SMTP)
if (len(message) == 0) \
and message.get_payload().startswith('/'):
msg_file_name = message.get_payload().strip()
del message
with open(msg_file_name, 'r') as msg_file:
- message = email.message_from_file(msg_file, policy=email.policy.SMTP)
+ message = email.message_from_file(msg_file,
+ policy=email.policy.SMTP)
# introduce nntpheads
self.__add_header('Newsgroups', opt.newsgroup, message)
@@ -116,7 +117,7 @@ class mail2news(object):
# if found, keep first element that seems a Msg-ID.
if (ref and len(ref)):
- self.message['References'] = '%s\n' % ref[0]
+ self.message['References'] = f'{ref[0]}\n'
except KeyError as message:
print(message)
@@ -142,7 +143,7 @@ class mail2news(object):
del self.message['Message-Id']
self.message['Message-Id'] = msgid
else:
- msgid = '<pyg.%d@tuchailepuppapera.org>\n' % (os.getpid())
+ msgid = '<pyg.{os.getpid()}@tuchailepuppapera.org>\n'
self.message['Message-Id'] = msgid
except KeyError as message:
@@ -195,15 +196,15 @@ class mail2news(object):
with tempfile.NamedTemporaryFile(suffix="eml", prefix="failed_msg",
delete=False) as tmpf:
tmpf.write(msg_bytes)
- logging.info(f"failed file name = {tmpf.name}")
+ logging.info("failed file name = %s", tmpf.name)
logging.exception("Failed to convert message!")
server.quit()
-def parse_cmdline(args):
+def parse_cmdline(arg_in):
parser = argparse.ArgumentParser(
- description='%s version %s - Copyright 2000 Cosimo Alfarano\n%s' %
- ('pyg', __version__, __description__))
+ description=f'pyg version {__version__} - Copyright 2000 Cosimo Alfarano' + \
+ f'\n{__description__}')
parser.add_argument('-s', '--newsserver', default='')
parser.add_argument('-a', '--approver', default='',
@@ -226,7 +227,7 @@ def parse_cmdline(args):
help='verbose output ' +
'(usefull with -T option for debugging)')
- args = parser.parse_args(args)
+ args = parser.parse_args(arg_in)
if not args.newsgroup:
raise argparse.ArgumentError('Error: Missing Newsgroups\n')
@@ -252,7 +253,7 @@ def main(args_in=None):
opt = parse_cmdline(args_in)
m2n = mail2news(opt)
- owner = None
+ # owner = None
"""phase 3:
format rfc 822 headers from input article
@@ -270,7 +271,7 @@ def main(args_in=None):
# wl.logmsg(m2n.heads_dict,wl.ACCEPT,owner)
if not opt.test:
try:
- resp = m2n.sendemail()
+ m2n.sendemail()
except nntplib.NNTPError as ex:
logging.exception(ex)
except KeyboardInterrupt:
diff --git a/src/news2mail.py b/src/news2mail.py
index f99b0d7..001c8b5 100644
--- a/src/news2mail.py
+++ b/src/news2mail.py
@@ -26,15 +26,17 @@ import argparse
from collections import OrderedDict
import email
import email.policy
+import os
import smtplib
from socket import gethostbyaddr, gethostname
import sys
import time
+import whitelist
import mail2news
# logging.basicConfig(level=logging.DEBUG)
-class news2mail(object):
+class news2mail():
"""news to mail gateway class"""
def __init__(self, verbose=False):
@@ -59,8 +61,8 @@ class news2mail(object):
"""add new header like X-Gateway: Received:
"""
- msg['X-Gateway'] = 'pyg {0} {1}'.format(mail2news.__version__,
- mail2news.__description__)
+ msg['X-Gateway'] = f'pyg {mail2news.__version__}' + \
+ ' {mail2news.__description__}'
# to make Received: header
t = time.ctime(time.time())
@@ -178,8 +180,8 @@ def parse_cmdline(a_in):
return (test,verbose) boolean tuple
"""
parser = argparse.ArgumentParser(
- description='pyg version %s - Copyright 2000 Cosimo Alfarano\n%s' %
- (mail2news.__version__, mail2news.__description__))
+ description=f'pyg version {mail2news.__version__} - Copyright 2000 Cosimo Alfarano\n' + \
+ f'{mail2news.__description__}')
parser.add_argument('-H', '--smtpserver', default='')
parser.add_argument('-s', '--sender', required=True, default='')
@@ -232,12 +234,14 @@ def main(args_in=None):
# check if n2m has some file prefercences set on commandline
if args.wlfile is None:
- wl = os.path.expanduser(os.path.join(os.path.dirname(__file__), 'pyg.whitelist'))
+ wl = os.path.expanduser(os.path.join(os.path.dirname(__file__),
+ 'pyg.whitelist'))
else:
wl = args.wlfile
if args.logfile is None:
- log = os.path.expanduser(os.path.join(os.path.dirname(__file__), 'pyg.log'))
+ log = os.path.expanduser(os.path.join(os.path.dirname(__file__),
+ 'pyg.log'))
else:
log = args.logfile
@@ -251,7 +255,8 @@ def main(args_in=None):
owner = wl.checkfrom(n2m.message['From'])
if owner is None:
if sys.stdin.isatty() == 1 or args.test:
- out += str('"%s" is not in whitelist!' % (n2m.message['From'][:-1])) + '\n'
+ out += str('"%s" is not in whitelist!' %
+ (n2m.message['From'][:-1])) + '\n'
else:
wl.logmsg(n2m.nntpheads, wl.DENY)
diff --git a/src/whitelist.py b/src/whitelist.py
index 4e02f0e..abb5d24 100644
--- a/src/whitelist.py
+++ b/src/whitelist.py
@@ -15,7 +15,6 @@ whitelist manage a list of trusted user.
from __future__ import absolute_import
import logging
# logging.basicConfig(level=logging.DEBUG)
-import sys
import time
import wlp
@@ -54,8 +53,7 @@ class whitelist(object):
# limitation.
if fromhead.find(self.wl[owner]['From:']) >= 0:
return owner
- else:
- return None
+ return None
def logmsg(self, heads, ok=DENY, owner=None):
"""who are walking through my gate?
@@ -75,25 +73,25 @@ class whitelist(object):
self.logger.info('at %s (%s)', ltime, tzone)
if owner is not None:
- self.logger.debug('\tWLOwner: ' + owner + '')
- self.logger.debug('\tFrom: ' + heads.get('From', 'NOT PRESENT'))
- self.logger.debug('\tSubject: ' + heads.get('Subject', 'NOT PRESENT'))
- self.logger.debug('\tSender: ' + heads.get('Sender', 'NOT PRESENT'))
- self.logger.debug('\tDate: ' + heads.get('Date', 'NOT PRESENT'))
+ self.logger.debug('\tWLOwner: %s', owner)
+ self.logger.debug('\tFrom: %s', heads.get('From', 'NOT PRESENT'))
+ self.logger.debug('\tSubject: %s', heads.get('Subject', 'NOT PRESENT'))
+ self.logger.debug('\tSender: %s', heads.get('Sender', 'NOT PRESENT'))
+ self.logger.debug('\tDate: %s', heads.get('Date', 'NOT PRESENT'))
# some client create Message-Id other Message-ID.
if 'Message-ID' in heads:
- self.logger.debug('\tMessage-ID: ' + heads.get('Message-ID'))
+ self.logger.debug('\tMessage-ID: %s', heads.get('Message-ID'))
else:
- self.logger.debug('\tMessage-Id: ' + heads.get('Message-Id',
+ self.logger.debug('\tMessage-Id: %s', heads.get('Message-Id',
'NOT PRESENT'))
# X-Newsgroups: and To: are present if user is trusted, else
# Newsgroup: exists since no changes on nntp headers are done.
if 'X-Newsgroups' in heads:
- self.logger.debug('\tTo: ' + heads.get('To', 'NOT PRESENT'))
- self.logger.debug('\tX-Newsgroups: ' + heads.get('X-Newsgroups',
+ self.logger.debug('\tTo: %s', heads.get('To', 'NOT PRESENT'))
+ self.logger.debug('\tX-Newsgroups: %s', heads.get('X-Newsgroups',
'NOT PRESENT'))
else:
- self.logger.debug('\tNewsgroups: ' +
+ self.logger.debug('\tNewsgroups: %s',
heads.get('Newsgroups', 'NOT PRESENT'))
diff --git a/test/test_pyg.py b/test/test_pyg.py
index 6777bd2..69a9eeb 100755
--- a/test/test_pyg.py
+++ b/test/test_pyg.py
@@ -1,6 +1,5 @@
#!/usr/bin/python
import re
-import subprocess
import unittest
import mail2news
@@ -9,31 +8,24 @@ import news2mail
@unittest.skip('not ready')
class TestM2N(unittest.TestCase):
- expected_output = """Newsgroups: pyg.test
+ expected_output = f"""Newsgroups: pyg.test
From: Pyg <pyg@localhost.com>
To: User <user@localhost.com>
Subject: test
-Date: Sun, 1 Feb 2002 16:40:40 +0200
+Date: Fri, 01 Feb 2002 16:40:40 +0200
Message-Id: <20001001164040.Aa8326@localhost>
Return-Path: <pyg@localhost>
Mime-Version: 1.0
-Content-Type: text/plain; charset=us-ascii
+Content-Type: text/plain; charset="us-ascii"
User-Agent: Mutt/1.2.5i
X-Multiline: this header probably broke RFC, but is frequent.
-X-Gateway: pyg %s %s
+X-Gateway: pyg {mail2news.__version__} {mail2news.__description__}
one line test
-""" % (mail2news.__version__, mail2news.__description__)
+"""
def test_m2n(self):
- # with open('examples/mail') as in_mail:
- # pid = subprocess.Popen([sys.executable,
- # 'src/mail2news.py', '-Tv', '-n', 'pyg.test'],
- # stdin=subprocess.PIPE,
- # stdout=subprocess.PIPE,
- # universal_newlines=True)
- # out, _ = pid.communicate(in_mail.read())
out = mail2news.main(['-T', '-v',
'-i', 'examples/mail', '-n', 'pyg.test'])
self.assertEqual(out, self.expected_output)
@@ -41,7 +33,7 @@ one line test
@unittest.skip('not ready')
class TestN2M(unittest.TestCase):
- expected_output = """Received: from GATEWAY by mitmanek.ceplovi.cz with pyg
+ expected_output = f"""Received: from GATEWAY by mitmanek.ceplovi.cz with pyg
for <test@example.com> ; Mon Dec 15 17:13:30 2014 (CEST)
From: kame@inwind.it (PYG)
To: test@example.com
@@ -54,23 +46,13 @@ Mime-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Trace: pyg.server.tld 960672047 927 192.168.1.2 (10 Jun 2000 21:20:47 GMT)
X-Newsgroups: local.moderated
-X-Gateway: pyg %s %s
+X-Gateway: pyg {mail2news.__version__} {mail2news.__description__}
X-NNTP-Posting-Host: pyg.server.tld
Resent-From: sender@example.com
Resent-Sender: sender@example.com
-""" % (mail2news.__version__, mail2news.__description__)
+"""
def test_n2m(self):
- # with open('examples/articletest.accepted') as in_mail:
- # pid = subprocess.Popen([sys.executable, 'src/news2mail.py', '-Tvt', 'test@example.com',
- # '-s', 'sender@example.com',
- # '-w', 'examples/whitelist.example'],
- # stdin=subprocess.PIPE,
- # stdout=subprocess.PIPE,
- # universal_newlines=True)
- # in_message = in_mail.read().replace('pyg@pyg.server.tld',
- # 'kame@inwind.it')
- # out, err = pid.communicate(in_message)
out = news2mail.main(['-T', '-v', '-t', 'test@example.com',
'-s', 'sender@example.com',
'-w', 'examples/whitelist.example'])