From d9c7ff5ccf3faca00973c5b4ed32aac03f8faae5 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Fri, 24 Aug 2012 10:13:45 -0400 Subject: command:serve_commands: remove duplicate cruft and get working unit tests. --- libbe/command/serve_commands.py | 346 +++------------------------------------- 1 file changed, 19 insertions(+), 327 deletions(-) (limited to 'libbe/command/serve_commands.py') diff --git a/libbe/command/serve_commands.py b/libbe/command/serve_commands.py index 1912180..810f698 100644 --- a/libbe/command/serve_commands.py +++ b/libbe/command/serve_commands.py @@ -79,6 +79,7 @@ if libbe.TESTING == True: cherrypy_test_webtest = None import libbe.bugdir + import libbe.command.list class ServerApp (libbe.command.serve.WSGI_AppObject): @@ -281,7 +282,7 @@ class Serve_Commands (libbe.command.Command): (params['host'], params['port']), app) #server.throw_errors = True #server.show_tracebacks = True - private_key,certificate = get_cert_filenames( + private_key,certificate = libbe.command.serve.get_cert_filenames( 'be-server', logger=self.logger) if cherrypy.wsgiserver.ssl_builtin == None: server.ssl_module = 'builtin' @@ -341,339 +342,30 @@ for example:: # alias for libbe.command.base.get_command_class() Serve_commands = Serve_Commands -def random_string(length=256): - if os.path.exists(os.path.join('dev', 'urandom')): - return open("/dev/urandom").read(length) - else: - import array - from random import randint - d = array.array('B') - for i in xrange(1000000): - d.append(randint(0,255)) - return d.tostring() - if libbe.TESTING == True: - class WSGITestCase (unittest.TestCase): - def setUp(self): - self.logstream = StringIO.StringIO() - self.logger = logging.getLogger('be-serve-test') - console = logging.StreamHandler(self.logstream) - console.setFormatter(logging.Formatter('%(message)s')) - self.logger.addHandler(console) - self.logger.propagate = False - console.setLevel(logging.INFO) - self.logger.setLevel(logging.INFO) - self.default_environ = { # required by PEP 333 - 'REQUEST_METHOD': 'GET', # 'POST', 'HEAD' - 'REMOTE_ADDR': '192.168.0.123', - 'SCRIPT_NAME':'', - 'PATH_INFO': '', - #'QUERY_STRING':'', # may be empty or absent - #'CONTENT_TYPE':'', # may be empty or absent - #'CONTENT_LENGTH':'', # may be empty or absent - 'SERVER_NAME':'example.com', - 'SERVER_PORT':'80', - 'SERVER_PROTOCOL':'HTTP/1.1', - 'wsgi.version':(1,0), - 'wsgi.url_scheme':'http', - 'wsgi.input':StringIO.StringIO(), - 'wsgi.errors':StringIO.StringIO(), - 'wsgi.multithread':False, - 'wsgi.multiprocess':False, - 'wsgi.run_once':False, - } - def getURL(self, app, path='/', method='GET', data=None, - scheme='http', environ={}): - env = copy.copy(self.default_environ) - env['PATH_INFO'] = path - env['REQUEST_METHOD'] = method - env['scheme'] = scheme - if data != None: - enc_data = urllib.urlencode(data) - if method == 'POST': - env['CONTENT_LENGTH'] = len(enc_data) - env['wsgi.input'] = StringIO.StringIO(enc_data) - else: - assert method in ['GET', 'HEAD'], method - env['QUERY_STRING'] = enc_data - for key,value in environ.items(): - env[key] = value - return ''.join(app(env, self.start_response)) - def start_response(self, status, response_headers, exc_info=None): - self.status = status - self.response_headers = response_headers - self.exc_info = exc_info - - class WSGI_ObjectTestCase (WSGITestCase): - def setUp(self): - WSGITestCase.setUp(self) - self.app = WSGI_Object(self.logger) - def test_error(self): - contents = self.app.error( - environ=self.default_environ, - start_response=self.start_response, - error=123, - message='Dummy Error', - headers=[('X-Dummy-Header','Dummy Value')]) - self.failUnless(contents == ['Dummy Error'], contents) - self.failUnless(self.status == '123 Dummy Error', self.status) - self.failUnless(self.response_headers == [ - ('Content-Type','text/plain'), - ('X-Dummy-Header','Dummy Value')], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - def test_log_request(self): - self.app.log_request( - environ=self.default_environ, status='-1 OK', bytes=123) - log = self.logstream.getvalue() - self.failUnless(log.startswith('192.168.0.123 -'), log) - - class ExceptionAppTestCase (WSGITestCase): - def setUp(self): - WSGITestCase.setUp(self) - def child_app(environ, start_response): - raise ValueError('Dummy Error') - self.app = ExceptionApp(child_app, self.logger) - def test_traceback(self): - try: - self.getURL(self.app) - except ValueError, e: - pass - log = self.logstream.getvalue() - self.failUnless(log.startswith('Traceback'), log) - self.failUnless('child_app' in log, log) - self.failUnless('ValueError: Dummy Error' in log, log) - - class AdminAppTestCase (WSGITestCase): - def setUp(self): - WSGITestCase.setUp(self) - self.users = Users() - self.users.add_user( - User('Aladdin', 'Big Al', password='open sesame')) - self.users.add_user( - User('guest', 'Guest', password='guestpass')) - def child_app(environ, start_response): - pass - self.app = AdminApp( - child_app, users=self.users, logger=self.logger) - self.app = AuthenticationApp( - self.app, realm='Dummy Realm', users=self.users, - logger=self.logger) - self.app = UppercaseHeaderApp(self.app, logger=self.logger) - def basic_auth(self, uname, password): - """HTTP basic authorization string""" - return 'Basic %s' % \ - ('%s:%s' % (uname, password)).encode('base64') - def test_new_name(self): - self.getURL( - self.app, '/admin/', method='POST', - data={'name':'Prince Al'}, - environ={'HTTP_Authorization': - self.basic_auth('Aladdin', 'open sesame')}) - self.failUnless(self.status == '200 OK', self.status) - self.failUnless(self.response_headers == [], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['Aladdin'].name == 'Prince Al', - self.users['Aladdin'].name) - self.failUnless(self.users.changed == True, - self.users.changed) - def test_new_password(self): - self.getURL( - self.app, '/admin/', method='POST', - data={'password':'New Pass'}, - environ={'HTTP_Authorization': - self.basic_auth('Aladdin', 'open sesame')}) - self.failUnless(self.status == '200 OK', self.status) - self.failUnless(self.response_headers == [], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['Aladdin'].passhash == \ - self.users['Aladdin'].hash('New Pass'), - self.users['Aladdin'].passhash) - self.failUnless(self.users.changed == True, - self.users.changed) - def test_guest_name(self): - self.getURL( - self.app, '/admin/', method='POST', - data={'name':'SPAM'}, - environ={'HTTP_Authorization': - self.basic_auth('guest', 'guestpass')}) - self.failUnless(self.status.startswith('403 '), self.status) - self.failUnless(self.response_headers == [ - ('Content-Type', 'text/plain')], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['guest'].name == 'Guest', - self.users['guest'].name) - self.failUnless(self.users.changed == False, - self.users.changed) - def test_guest_password(self): - self.getURL( - self.app, '/admin/', method='POST', - data={'password':'SPAM'}, - environ={'HTTP_Authorization': - self.basic_auth('guest', 'guestpass')}) - self.failUnless(self.status.startswith('403 '), self.status) - self.failUnless(self.response_headers == [ - ('Content-Type', 'text/plain')], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - self.failUnless(self.users['guest'].name == 'Guest', - self.users['guest'].name) - self.failUnless(self.users.changed == False, - self.users.changed) - - class ServerAppTestCase (WSGITestCase): + class ServerAppTestCase (libbe.command.serve.WSGITestCase): def setUp(self): - WSGITestCase.setUp(self) + libbe.command.serve.WSGITestCase.setUp(self) self.bd = libbe.bugdir.SimpleBugDir(memory=False) self.app = ServerApp(self.bd.storage, logger=self.logger) def tearDown(self): self.bd.cleanup() - WSGITestCase.tearDown(self) - def test_add_get(self): - self.getURL(self.app, '/add/', method='GET') - self.failUnless(self.status.startswith('404 '), self.status) - self.failUnless(self.response_headers == [ - ('Content-Type', 'text/plain')], - self.response_headers) + libbe.command.serve.WSGITestCase.tearDown(self) + def test_run_list(self): + list = libbe.command.list.List() + params = list._parse_options_args() + data = yaml.safe_dump({ + 'command': 'list', + 'parameters': params, + }) + self.getURL(self.app, '/run', method='POST', data=data) + self.failUnless(self.status.startswith('200 '), self.status) + self.failUnless( + ('Content-Type', 'application/octet-stream' + ) in self.response_headers, + self.response_headers) self.failUnless(self.exc_info == None, self.exc_info) - def test_add_post(self): - self.getURL(self.app, '/add/', method='POST', - data={'id':'123456', 'parent':'abc123', - 'directory':'True'}) - self.failUnless(self.status == '200 OK', self.status) - self.failUnless(self.response_headers == [], - self.response_headers) - self.failUnless(self.exc_info == None, self.exc_info) - # Note: other methods tested in libbe.storage.http - - # TODO: integration tests on Serve? + # TODO: integration tests on ServeCommands? unitsuite =unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) suite = unittest.TestSuite([unitsuite, doctest.DocTestSuite()]) - - -# The following certificate-creation code is adapted From pyOpenSSL's -# examples. - -def get_cert_filenames(server_name, autogenerate=True, logger=None): - """ - Generate private key and certification filenames. - get_cert_filenames(server_name) -> (pkey_filename, cert_filename) - """ - pkey_file = '%s.pkey' % server_name - cert_file = '%s.cert' % server_name - if autogenerate == True: - for file in [pkey_file, cert_file]: - if not os.path.exists(file): - make_certs(server_name, logger) - return (pkey_file, cert_file) - -def createKeyPair(type, bits): - """Create a public/private key pair. - - Returns the public/private key pair in a PKey object. - - Parameters - ---------- - type : TYPE_RSA or TYPE_DSA - Key type. - bits : int - Number of bits to use in the key. - """ - pkey = OpenSSL.crypto.PKey() - pkey.generate_key(type, bits) - return pkey - -def createCertRequest(pkey, digest="md5", **name): - """Create a certificate request. - - Returns the certificate request in an X509Req object. - - Parameters - ---------- - pkey : PKey - The key to associate with the request. - digest : "md5" or ? - Digestion method to use for signing, default is "md5", - `**name` : - The name of the subject of the request, possible. - Arguments are: - - ============ ======================== - C Country name - ST State or province name - L Locality name - O Organization name - OU Organizational unit name - CN Common name - emailAddress E-mail address - ============ ======================== - """ - req = OpenSSL.crypto.X509Req() - subj = req.get_subject() - - for (key,value) in name.items(): - setattr(subj, key, value) - - req.set_pubkey(pkey) - req.sign(pkey, digest) - return req - -def createCertificate(req, (issuerCert, issuerKey), serial, (notBefore, notAfter), digest="md5"): - """Generate a certificate given a certificate request. - - Returns the signed certificate in an X509 object. - - Parameters - ---------- - req : - Certificate reqeust to use - issuerCert : - The certificate of the issuer - issuerKey : - The private key of the issuer - serial : - Serial number for the certificate - notBefore : - Timestamp (relative to now) when the certificate - starts being valid - notAfter : - Timestamp (relative to now) when the certificate - stops being valid - digest : - Digest method to use for signing, default is md5 - """ - cert = OpenSSL.crypto.X509() - cert.set_serial_number(serial) - cert.gmtime_adj_notBefore(notBefore) - cert.gmtime_adj_notAfter(notAfter) - cert.set_issuer(issuerCert.get_subject()) - cert.set_subject(req.get_subject()) - cert.set_pubkey(req.get_pubkey()) - cert.sign(issuerKey, digest) - return cert - -def make_certs(server_name, logger=None) : - """Generate private key and certification files. - - `mk_certs(server_name) -> (pkey_filename, cert_filename)` - """ - if OpenSSL == None: - raise libbe.command.UserError, \ - 'SSL certificate generation requires the OpenSSL module' - pkey_file,cert_file = get_cert_filenames( - server_name, autogenerate=False) - if logger != None: - logger.log(logger._server_level, - 'Generating certificates', pkey_file, cert_file) - cakey = createKeyPair(OpenSSL.crypto.TYPE_RSA, 1024) - careq = createCertRequest(cakey, CN='Certificate Authority') - cacert = createCertificate( - careq, (careq, cakey), 0, (0, 60*60*24*365*5)) # five years - open(pkey_file, 'w').write(OpenSSL.crypto.dump_privatekey( - OpenSSL.crypto.FILETYPE_PEM, cakey)) - open(cert_file, 'w').write(OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, cacert)) -- cgit