| OLD | NEW |
| 1 # -*- Mode: Python; tab-width: 4 -*- | 1 # -*- Mode: Python; tab-width: 4 -*- |
| 2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp | 2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp |
| 3 # Author: Sam Rushing <rushing@nightmare.com> | 3 # Author: Sam Rushing <rushing@nightmare.com> |
| 4 | 4 |
| 5 # ====================================================================== | 5 # ====================================================================== |
| 6 # Copyright 1996 by Sam Rushing | 6 # Copyright 1996 by Sam Rushing |
| 7 # | 7 # |
| 8 # All Rights Reserved | 8 # All Rights Reserved |
| 9 # | 9 # |
| 10 # Permission to use, copy, modify, and distribute this software and | 10 # Permission to use, copy, modify, and distribute this software and |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 53 class async_chat (asyncore.dispatcher): | 53 class async_chat (asyncore.dispatcher): |
| 54 """This is an abstract class. You must derive from this class, and add | 54 """This is an abstract class. You must derive from this class, and add |
| 55 the two methods collect_incoming_data() and found_terminator()""" | 55 the two methods collect_incoming_data() and found_terminator()""" |
| 56 | 56 |
| 57 # these are overridable defaults | 57 # these are overridable defaults |
| 58 | 58 |
| 59 ac_in_buffer_size = 4096 | 59 ac_in_buffer_size = 4096 |
| 60 ac_out_buffer_size = 4096 | 60 ac_out_buffer_size = 4096 |
| 61 | 61 |
| 62 def __init__ (self, conn=None): | 62 def __init__ (self, conn=None): |
| 63 # for string terminator matching |
| 63 self.ac_in_buffer = '' | 64 self.ac_in_buffer = '' |
| 64 self.ac_out_buffer = '' | 65 |
| 65 self.producer_fifo = fifo() | 66 # we use a list here rather than cStringIO for a few reasons... |
| 67 # del lst[:] is faster than sio.truncate(0) |
| 68 # lst = [] is faster than sio.truncate(0) |
| 69 # cStringIO will be gaining unicode support in py3k, which |
| 70 # will negatively affect the performance of bytes compared to |
| 71 # a ''.join() equivalent |
| 72 self.incoming = [] |
| 73 |
| 74 # we toss the use of the "simple producer" and replace it with |
| 75 # a pure deque, which the original fifo was a wrapping of |
| 76 self.producer_fifo = deque() |
| 66 asyncore.dispatcher.__init__ (self, conn) | 77 asyncore.dispatcher.__init__ (self, conn) |
| 67 | 78 |
| 68 def collect_incoming_data(self, data): | 79 def collect_incoming_data(self, data): |
| 69 raise NotImplementedError, "must be implemented in subclass" | 80 raise NotImplementedError("must be implemented in subclass") |
| 81 |
| 82 def _collect_incoming_data(self, data): |
| 83 self.incoming.append(data) |
| 84 |
| 85 def _get_data(self): |
| 86 d = ''.join(self.incoming) |
| 87 del self.incoming[:] |
| 88 return d |
| 70 | 89 |
| 71 def found_terminator(self): | 90 def found_terminator(self): |
| 72 raise NotImplementedError, "must be implemented in subclass" | 91 raise NotImplementedError("must be implemented in subclass") |
| 73 | 92 |
| 74 def set_terminator (self, term): | 93 def set_terminator (self, term): |
| 75 "Set the input delimiter. Can be a fixed string of any length, an integ
er, or None" | 94 "Set the input delimiter. Can be a fixed string of any length, an integ
er, or None" |
| 76 self.terminator = term | 95 self.terminator = term |
| 77 | 96 |
| 78 def get_terminator (self): | 97 def get_terminator (self): |
| 79 return self.terminator | 98 return self.terminator |
| 80 | 99 |
| 81 # grab some more data from the socket, | 100 # grab some more data from the socket, |
| 82 # throw it to the collector method, | 101 # throw it to the collector method, |
| 83 # check for the terminator, | 102 # check for the terminator, |
| 84 # if found, transition to the next state. | 103 # if found, transition to the next state. |
| 85 | 104 |
| 86 def handle_read (self): | 105 def handle_read (self): |
| 87 | 106 |
| 88 try: | 107 try: |
| 89 data = self.recv (self.ac_in_buffer_size) | 108 data = self.recv (self.ac_in_buffer_size) |
| 90 except socket.error, why: | 109 except socket.error, why: |
| 91 self.handle_error() | 110 self.handle_error() |
| 92 return | 111 return |
| 93 | 112 |
| 94 self.ac_in_buffer = self.ac_in_buffer + data | 113 self.ac_in_buffer = self.ac_in_buffer + data |
| 95 | 114 |
| 96 # Continue to search for self.terminator in self.ac_in_buffer, | 115 # Continue to search for self.terminator in self.ac_in_buffer, |
| 97 # while calling self.collect_incoming_data. The while loop | 116 # while calling self.collect_incoming_data. The while loop |
| 98 # is necessary because we might read several data+terminator | 117 # is necessary because we might read several data+terminator |
| 99 # combos with a single recv(1024). | 118 # combos with a single recv(4096). |
| 100 | 119 |
| 101 while self.ac_in_buffer: | 120 while self.ac_in_buffer: |
| 102 lb = len(self.ac_in_buffer) | 121 lb = len(self.ac_in_buffer) |
| 103 terminator = self.get_terminator() | 122 terminator = self.get_terminator() |
| 104 if not terminator: | 123 if not terminator: |
| 105 # no terminator, collect it all | 124 # no terminator, collect it all |
| 106 self.collect_incoming_data (self.ac_in_buffer) | 125 self.collect_incoming_data (self.ac_in_buffer) |
| 107 self.ac_in_buffer = '' | 126 self.ac_in_buffer = '' |
| 108 elif isinstance(terminator, int) or isinstance(terminator, long): | 127 elif isinstance(terminator, int) or isinstance(terminator, long): |
| 109 # numeric terminator | 128 # numeric terminator |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 # we found a prefix, collect up to the prefix | 162 # we found a prefix, collect up to the prefix |
| 144 self.collect_incoming_data (self.ac_in_buffer[:-inde
x]) | 163 self.collect_incoming_data (self.ac_in_buffer[:-inde
x]) |
| 145 self.ac_in_buffer = self.ac_in_buffer[-index:] | 164 self.ac_in_buffer = self.ac_in_buffer[-index:] |
| 146 break | 165 break |
| 147 else: | 166 else: |
| 148 # no prefix, collect it all | 167 # no prefix, collect it all |
| 149 self.collect_incoming_data (self.ac_in_buffer) | 168 self.collect_incoming_data (self.ac_in_buffer) |
| 150 self.ac_in_buffer = '' | 169 self.ac_in_buffer = '' |
| 151 | 170 |
| 152 def handle_write (self): | 171 def handle_write (self): |
| 153 self.initiate_send () | 172 self.initiate_send() |
| 154 | 173 |
| 155 def handle_close (self): | 174 def handle_close (self): |
| 156 self.close() | 175 self.close() |
| 157 | 176 |
| 158 def push (self, data): | 177 def push (self, data): |
| 159 self.producer_fifo.push (simple_producer (data)) | 178 sabs = self.ac_out_buffer_size |
| 179 if len(data) > sabs: |
| 180 for i in xrange(0, len(data), sabs): |
| 181 self.producer_fifo.append(data[i:i+sabs]) |
| 182 else: |
| 183 self.producer_fifo.append(data) |
| 160 self.initiate_send() | 184 self.initiate_send() |
| 161 | 185 |
| 162 def push_with_producer (self, producer): | 186 def push_with_producer (self, producer): |
| 163 self.producer_fifo.push (producer) | 187 self.producer_fifo.append(producer) |
| 164 self.initiate_send() | 188 self.initiate_send() |
| 165 | 189 |
| 166 def readable (self): | 190 def readable (self): |
| 167 "predicate for inclusion in the readable for select()" | 191 "predicate for inclusion in the readable for select()" |
| 168 return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) | 192 # cannot use the old predicate, it violates the claim of the |
| 193 # set_terminator method. |
| 194 |
| 195 # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) |
| 196 return 1 |
| 169 | 197 |
| 170 def writable (self): | 198 def writable (self): |
| 171 "predicate for inclusion in the writable for select()" | 199 "predicate for inclusion in the writable for select()" |
| 172 # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self
.connected) | 200 return self.producer_fifo or (not self.connected) |
| 173 # this is about twice as fast, though not as clear. | |
| 174 return not ( | |
| 175 (self.ac_out_buffer == '') and | |
| 176 self.producer_fifo.is_empty() and | |
| 177 self.connected | |
| 178 ) | |
| 179 | 201 |
| 180 def close_when_done (self): | 202 def close_when_done (self): |
| 181 "automatically close this channel once the outgoing queue is empty" | 203 "automatically close this channel once the outgoing queue is empty" |
| 182 self.producer_fifo.push (None) | 204 self.producer_fifo.append(None) |
| 183 | 205 |
| 184 # refill the outgoing buffer by calling the more() method | 206 def initiate_send(self): |
| 185 # of the first producer in the queue | 207 while self.producer_fifo and self.connected: |
| 186 def refill_buffer (self): | 208 first = self.producer_fifo[0] |
| 187 while 1: | 209 # handle empty string/buffer or None entry |
| 188 if len(self.producer_fifo): | 210 if not first: |
| 189 p = self.producer_fifo.first() | 211 del self.producer_fifo[0] |
| 190 # a 'None' in the producer fifo is a sentinel, | 212 if first is None: |
| 191 # telling us to close the channel. | 213 self.handle_close() |
| 192 if p is None: | |
| 193 if not self.ac_out_buffer: | |
| 194 self.producer_fifo.pop() | |
| 195 self.close() | |
| 196 return | 214 return |
| 197 elif isinstance(p, str): | 215 |
| 198 self.producer_fifo.pop() | 216 # handle classic producer behavior |
| 199 self.ac_out_buffer = self.ac_out_buffer + p | 217 obs = self.ac_out_buffer_size |
| 200 return | 218 try: |
| 201 data = p.more() | 219 data = buffer(first, 0, obs) |
| 220 except TypeError: |
| 221 data = first.more() |
| 202 if data: | 222 if data: |
| 203 self.ac_out_buffer = self.ac_out_buffer + data | 223 self.producer_fifo.appendleft(data) |
| 204 return | |
| 205 else: | 224 else: |
| 206 self.producer_fifo.pop() | 225 del self.producer_fifo[0] |
| 207 else: | 226 continue |
| 227 |
| 228 # send the data |
| 229 try: |
| 230 num_sent = self.send(data) |
| 231 except socket.error: |
| 232 self.handle_error() |
| 208 return | 233 return |
| 209 | 234 |
| 210 def initiate_send (self): | 235 if num_sent: |
| 211 obs = self.ac_out_buffer_size | 236 if num_sent < len(data) or obs < len(first): |
| 212 # try to refill the buffer | 237 self.producer_fifo[0] = first[num_sent:] |
| 213 if (len (self.ac_out_buffer) < obs): | 238 else: |
| 214 self.refill_buffer() | 239 del self.producer_fifo[0] |
| 215 | 240 # we tried to send some actual data |
| 216 if self.ac_out_buffer and self.connected: | 241 return |
| 217 # try to send the buffer | |
| 218 try: | |
| 219 num_sent = self.send (self.ac_out_buffer[:obs]) | |
| 220 if num_sent: | |
| 221 self.ac_out_buffer = self.ac_out_buffer[num_sent:] | |
| 222 | |
| 223 except socket.error, why: | |
| 224 self.handle_error() | |
| 225 return | |
| 226 | 242 |
| 227 def discard_buffers (self): | 243 def discard_buffers (self): |
| 228 # Emergencies only! | 244 # Emergencies only! |
| 229 self.ac_in_buffer = '' | 245 self.ac_in_buffer = '' |
| 230 self.ac_out_buffer = '' | 246 del self.incoming[:] |
| 231 while self.producer_fifo: | 247 self.producer_fifo.clear() |
| 232 self.producer_fifo.pop() | |
| 233 | |
| 234 | 248 |
| 235 class simple_producer: | 249 class simple_producer: |
| 236 | 250 |
| 237 def __init__ (self, data, buffer_size=512): | 251 def __init__ (self, data, buffer_size=512): |
| 238 self.data = data | 252 self.data = data |
| 239 self.buffer_size = buffer_size | 253 self.buffer_size = buffer_size |
| 240 | 254 |
| 241 def more (self): | 255 def more (self): |
| 242 if len (self.data) > self.buffer_size: | 256 if len (self.data) > self.buffer_size: |
| 243 result = self.data[:self.buffer_size] | 257 result = self.data[:self.buffer_size] |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 286 # new python: 28961/s | 300 # new python: 28961/s |
| 287 # old python: 18307/s | 301 # old python: 18307/s |
| 288 # re: 12820/s | 302 # re: 12820/s |
| 289 # regex: 14035/s | 303 # regex: 14035/s |
| 290 | 304 |
| 291 def find_prefix_at_end (haystack, needle): | 305 def find_prefix_at_end (haystack, needle): |
| 292 l = len(needle) - 1 | 306 l = len(needle) - 1 |
| 293 while l and not haystack.endswith(needle[:l]): | 307 while l and not haystack.endswith(needle[:l]): |
| 294 l -= 1 | 308 l -= 1 |
| 295 return l | 309 return l |
| OLD | NEW |