Left: | ||
Right: |
OLD | NEW |
---|---|
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 # Copyright 2013 Google Inc. All Rights Reserved. | 2 # Copyright 2013 Google Inc. All Rights Reserved. |
3 # | 3 # |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
5 # you may not use this file except in compliance with the License. | 5 # you may not use this file except in compliance with the License. |
6 # You may obtain a copy of the License at | 6 # You may obtain a copy of the License at |
7 # | 7 # |
8 # http://www.apache.org/licenses/LICENSE-2.0 | 8 # http://www.apache.org/licenses/LICENSE-2.0 |
9 # | 9 # |
10 # Unless required by applicable law or agreed to in writing, software | 10 # Unless required by applicable law or agreed to in writing, software |
11 # distributed under the License is distributed on an "AS IS" BASIS, | 11 # distributed under the License is distributed on an "AS IS" BASIS, |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 # See the License for the specific language governing permissions and | 13 # See the License for the specific language governing permissions and |
14 # limitations under the License. | 14 # limitations under the License. |
15 """Utility classes and methods for the parallelism framework.""" | 15 """Utility classes and methods for the parallelism framework.""" |
16 | 16 |
17 from __future__ import absolute_import | 17 from __future__ import absolute_import |
18 from __future__ import print_function | 18 from __future__ import print_function |
19 | 19 |
20 import collections | 20 import collections |
21 import errno | |
21 import multiprocessing | 22 import multiprocessing |
22 import threading | 23 import threading |
23 import traceback | 24 import traceback |
24 | 25 |
25 from six.moves import queue as Queue | |
26 | |
27 import gslib | |
28 from gslib.utils import constants | 26 from gslib.utils import constants |
29 from gslib.utils import system_util | 27 from gslib.utils import system_util |
28 from six.moves import queue as Queue | |
30 | 29 |
31 # pylint: disable=g-import-not-at-top | 30 # pylint: disable=g-import-not-at-top |
32 try: | 31 try: |
33 # This module doesn't necessarily exist on Windows. | 32 # This module doesn't necessarily exist on Windows. |
34 import resource | 33 import resource |
35 _HAS_RESOURCE_MODULE = True | 34 _HAS_RESOURCE_MODULE = True |
36 except ImportError, e: | 35 except ImportError, e: |
37 _HAS_RESOURCE_MODULE = False | 36 _HAS_RESOURCE_MODULE = False |
38 | 37 |
39 # Maximum time to wait (join) on the SeekAheadThread after the ProducerThread | 38 # Maximum time to wait (join) on the SeekAheadThread after the ProducerThread |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
224 resource.setrlimit(resource_name, (fallback_value, hard_limit)) | 223 resource.setrlimit(resource_name, (fallback_value, hard_limit)) |
225 return fallback_value | 224 return fallback_value |
226 except (resource.error, ValueError): | 225 except (resource.error, ValueError): |
227 # We couldn't change the soft limit, so just report the current | 226 # We couldn't change the soft limit, so just report the current |
228 # value of the soft limit. | 227 # value of the soft limit. |
229 return soft_limit | 228 return soft_limit |
230 else: | 229 else: |
231 return soft_limit | 230 return soft_limit |
232 | 231 |
233 | 232 |
233 def ShouldProhibitMultiprocessing(): | |
234 """Determines if the OS doesn't support multiprocessing. | |
235 | |
236 There are two cases we currently know about: | |
237 - Multiple processes are not supported on Windows. | |
238 - If an error is encountered while using multiple processes on Alpine Linux | |
239 gsutil hangs. For this case it's possible we could do more work to find | |
240 the root cause but after a fruitless initial attempt we decided instead | |
241 to fall back on multi-threading w/o multiprocesing. | |
242 | |
243 Returns: | |
244 (bool indicator if multiprocessing should be prohibited, OS name) | |
245 """ | |
246 if system_util.IS_WINDOWS: | |
247 return (True, 'Windows') | |
248 try: | |
249 with open('/etc/os-release', 'r') as f: | |
250 os_name = f.read().split('\n')[0].split('=')[1].strip('"') | |
251 return ('alpine linux' in os_name.lower(), os_name) | |
252 except IOError as e: | |
253 if e.errno == errno.ENOENT: | |
254 raise IOError('Unable to open /etc/os-release to determine OS release') | |
houglum
2018/10/12 21:47:23
Do we want to raise an exception in this case? I'
Mike Schwartz
2018/10/12 22:06:34
Done.
houglum
2018/10/12 23:07:34
I believe you, but I don't see the patch where thi
Mike Schwartz
2018/10/12 23:18:41
I uploaded to the wrong rietveld issue. I just tri
| |
255 else: | |
houglum
2018/10/12 21:47:23
The "else:" isn't necessary here.
Mike Schwartz
2018/10/12 22:06:34
Sure, but it seems marginally more readable to me
houglum
2018/10/12 23:07:34
Meh, I don't feel strongly. And if we're not raisi
| |
256 raise | |
257 | |
258 | |
234 def CheckMultiprocessingAvailableAndInit(logger=None): | 259 def CheckMultiprocessingAvailableAndInit(logger=None): |
235 """Checks if multiprocessing is available. | 260 """Checks if multiprocessing is available, and if so performs initialization. |
236 | 261 |
237 There are some environments in which there is no way to use multiprocessing | 262 There are some environments in which there is no way to use multiprocessing |
238 logic that's built into Python (e.g., if /dev/shm is not available, then | 263 logic that's built into Python (e.g., if /dev/shm is not available, then |
239 we can't create semaphores). This simply tries out a few things that will be | 264 we can't create semaphores). This simply tries out a few things that will be |
240 needed to make sure the environment can support the pieces of the | 265 needed to make sure the environment can support the pieces of the |
241 multiprocessing module that we need. | 266 multiprocessing module that we need. |
242 | 267 |
243 If multiprocessing is available, this performs necessary initialization for | 268 See gslib.command.InitializeMultiprocessingVariables for |
244 multiprocessing. See gslib.command.InitializeMultiprocessingVariables for | |
245 an explanation of why this is necessary. | 269 an explanation of why this is necessary. |
246 | 270 |
247 Args: | 271 Args: |
248 logger: (logging.Logger) Logger to use for debug output. | 272 logger: (logging.Logger) Logger to use for debug output. |
249 | 273 |
250 Returns: | 274 Returns: |
251 (MultiprocessingIsAvailableResult) A namedtuple with the following attrs: | 275 (MultiprocessingIsAvailableResult) A namedtuple with the following attrs: |
252 - multiprocessing_is_available: True iff the multiprocessing module is | 276 - multiprocessing_is_available: True iff the multiprocessing module is |
253 available for use. | 277 available for use. |
254 - stack_trace: The stack trace generated by the call we tried that | 278 - stack_trace: The stack trace generated by the call we tried that |
255 failed. | 279 failed. |
256 """ | 280 """ |
257 # pylint: disable=global-variable-undefined | 281 # pylint: disable=global-variable-undefined |
258 global _cached_multiprocessing_is_available | 282 global _cached_multiprocessing_is_available |
259 global _cached_multiprocessing_check_stack_trace | 283 global _cached_multiprocessing_check_stack_trace |
260 global _cached_multiprocessing_is_available_message | 284 global _cached_multiprocessing_is_available_message |
261 if _cached_multiprocessing_is_available is not None: | 285 if _cached_multiprocessing_is_available is not None: |
262 if logger: | 286 if logger: |
263 logger.debug(_cached_multiprocessing_check_stack_trace) | 287 logger.debug(_cached_multiprocessing_check_stack_trace) |
264 logger.warn(_cached_multiprocessing_is_available_message) | 288 logger.warn(_cached_multiprocessing_is_available_message) |
265 return MultiprocessingIsAvailableResult( | 289 return MultiprocessingIsAvailableResult( |
266 is_available=_cached_multiprocessing_is_available, | 290 is_available=_cached_multiprocessing_is_available, |
267 stack_trace=_cached_multiprocessing_check_stack_trace) | 291 stack_trace=_cached_multiprocessing_check_stack_trace) |
268 | 292 |
269 if system_util.IS_WINDOWS: | 293 should_prohibit_multiprocessing, os_name = ShouldProhibitMultiprocessing() |
294 if should_prohibit_multiprocessing: | |
270 message = """ | 295 message = """ |
271 Multiple processes are not supported on Windows. Operations requesting | 296 Multiple processes are not supported on %s. Operations requesting |
272 parallelism will be executed with multiple threads in a single process only. | 297 parallelism will be executed with multiple threads in a single process only. |
273 """ | 298 """ % os_name |
274 if logger: | 299 if logger: |
275 logger.warn(message) | 300 logger.warn(message) |
276 return MultiprocessingIsAvailableResult(is_available=False, | 301 return MultiprocessingIsAvailableResult(is_available=False, |
277 stack_trace=None) | 302 stack_trace=None) |
278 | 303 |
279 stack_trace = None | 304 stack_trace = None |
280 multiprocessing_is_available = True | 305 multiprocessing_is_available = True |
281 message = """ | 306 message = """ |
282 You have requested multiple processes for an operation, but the | 307 You have requested multiple processes for an operation, but the |
283 required functionality of Python\'s multiprocessing module is not available. | 308 required functionality of Python\'s multiprocessing module is not available. |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
389 timeout: (optional) amount of time to wait before repeating put request. | 414 timeout: (optional) amount of time to wait before repeating put request. |
390 """ | 415 """ |
391 put_success = False | 416 put_success = False |
392 while not put_success: | 417 while not put_success: |
393 try: | 418 try: |
394 queue.put(msg, timeout=timeout) | 419 queue.put(msg, timeout=timeout) |
395 put_success = True | 420 put_success = True |
396 except Queue.Full: | 421 except Queue.Full: |
397 pass | 422 pass |
398 # pylint: enable=invalid-name | 423 # pylint: enable=invalid-name |
OLD | NEW |