OLD | NEW |
1 """Base implementation of event loop. | 1 """Base implementation of event loop. |
2 | 2 |
3 The event loop can be broken up into a multiplexer (the part | 3 The event loop can be broken up into a multiplexer (the part |
4 responsible for notifying us of IO events) and the event loop proper, | 4 responsible for notifying us of IO events) and the event loop proper, |
5 which wraps a multiplexer with functionality for scheduling callbacks, | 5 which wraps a multiplexer with functionality for scheduling callbacks, |
6 immediately or at a given time in the future. | 6 immediately or at a given time in the future. |
7 | 7 |
8 Whenever a public API takes a callback, subsequent positional | 8 Whenever a public API takes a callback, subsequent positional |
9 arguments will be passed to the callback if/when it is called. This | 9 arguments will be passed to the callback if/when it is called. This |
10 avoids the proliferation of trivial lambdas implementing closures. | 10 avoids the proliferation of trivial lambdas implementing closures. |
11 Keyword arguments for the callback are not supported; this is a | 11 Keyword arguments for the callback are not supported; this is a |
12 conscious design decision, leaving the door open for keyword arguments | 12 conscious design decision, leaving the door open for keyword arguments |
13 to modify the meaning of the API call itself. | 13 to modify the meaning of the API call itself. |
14 """ | 14 """ |
15 | 15 |
16 | 16 |
17 import collections | 17 import collections |
18 import concurrent.futures | 18 import concurrent.futures |
19 import heapq | 19 import heapq |
20 import logging | 20 import logging |
21 import socket | 21 import socket |
22 import time | 22 import time |
| 23 import os |
| 24 import sys |
23 | 25 |
24 from . import events | 26 from . import events |
25 from . import futures | 27 from . import futures |
26 from . import tasks | 28 from . import tasks |
27 from .log import tulip_log | 29 from .log import tulip_log |
28 | 30 |
29 | 31 |
30 __all__ = ['BaseEventLoop'] | 32 __all__ = ['BaseEventLoop'] |
31 | 33 |
32 | 34 |
(...skipping 373 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
406 raise exceptions[0] | 408 raise exceptions[0] |
407 | 409 |
408 protocol = protocol_factory() | 410 protocol = protocol_factory() |
409 transport = self._make_datagram_transport( | 411 transport = self._make_datagram_transport( |
410 sock, protocol, r_addr, extra={'addr': l_addr}) | 412 sock, protocol, r_addr, extra={'addr': l_addr}) |
411 return transport, protocol | 413 return transport, protocol |
412 | 414 |
413 # TODO: Or create_server()? | 415 # TODO: Or create_server()? |
414 @tasks.task | 416 @tasks.task |
415 def start_serving(self, protocol_factory, host=None, port=None, *, | 417 def start_serving(self, protocol_factory, host=None, port=None, *, |
416 family=0, proto=0, flags=0, backlog=100, sock=None, | 418 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, |
417 ssl=False): | 419 sock=None, backlog=100, ssl=False, reuse_address=None): |
418 """XXX""" | 420 """XXX""" |
419 if host is not None or port is not None: | 421 if host is not None or port is not None: |
420 if sock is not None: | 422 if sock is not None: |
421 raise ValueError( | 423 raise ValueError( |
422 "host, port and sock can not be specified at the same time") | 424 "host, port and sock can not be specified at the same time") |
423 | 425 |
| 426 AF_INET6 = getattr(socket, 'AF_INET6', 0) |
| 427 if reuse_address is None: |
| 428 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' |
| 429 sockets = [] |
| 430 if host == "": |
| 431 host = None |
| 432 |
424 infos = yield from self.getaddrinfo( | 433 infos = yield from self.getaddrinfo( |
425 host, port, family=family, | 434 host, port, family=family, |
426 type=socket.SOCK_STREAM, proto=proto, flags=flags) | 435 type=socket.SOCK_STREAM, proto=0, flags=flags) |
427 | |
428 if not infos: | 436 if not infos: |
429 raise socket.error('getaddrinfo() returned empty list') | 437 raise socket.error('getaddrinfo() returned empty list') |
430 | 438 |
431 # TODO: Maybe we want to bind every address in the list | 439 completed = False |
432 # instead of the first one that works? | 440 try: |
433 exceptions = [] | 441 for res in infos: |
434 for family, type, proto, cname, address in infos: | 442 af, socktype, proto, canonname, sa = res |
435 sock = socket.socket(family=family, type=type, proto=proto) | 443 sock = socket.socket(af, socktype, proto) |
436 try: | 444 sockets.append(sock) |
437 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 445 if reuse_address: |
438 sock.bind(address) | 446 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, |
439 except socket.error as exc: | 447 True) |
440 sock.close() | 448 # Disable IPv4/IPv6 dual stack support (enabled by |
441 exceptions.append(exc) | 449 # default on Linux) which makes a single socket |
442 else: | 450 # listen on both address families. |
443 break | 451 if af == AF_INET6 and hasattr(socket, "IPPROTO_IPV6"): |
444 else: | 452 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, |
445 raise exceptions[0] | 453 True) |
| 454 try: |
| 455 sock.bind(sa) |
| 456 except socket.error as err: |
| 457 raise socket.error(err.errno, "error while attempting " |
| 458 "to bind on address %r: %s" \ |
| 459 % (sa, err.strerror.lower())) |
| 460 completed = True |
| 461 finally: |
| 462 if not completed: |
| 463 for sock in sockets: |
| 464 sock.close() |
| 465 else: |
| 466 if sock is None: |
| 467 raise ValueError( |
| 468 "host and port was not specified and no sock specified") |
| 469 sockets = [sock] |
446 | 470 |
447 elif sock is None: | 471 for sock in sockets: |
448 raise ValueError( | 472 sock.listen(backlog) |
449 "host and port was not specified and no sock specified") | 473 sock.setblocking(False) |
450 | 474 self._start_serving(protocol_factory, sock, ssl) |
451 sock.listen(backlog) | 475 return sockets |
452 sock.setblocking(False) | |
453 self._start_serving(protocol_factory, sock, ssl) | |
454 return sock | |
455 | 476 |
456 @tasks.coroutine | 477 @tasks.coroutine |
457 def connect_read_pipe(self, protocol_factory, pipe): | 478 def connect_read_pipe(self, protocol_factory, pipe): |
458 protocol = protocol_factory() | 479 protocol = protocol_factory() |
459 waiter = futures.Future() | 480 waiter = futures.Future() |
460 transport = self._make_read_pipe_transport(pipe, protocol, waiter, | 481 transport = self._make_read_pipe_transport(pipe, protocol, waiter, |
461 extra={}) | 482 extra={}) |
462 yield from waiter | 483 yield from waiter |
463 return transport, protocol | 484 return transport, protocol |
464 | 485 |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
550 handle.run() | 571 handle.run() |
551 | 572 |
552 # Future.__del__ uses log level | 573 # Future.__del__ uses log level |
553 _log_level = logging.WARNING | 574 _log_level = logging.WARNING |
554 | 575 |
555 def set_log_level(self, val): | 576 def set_log_level(self, val): |
556 self._log_level = val | 577 self._log_level = val |
557 | 578 |
558 def get_log_level(self): | 579 def get_log_level(self): |
559 return self._log_level | 580 return self._log_level |
OLD | NEW |