| Index: Lib/asynchat.py |
| =================================================================== |
| --- Lib/asynchat.py (revision 62704) |
| +++ Lib/asynchat.py (working copy) |
| @@ -60,16 +60,35 @@ |
| ac_out_buffer_size = 4096 |
| def __init__ (self, conn=None): |
| + # for string terminator matching |
| self.ac_in_buffer = '' |
| - self.ac_out_buffer = '' |
| - self.producer_fifo = fifo() |
| + |
| + # we use a list here rather than cStringIO for a few reasons... |
| + # del lst[:] is faster than sio.truncate(0) |
| + # lst = [] is faster than sio.truncate(0) |
| + # cStringIO will be gaining unicode support in py3k, which |
| + # will negatively affect the performance of bytes compared to |
| + # a ''.join() equivalent |
| + self.incoming = [] |
| + |
| + # we toss the use of the "simple producer" and replace it with |
| + # a pure deque, which the original fifo was a wrapping of |
| + self.producer_fifo = deque() |
| asyncore.dispatcher.__init__ (self, conn) |
| def collect_incoming_data(self, data): |
| - raise NotImplementedError, "must be implemented in subclass" |
| + raise NotImplementedError("must be implemented in subclass") |
| + |
| + def _collect_incoming_data(self, data): |
| + self.incoming.append(data) |
| + |
| + def _get_data(self): |
| + d = ''.join(self.incoming) |
| + del self.incoming[:] |
| + return d |
| def found_terminator(self): |
| - raise NotImplementedError, "must be implemented in subclass" |
| + raise NotImplementedError("must be implemented in subclass") |
| def set_terminator (self, term): |
| "Set the input delimiter. Can be a fixed string of any length, an integer, or None" |
| @@ -96,7 +115,7 @@ |
| # Continue to search for self.terminator in self.ac_in_buffer, |
| # while calling self.collect_incoming_data. The while loop |
| # is necessary because we might read several data+terminator |
| - # combos with a single recv(1024). |
| + # combos with a single recv(4096). |
| while self.ac_in_buffer: |
| lb = len(self.ac_in_buffer) |
| @@ -150,88 +169,83 @@ |
| self.ac_in_buffer = '' |
| def handle_write (self): |
| - self.initiate_send () |
| + self.initiate_send() |
| def handle_close (self): |
| self.close() |
| def push (self, data): |
| - self.producer_fifo.push (simple_producer (data)) |
| + sabs = self.ac_out_buffer_size |
| + if len(data) > sabs: |
| + for i in xrange(0, len(data), sabs): |
| + self.producer_fifo.append(data[i:i+sabs]) |
| + else: |
| + self.producer_fifo.append(data) |
| self.initiate_send() |
| - |
| + |
| def push_with_producer (self, producer): |
| - self.producer_fifo.push (producer) |
| + self.producer_fifo.append(producer) |
| self.initiate_send() |
| - |
| + |
| def readable (self): |
| "predicate for inclusion in the readable for select()" |
| - return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) |
| + # cannot use the old predicate, it violates the claim of the |
| + # set_terminator method. |
| + |
| + # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) |
| + return 1 |
| def writable (self): |
| "predicate for inclusion in the writable for select()" |
| - # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) |
| - # this is about twice as fast, though not as clear. |
| - return not ( |
| - (self.ac_out_buffer == '') and |
| - self.producer_fifo.is_empty() and |
| - self.connected |
| - ) |
| + return self.producer_fifo or (not self.connected) |
| def close_when_done (self): |
| "automatically close this channel once the outgoing queue is empty" |
| - self.producer_fifo.push (None) |
| + self.producer_fifo.append(None) |
| - # refill the outgoing buffer by calling the more() method |
| - # of the first producer in the queue |
| - def refill_buffer (self): |
| - while 1: |
| - if len(self.producer_fifo): |
| - p = self.producer_fifo.first() |
| - # a 'None' in the producer fifo is a sentinel, |
| - # telling us to close the channel. |
| - if p is None: |
| - if not self.ac_out_buffer: |
| - self.producer_fifo.pop() |
| - self.close() |
| + def initiate_send(self): |
| + while self.producer_fifo and self.connected: |
| + first = self.producer_fifo[0] |
| + # handle empty string/buffer or None entry |
| + if not first: |
| + del self.producer_fifo[0] |
| + if first is None: |
| + self.handle_close() |
| return |
| - elif isinstance(p, str): |
| - self.producer_fifo.pop() |
| - self.ac_out_buffer = self.ac_out_buffer + p |
| - return |
| - data = p.more() |
| + |
| + # handle classic producer behavior |
| + obs = self.ac_out_buffer_size |
| + try: |
| + data = buffer(first, 0, obs) |
| + except TypeError: |
| + data = first.more() |
| if data: |
| - self.ac_out_buffer = self.ac_out_buffer + data |
| - return |
| + self.producer_fifo.appendleft(data) |
| else: |
| - self.producer_fifo.pop() |
| - else: |
| - return |
| + del self.producer_fifo[0] |
| + continue |
| - def initiate_send (self): |
| - obs = self.ac_out_buffer_size |
| - # try to refill the buffer |
| - if (len (self.ac_out_buffer) < obs): |
| - self.refill_buffer() |
| - |
| - if self.ac_out_buffer and self.connected: |
| - # try to send the buffer |
| + # send the data |
| try: |
| - num_sent = self.send (self.ac_out_buffer[:obs]) |
| - if num_sent: |
| - self.ac_out_buffer = self.ac_out_buffer[num_sent:] |
| - |
| - except socket.error, why: |
| + num_sent = self.send(data) |
| + except socket.error: |
| self.handle_error() |
| return |
| + if num_sent: |
| + if num_sent < len(data) or obs < len(first): |
| + self.producer_fifo[0] = first[num_sent:] |
| + else: |
| + del self.producer_fifo[0] |
| + # we tried to send some actual data |
| + return |
| + |
| def discard_buffers (self): |
| # Emergencies only! |
| self.ac_in_buffer = '' |
| - self.ac_out_buffer = '' |
| - while self.producer_fifo: |
| - self.producer_fifo.pop() |
| + del self.incoming[:] |
| + self.producer_fifo.clear() |
| - |
| class simple_producer: |
| def __init__ (self, data, buffer_size=512): |