aboutsummaryrefslogtreecommitdiffstats
path: root/libbe/command/serve_commands.py
diff options
context:
space:
mode:
authorW. Trevor King <wking@tremily.us>2012-08-24 10:13:45 -0400
committerW. Trevor King <wking@tremily.us>2012-08-24 10:13:45 -0400
commitd9c7ff5ccf3faca00973c5b4ed32aac03f8faae5 (patch)
tree3a817385746d83476a461040875dd41afd5b744c /libbe/command/serve_commands.py
parentcdc8049dd294d046813b18c759ee7f16ccf08746 (diff)
downloadbugseverywhere-d9c7ff5ccf3faca00973c5b4ed32aac03f8faae5.tar.gz
command:serve_commands: remove duplicate cruft and get working unit tests.
Diffstat (limited to 'libbe/command/serve_commands.py')
-rw-r--r--libbe/command/serve_commands.py346
1 files changed, 19 insertions, 327 deletions
diff --git a/libbe/command/serve_commands.py b/libbe/command/serve_commands.py
index 1912180..810f698 100644
--- a/libbe/command/serve_commands.py
+++ b/libbe/command/serve_commands.py
@@ -79,6 +79,7 @@ if libbe.TESTING == True:
cherrypy_test_webtest = None
import libbe.bugdir
+ import libbe.command.list
class ServerApp (libbe.command.serve.WSGI_AppObject):
@@ -281,7 +282,7 @@ class Serve_Commands (libbe.command.Command):
(params['host'], params['port']), app)
#server.throw_errors = True
#server.show_tracebacks = True
- private_key,certificate = get_cert_filenames(
+ private_key,certificate = libbe.command.serve.get_cert_filenames(
'be-server', logger=self.logger)
if cherrypy.wsgiserver.ssl_builtin == None:
server.ssl_module = 'builtin'
@@ -341,339 +342,30 @@ for example::
# alias for libbe.command.base.get_command_class()
Serve_commands = Serve_Commands
-def random_string(length=256):
- if os.path.exists(os.path.join('dev', 'urandom')):
- return open("/dev/urandom").read(length)
- else:
- import array
- from random import randint
- d = array.array('B')
- for i in xrange(1000000):
- d.append(randint(0,255))
- return d.tostring()
-
if libbe.TESTING == True:
- class WSGITestCase (unittest.TestCase):
- def setUp(self):
- self.logstream = StringIO.StringIO()
- self.logger = logging.getLogger('be-serve-test')
- console = logging.StreamHandler(self.logstream)
- console.setFormatter(logging.Formatter('%(message)s'))
- self.logger.addHandler(console)
- self.logger.propagate = False
- console.setLevel(logging.INFO)
- self.logger.setLevel(logging.INFO)
- self.default_environ = { # required by PEP 333
- 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD'
- 'REMOTE_ADDR': '192.168.0.123',
- 'SCRIPT_NAME':'',
- 'PATH_INFO': '',
- #'QUERY_STRING':'', # may be empty or absent
- #'CONTENT_TYPE':'', # may be empty or absent
- #'CONTENT_LENGTH':'', # may be empty or absent
- 'SERVER_NAME':'example.com',
- 'SERVER_PORT':'80',
- 'SERVER_PROTOCOL':'HTTP/1.1',
- 'wsgi.version':(1,0),
- 'wsgi.url_scheme':'http',
- 'wsgi.input':StringIO.StringIO(),
- 'wsgi.errors':StringIO.StringIO(),
- 'wsgi.multithread':False,
- 'wsgi.multiprocess':False,
- 'wsgi.run_once':False,
- }
- def getURL(self, app, path='/', method='GET', data=None,
- scheme='http', environ={}):
- env = copy.copy(self.default_environ)
- env['PATH_INFO'] = path
- env['REQUEST_METHOD'] = method
- env['scheme'] = scheme
- if data != None:
- enc_data = urllib.urlencode(data)
- if method == 'POST':
- env['CONTENT_LENGTH'] = len(enc_data)
- env['wsgi.input'] = StringIO.StringIO(enc_data)
- else:
- assert method in ['GET', 'HEAD'], method
- env['QUERY_STRING'] = enc_data
- for key,value in environ.items():
- env[key] = value
- return ''.join(app(env, self.start_response))
- def start_response(self, status, response_headers, exc_info=None):
- self.status = status
- self.response_headers = response_headers
- self.exc_info = exc_info
-
- class WSGI_ObjectTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- self.app = WSGI_Object(self.logger)
- def test_error(self):
- contents = self.app.error(
- environ=self.default_environ,
- start_response=self.start_response,
- error=123,
- message='Dummy Error',
- headers=[('X-Dummy-Header','Dummy Value')])
- self.failUnless(contents == ['Dummy Error'], contents)
- self.failUnless(self.status == '123 Dummy Error', self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type','text/plain'),
- ('X-Dummy-Header','Dummy Value')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- def test_log_request(self):
- self.app.log_request(
- environ=self.default_environ, status='-1 OK', bytes=123)
- log = self.logstream.getvalue()
- self.failUnless(log.startswith('192.168.0.123 -'), log)
-
- class ExceptionAppTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- def child_app(environ, start_response):
- raise ValueError('Dummy Error')
- self.app = ExceptionApp(child_app, self.logger)
- def test_traceback(self):
- try:
- self.getURL(self.app)
- except ValueError, e:
- pass
- log = self.logstream.getvalue()
- self.failUnless(log.startswith('Traceback'), log)
- self.failUnless('child_app' in log, log)
- self.failUnless('ValueError: Dummy Error' in log, log)
-
- class AdminAppTestCase (WSGITestCase):
- def setUp(self):
- WSGITestCase.setUp(self)
- self.users = Users()
- self.users.add_user(
- User('Aladdin', 'Big Al', password='open sesame'))
- self.users.add_user(
- User('guest', 'Guest', password='guestpass'))
- def child_app(environ, start_response):
- pass
- self.app = AdminApp(
- child_app, users=self.users, logger=self.logger)
- self.app = AuthenticationApp(
- self.app, realm='Dummy Realm', users=self.users,
- logger=self.logger)
- self.app = UppercaseHeaderApp(self.app, logger=self.logger)
- def basic_auth(self, uname, password):
- """HTTP basic authorization string"""
- return 'Basic %s' % \
- ('%s:%s' % (uname, password)).encode('base64')
- def test_new_name(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data={'name':'Prince Al'},
- environ={'HTTP_Authorization':
- self.basic_auth('Aladdin', 'open sesame')})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['Aladdin'].name == 'Prince Al',
- self.users['Aladdin'].name)
- self.failUnless(self.users.changed == True,
- self.users.changed)
- def test_new_password(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data={'password':'New Pass'},
- environ={'HTTP_Authorization':
- self.basic_auth('Aladdin', 'open sesame')})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['Aladdin'].passhash == \
- self.users['Aladdin'].hash('New Pass'),
- self.users['Aladdin'].passhash)
- self.failUnless(self.users.changed == True,
- self.users.changed)
- def test_guest_name(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data={'name':'SPAM'},
- environ={'HTTP_Authorization':
- self.basic_auth('guest', 'guestpass')})
- self.failUnless(self.status.startswith('403 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['guest'].name == 'Guest',
- self.users['guest'].name)
- self.failUnless(self.users.changed == False,
- self.users.changed)
- def test_guest_password(self):
- self.getURL(
- self.app, '/admin/', method='POST',
- data={'password':'SPAM'},
- environ={'HTTP_Authorization':
- self.basic_auth('guest', 'guestpass')})
- self.failUnless(self.status.startswith('403 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- self.failUnless(self.users['guest'].name == 'Guest',
- self.users['guest'].name)
- self.failUnless(self.users.changed == False,
- self.users.changed)
-
- class ServerAppTestCase (WSGITestCase):
+ class ServerAppTestCase (libbe.command.serve.WSGITestCase):
def setUp(self):
- WSGITestCase.setUp(self)
+ libbe.command.serve.WSGITestCase.setUp(self)
self.bd = libbe.bugdir.SimpleBugDir(memory=False)
self.app = ServerApp(self.bd.storage, logger=self.logger)
def tearDown(self):
self.bd.cleanup()
- WSGITestCase.tearDown(self)
- def test_add_get(self):
- self.getURL(self.app, '/add/', method='GET')
- self.failUnless(self.status.startswith('404 '), self.status)
- self.failUnless(self.response_headers == [
- ('Content-Type', 'text/plain')],
- self.response_headers)
+ libbe.command.serve.WSGITestCase.tearDown(self)
+ def test_run_list(self):
+ list = libbe.command.list.List()
+ params = list._parse_options_args()
+ data = yaml.safe_dump({
+ 'command': 'list',
+ 'parameters': params,
+ })
+ self.getURL(self.app, '/run', method='POST', data=data)
+ self.failUnless(self.status.startswith('200 '), self.status)
+ self.failUnless(
+ ('Content-Type', 'application/octet-stream'
+ ) in self.response_headers,
+ self.response_headers)
self.failUnless(self.exc_info == None, self.exc_info)
- def test_add_post(self):
- self.getURL(self.app, '/add/', method='POST',
- data={'id':'123456', 'parent':'abc123',
- 'directory':'True'})
- self.failUnless(self.status == '200 OK', self.status)
- self.failUnless(self.response_headers == [],
- self.response_headers)
- self.failUnless(self.exc_info == None, self.exc_info)
- # Note: other methods tested in libbe.storage.http
-
- # TODO: integration tests on Serve?
+ # TODO: integration tests on ServeCommands?
unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()])
-
-
-# The following certificate-creation code is adapted From pyOpenSSL's
-# examples.
-
-def get_cert_filenames(server_name, autogenerate=True, logger=None):
- """
- Generate private key and certification filenames.
- get_cert_filenames(server_name) -> (pkey_filename, cert_filename)
- """
- pkey_file = '%s.pkey' % server_name
- cert_file = '%s.cert' % server_name
- if autogenerate == True:
- for file in [pkey_file, cert_file]:
- if not os.path.exists(file):
- make_certs(server_name, logger)
- return (pkey_file, cert_file)
-
-def createKeyPair(type, bits):
- """Create a public/private key pair.
-
- Returns the public/private key pair in a PKey object.
-
- Parameters
- ----------
- type : TYPE_RSA or TYPE_DSA
- Key type.
- bits : int
- Number of bits to use in the key.
- """
- pkey = OpenSSL.crypto.PKey()
- pkey.generate_key(type, bits)
- return pkey
-
-def createCertRequest(pkey, digest="md5", **name):
- """Create a certificate request.
-
- Returns the certificate request in an X509Req object.
-
- Parameters
- ----------
- pkey : PKey
- The key to associate with the request.
- digest : "md5" or ?
- Digestion method to use for signing, default is "md5",
- `**name` :
- The name of the subject of the request, possible.
- Arguments are:
-
- ============ ========================
- C Country name
- ST State or province name
- L Locality name
- O Organization name
- OU Organizational unit name
- CN Common name
- emailAddress E-mail address
- ============ ========================
- """
- req = OpenSSL.crypto.X509Req()
- subj = req.get_subject()
-
- for (key,value) in name.items():
- setattr(subj, key, value)
-
- req.set_pubkey(pkey)
- req.sign(pkey, digest)
- return req
-
-def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter), digest="md5"):
- """Generate a certificate given a certificate request.
-
- Returns the signed certificate in an X509 object.
-
- Parameters
- ----------
- req :
- Certificate reqeust to use
- issuerCert :
- The certificate of the issuer
- issuerKey :
- The private key of the issuer
- serial :
- Serial number for the certificate
- notBefore :
- Timestamp (relative to now) when the certificate
- starts being valid
- notAfter :
- Timestamp (relative to now) when the certificate
- stops being valid
- digest :
- Digest method to use for signing, default is md5
- """
- cert = OpenSSL.crypto.X509()
- cert.set_serial_number(serial)
- cert.gmtime_adj_notBefore(notBefore)
- cert.gmtime_adj_notAfter(notAfter)
- cert.set_issuer(issuerCert.get_subject())
- cert.set_subject(req.get_subject())
- cert.set_pubkey(req.get_pubkey())
- cert.sign(issuerKey, digest)
- return cert
-
-def make_certs(server_name, logger=None) :
- """Generate private key and certification files.
-
- `mk_certs(server_name) -> (pkey_filename, cert_filename)`
- """
- if OpenSSL == None:
- raise libbe.command.UserError, \
- 'SSL certificate generation requires the OpenSSL module'
- pkey_file,cert_file = get_cert_filenames(
- server_name, autogenerate=False)
- if logger != None:
- logger.log(logger._server_level,
- 'Generating certificates', pkey_file, cert_file)
- cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024)
- careq = createCertRequest(cakey, CN='Certificate Authority')
- cacert = createCertificate(
- careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years
- open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey(
- OpenSSL.crypto.FILETYPE_PEM, cakey))
- open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate(
- OpenSSL.crypto.FILETYPE_PEM, cacert))