OLD | NEW |
1 # This file is part of the Juju GUI, which lets users view and manage Juju | 1 # This file is part of the Juju GUI, which lets users view and manage Juju |
2 # environments within a graphical interface (https://launchpad.net/juju-gui). | 2 # environments within a graphical interface (https://launchpad.net/juju-gui). |
3 # Copyright (C) 2013 Canonical Ltd. | 3 # Copyright (C) 2013 Canonical Ltd. |
4 # | 4 # |
5 # This program is free software: you can redistribute it and/or modify it under | 5 # This program is free software: you can redistribute it and/or modify it under |
6 # the terms of the GNU Affero General Public License version 3, as published by | 6 # the terms of the GNU Affero General Public License version 3, as published by |
7 # the Free Software Foundation. | 7 # the Free Software Foundation. |
8 # | 8 # |
9 # This program is distributed in the hope that it will be useful, but WITHOUT | 9 # This program is distributed in the hope that it will be useful, but WITHOUT |
10 # ANY WARRANTY; without even the implied warranties of MERCHANTABILITY, | 10 # ANY WARRANTY; without even the implied warranties of MERCHANTABILITY, |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
71 | 71 |
72 def assert_user(self, username, password, is_authenticated): | 72 def assert_user(self, username, password, is_authenticated): |
73 """Ensure the current user reflects the given values.""" | 73 """Ensure the current user reflects the given values.""" |
74 user = self.user | 74 user = self.user |
75 self.assertEqual(username, user.username) | 75 self.assertEqual(username, user.username) |
76 self.assertEqual(password, user.password) | 76 self.assertEqual(password, user.password) |
77 self.assertEqual(is_authenticated, user.is_authenticated) | 77 self.assertEqual(is_authenticated, user.is_authenticated) |
78 | 78 |
79 def test_login_request(self): | 79 def test_login_request(self): |
80 # The authentication process starts if a login request is processed. | 80 # The authentication process starts if a login request is processed. |
| 81 |
81 request = self.make_login_request(username='user', password='passwd') | 82 request = self.make_login_request(username='user', password='passwd') |
82 self.auth.process_request(request) | 83 response = self.auth.process_request(request) |
| 84 self.assertEqual(request, response) |
83 self.assertTrue(self.auth.in_progress()) | 85 self.assertTrue(self.auth.in_progress()) |
84 self.assert_user('user', 'passwd', False) | 86 self.assert_user('', '', False) |
85 | 87 |
86 def test_login_success(self): | 88 def test_login_success(self): |
87 # The user is logged in if the authentication process completes. | 89 # The user is logged in if the authentication process completes. |
88 request = self.make_login_request(username='user', password='passwd') | 90 request = self.make_login_request(username='user', password='passwd') |
89 self.auth.process_request(request) | 91 self.auth.process_request(request) |
90 response = self.make_login_response() | 92 response = self.make_login_response() |
91 self.auth.process_response(response) | 93 result = self.auth.process_response(response) |
| 94 self.assertEqual(response, result) |
92 self.assertFalse(self.auth.in_progress()) | 95 self.assertFalse(self.auth.in_progress()) |
93 self.assert_user('user', 'passwd', True) | 96 self.assert_user('user', 'passwd', True) |
94 | 97 |
95 def test_login_failure(self): | 98 def test_login_failure(self): |
96 # The user is not logged in if the authentication process fails. | 99 # The user is not logged in if the authentication process fails. |
97 request = self.make_login_request() | 100 request = self.make_login_request() |
98 self.auth.process_request(request) | 101 self.auth.process_request(request) |
99 response = self.make_login_response(successful=False) | 102 response = self.make_login_response(successful=False) |
100 self.auth.process_response(response) | 103 result = self.auth.process_response(response) |
| 104 self.assertEqual(response, result) |
101 self.assertFalse(self.auth.in_progress()) | 105 self.assertFalse(self.auth.in_progress()) |
102 self.assert_user('', '', False) | 106 self.assert_user('', '', False) |
103 | 107 |
104 def test_request_mismatch(self): | 108 def test_request_mismatch(self): |
105 # The authentication fails if the request and response identifiers | 109 # The authentication fails if the request and response identifiers |
106 # don't match. | 110 # don't match. |
107 request = self.make_login_request( | 111 request = self.make_login_request( |
108 request_id=42, username='user', password='passwd') | 112 request_id=42, username='user', password='passwd') |
109 self.auth.process_request(request) | 113 self.auth.process_request(request) |
110 response = self.make_login_response(request_id=47) | 114 response = self.make_login_response(request_id=47) |
111 self.auth.process_response(response) | 115 self.auth.process_response(response) |
112 self.assertTrue(self.auth.in_progress()) | 116 self.assertTrue(self.auth.in_progress()) |
113 self.assert_user('user', 'passwd', False) | 117 self.assert_user('', '', False) |
114 | 118 |
115 def test_multiple_auth_requests(self): | 119 def test_multiple_auth_requests(self): |
116 # Only the last authentication request is taken into consideration. | 120 # The last authentication request is honored. |
117 request1 = self.make_login_request(request_id=1) | 121 request1 = self.make_login_request(request_id=1) |
118 request2 = self.make_login_request( | 122 request2 = self.make_login_request( |
119 request_id=2, username='user2', password='passwd2') | 123 request_id=2, username='user2', password='passwd2') |
120 self.auth.process_request(request1) | 124 self.auth.process_request(request1) |
121 self.auth.process_request(request2) | 125 self.auth.process_request(request2) |
122 # The first response arrives. | 126 # The first response arrives. |
123 response = self.make_login_response(request_id=1) | 127 response = self.make_login_response(request_id=1) |
124 self.auth.process_response(response) | 128 self.auth.process_response(response) |
125 # The user is still not autheticated and the auth is in progress. | 129 # The user is authenticated but the auth is still in progress. |
126 self.assertFalse(self.user.is_authenticated) | 130 self.assertTrue(self.user.is_authenticated) |
127 self.assertTrue(self.auth.in_progress()) | 131 self.assertTrue(self.auth.in_progress()) |
128 # The second response arrives. | 132 # The second response arrives. |
129 response = self.make_login_response(request_id=2) | 133 response = self.make_login_response(request_id=2) |
130 self.auth.process_response(response) | 134 self.auth.process_response(response) |
131 # The user logged in and the auth process completed. | 135 # The user logged in and the auth process completed. |
132 self.assert_user('user2', 'passwd2', True) | 136 self.assert_user('user2', 'passwd2', True) |
133 self.assertFalse(self.auth.in_progress()) | 137 self.assertFalse(self.auth.in_progress()) |
134 | 138 |
135 def test_request_id_is_zero(self): | 139 def test_request_id_is_zero(self): |
136 # The authentication process starts if a login request is processed | 140 # The authentication process starts if a login request is processed |
137 # and the request id is zero. | 141 # and the request id is zero. |
138 request = self.make_login_request(request_id=0) | 142 request = self.make_login_request(request_id=0) |
139 self.auth.process_request(request) | 143 self.auth.process_request(request) |
140 self.assertTrue(self.auth.in_progress()) | 144 self.assertTrue(self.auth.in_progress()) |
141 | 145 |
142 | 146 |
143 class TestGoAuthMiddleware( | 147 class TestGoAuthMiddleware( |
144 helpers.GoAPITestMixin, AuthMiddlewareTestMixin, | 148 helpers.GoAPITestMixin, AuthMiddlewareTestMixin, |
145 LogTrapTestCase, unittest.TestCase): | 149 LogTrapTestCase, unittest.TestCase): |
146 pass | 150 |
| 151 def test_token_login_request(self): |
| 152 # The authentication process starts with a token login request also. |
| 153 request = self.make_token_login_request( |
| 154 self.tokens, username='user', password='passwd') |
| 155 response = self.auth.process_request(request) |
| 156 # The response now looks as if it were made without a token. |
| 157 self.assertEqual( |
| 158 self.make_login_request(username='user', password='passwd'), |
| 159 response) |
| 160 self.assertTrue(self.auth.in_progress()) |
| 161 self.assert_user('', '', False) |
| 162 self.assertFalse(self.write_message.called) |
| 163 |
| 164 def test_token_login_success(self): |
| 165 # The user is logged in if the authentication process completes. |
| 166 request = self.make_token_login_request( |
| 167 self.tokens, username='user', password='passwd') |
| 168 self.auth.process_request(request) |
| 169 response = self.make_login_response() |
| 170 result = self.auth.process_response(response) |
| 171 self.assertEqual( |
| 172 dict(RequestId=42, |
| 173 Response=dict(AuthTag='user', Password='passwd')), |
| 174 result) |
| 175 self.assertFalse(self.auth.in_progress()) |
| 176 self.assert_user('user', 'passwd', True) |
| 177 self.assertFalse(self.write_message.called) |
| 178 |
| 179 def test_token_login_failure(self): |
| 180 # The user is not logged in if the authentication process fails. |
| 181 request = self.make_token_login_request( |
| 182 self.tokens, username='user', password='passwd') |
| 183 self.auth.process_request(request) |
| 184 response = self.make_login_response(successful=False) |
| 185 result = self.auth.process_response(response) |
| 186 self.assertEqual(response, result) |
| 187 self.assertFalse(self.auth.in_progress()) |
| 188 self.assert_user('', '', False) |
| 189 self.assertFalse(self.write_message.called) |
| 190 |
| 191 def test_token_login_missing(self): |
| 192 # The user is not logged in if the authentication process fails. |
| 193 request = self.make_token_login_request() |
| 194 response = self.auth.process_request(request) |
| 195 # None is a marker indicating that the request has been handled and |
| 196 # should not be continued on through to Juju. |
| 197 self.assertIsNone(response) |
| 198 self.write_message.assert_called_once_with(dict( |
| 199 RequestId=42, |
| 200 Error='unknown, fulfilled, or expired token', |
| 201 ErrorCode='unauthorized access', |
| 202 Response={})) |
| 203 self.assertFalse(self.auth.in_progress()) |
| 204 self.assert_user('', '', False) |
147 | 205 |
148 | 206 |
149 class TestPythonAuthMiddleware( | 207 class TestPythonAuthMiddleware( |
150 helpers.PythonAPITestMixin, AuthMiddlewareTestMixin, | 208 helpers.PythonAPITestMixin, AuthMiddlewareTestMixin, |
151 LogTrapTestCase, unittest.TestCase): | 209 LogTrapTestCase, unittest.TestCase): |
152 pass | 210 pass |
153 | 211 |
154 | 212 |
155 class BackendTestMixin(object): | 213 class BackendTestMixin(object): |
156 """Include tests for the authentication backends. | 214 """Include tests for the authentication backends. |
(...skipping 28 matching lines...) Expand all Loading... |
185 def test_login_succeeded(self): | 243 def test_login_succeeded(self): |
186 # True is returned if the login attempt succeeded. | 244 # True is returned if the login attempt succeeded. |
187 response = self.make_login_response() | 245 response = self.make_login_response() |
188 self.assertTrue(self.backend.login_succeeded(response)) | 246 self.assertTrue(self.backend.login_succeeded(response)) |
189 | 247 |
190 def test_login_failed(self): | 248 def test_login_failed(self): |
191 # False is returned if the login attempt failed. | 249 # False is returned if the login attempt failed. |
192 response = self.make_login_response(successful=False) | 250 response = self.make_login_response(successful=False) |
193 self.assertFalse(self.backend.login_succeeded(response)) | 251 self.assertFalse(self.backend.login_succeeded(response)) |
194 | 252 |
| 253 def test_make_request(self): |
| 254 expected = self.make_login_request( |
| 255 request_id=42, username='user', password='passwd') |
| 256 self.assertEqual( |
| 257 expected, self.backend.make_request(42, 'user', 'passwd')) |
| 258 |
195 | 259 |
196 class TestGoBackend( | 260 class TestGoBackend( |
197 helpers.GoAPITestMixin, BackendTestMixin, unittest.TestCase): | 261 helpers.GoAPITestMixin, BackendTestMixin, unittest.TestCase): |
198 | 262 |
199 def test_request_is_not_login(self): | 263 def test_request_is_not_login(self): |
200 # False is returned if the passed data is not a login request. | 264 # False is returned if the passed data is not a login request. |
201 requests = ( | 265 requests = ( |
202 {}, | 266 {}, |
203 { | 267 { |
204 'RequestId': 1, | 268 'RequestId': 1, |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
297 token_requested = self.tokens.token_requested(request) | 361 token_requested = self.tokens.token_requested(request) |
298 self.assertFalse(token_requested, request) | 362 self.assertFalse(token_requested, request) |
299 | 363 |
300 @mock.patch('uuid.uuid4', mock.Mock(return_value=mock.Mock(hex='DEFACED'))) | 364 @mock.patch('uuid.uuid4', mock.Mock(return_value=mock.Mock(hex='DEFACED'))) |
301 @mock.patch('datetime.datetime', | 365 @mock.patch('datetime.datetime', |
302 mock.Mock( | 366 mock.Mock( |
303 **{'utcnow.return_value': | 367 **{'utcnow.return_value': |
304 datetime.datetime(2013, 11, 21, 21)})) | 368 datetime.datetime(2013, 11, 21, 21)})) |
305 def test_process_token_request(self): | 369 def test_process_token_request(self): |
306 # It correctly responds to token requests. | 370 # It correctly responds to token requests. |
307 user = auth.User('user-admin', 'ADMINSECRET') | 371 user = auth.User('user-admin', 'ADMINSECRET', True) |
308 write_message = mock.Mock() | 372 write_message = mock.Mock() |
309 data = dict(RequestId=42, Type='GUIToken', Request='Create') | 373 data = dict(RequestId=42, Type='GUIToken', Request='Create') |
310 self.tokens.process_token_request(data, user, write_message) | 374 self.tokens.process_token_request(data, user, write_message) |
311 write_message.assert_called_once_with(dict( | 375 write_message.assert_called_once_with(dict( |
312 RequestId=42, | 376 RequestId=42, |
313 Response=dict( | 377 Response=dict( |
314 Token='DEFACED', | 378 Token='DEFACED', |
315 Created='2013-11-21T21:00:00Z', | 379 Created='2013-11-21T21:00:00Z', |
316 Expires='2013-11-21T21:01:00Z' | 380 Expires='2013-11-21T21:01:00Z' |
317 ) | 381 ) |
318 )) | 382 )) |
319 self.assertTrue('DEFACED' in self.tokens._data) | 383 self.assertTrue('DEFACED' in self.tokens._data) |
320 self.assertEqual( | 384 self.assertEqual( |
321 {'username', 'password', 'handle'}, | 385 {'username', 'password', 'handle'}, |
322 set(self.tokens._data['DEFACED'].keys())) | 386 set(self.tokens._data['DEFACED'].keys())) |
323 self.assertEqual( | 387 self.assertEqual( |
324 user.username, self.tokens._data['DEFACED']['username']) | 388 user.username, self.tokens._data['DEFACED']['username']) |
325 self.assertEqual( | 389 self.assertEqual( |
326 user.password, self.tokens._data['DEFACED']['password']) | 390 user.password, self.tokens._data['DEFACED']['password']) |
327 self.assertEqual( | 391 self.assertEqual( |
328 self.max_life, self.io_loop.add_timeout.call_args[0][0]) | 392 self.max_life, self.io_loop.add_timeout.call_args[0][0]) |
329 self.assertTrue('DEFACED' in self.tokens._data) | 393 self.assertTrue('DEFACED' in self.tokens._data) |
330 expire_token = self.io_loop.add_timeout.call_args[0][1] | 394 expire_token = self.io_loop.add_timeout.call_args[0][1] |
331 expire_token() | 395 expire_token() |
332 self.assertFalse('DEFACED' in self.tokens._data) | 396 self.assertFalse('DEFACED' in self.tokens._data) |
333 | 397 |
| 398 def test_unauthenticated_process_token_request(self): |
| 399 # Unauthenticated token requests get an informative error. |
| 400 user = auth.User(is_authenticated=False) |
| 401 write_message = mock.Mock() |
| 402 data = dict(RequestId=42, Type='GUIToken', Request='Create') |
| 403 self.tokens.process_token_request(data, user, write_message) |
| 404 write_message.assert_called_once_with(dict( |
| 405 RequestId=42, |
| 406 Error='tokens can only be created by authenticated users.', |
| 407 ErrorCode='unauthorized access', |
| 408 Response={} |
| 409 )) |
| 410 self.assertEqual({}, self.tokens._data) |
| 411 self.assertFalse(self.io_loop.add_timeout.called) |
| 412 |
334 def test_authentication_requested(self): | 413 def test_authentication_requested(self): |
335 # It recognizes an authentication request. | 414 # It recognizes an authentication request. |
336 request = dict( | 415 request = dict( |
337 RequestId=42, Type='GUIToken', Request='Login', | 416 RequestId=42, Type='GUIToken', Request='Login', |
338 Params={'Token': 'DEFACED'}) | 417 Params={'Token': 'DEFACED'}) |
339 auth_requested = self.tokens.authentication_requested(request) | 418 auth_requested = self.tokens.authentication_requested(request) |
340 self.assertTrue(auth_requested, request) | 419 self.assertTrue(auth_requested, request) |
341 | 420 |
342 def test_not_authentication_requested(self): | 421 def test_not_authentication_requested(self): |
343 # It rejects invalid authentication requests. | 422 # It rejects invalid authentication requests. |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
386 Response={})) | 465 Response={})) |
387 | 466 |
388 @mock.patch('uuid.uuid4', mock.Mock(return_value=mock.Mock(hex='DEFACED'))) | 467 @mock.patch('uuid.uuid4', mock.Mock(return_value=mock.Mock(hex='DEFACED'))) |
389 @mock.patch('datetime.datetime', | 468 @mock.patch('datetime.datetime', |
390 mock.Mock( | 469 mock.Mock( |
391 **{'utcnow.return_value': | 470 **{'utcnow.return_value': |
392 datetime.datetime(2013, 11, 21, 21)})) | 471 datetime.datetime(2013, 11, 21, 21)})) |
393 def test_token_request_and_authentication_collaborate(self): | 472 def test_token_request_and_authentication_collaborate(self): |
394 # process_token_request and process_authentication_request collaborate. | 473 # process_token_request and process_authentication_request collaborate. |
395 # This is a small integration test of the two functions' interaction. | 474 # This is a small integration test of the two functions' interaction. |
396 user = auth.User('user-admin', 'ADMINSECRET') | 475 user = auth.User('user-admin', 'ADMINSECRET', True) |
397 write_message = mock.Mock() | 476 write_message = mock.Mock() |
398 request = dict(RequestId=42, Type='GUIToken', Request='Create') | 477 request = dict(RequestId=42, Type='GUIToken', Request='Create') |
399 self.tokens.process_token_request(request, user, write_message) | 478 self.tokens.process_token_request(request, user, write_message) |
400 request = dict( | 479 request = dict( |
401 RequestId=43, Type='GUIToken', Request='Login', | 480 RequestId=43, Type='GUIToken', Request='Login', |
402 Params={'Token': 'DEFACED'}) | 481 Params={'Token': 'DEFACED'}) |
403 self.assertEqual( | 482 self.assertEqual( |
404 (user.username, user.password), | 483 (user.username, user.password), |
405 self.tokens.process_authentication_request(request, write_message)) | 484 self.tokens.process_authentication_request(request, write_message)) |
406 | 485 |
407 def test_process_authentication_response(self): | 486 def test_process_authentication_response(self): |
408 # It translates a normal authentication success. | 487 # It translates a normal authentication success. |
409 user = auth.User('user-admin', 'ADMINSECRET') | 488 user = auth.User('user-admin', 'ADMINSECRET', True) |
410 response = {'RequestId': 42, 'Response': {}} | 489 response = {'RequestId': 42, 'Response': {}} |
411 self.assertEqual( | 490 self.assertEqual( |
412 dict(RequestId=42, | 491 dict(RequestId=42, |
413 Response=dict(AuthTag=user.username, Password=user.password)), | 492 Response=dict(AuthTag=user.username, Password=user.password)), |
414 self.tokens.process_authentication_response(response, user)) | 493 self.tokens.process_authentication_response(response, user)) |
OLD | NEW |