OLD | NEW |
1 import sys | 1 import sys |
2 import os | 2 import os |
3 import unittest | 3 import unittest |
4 from array import array | 4 from array import array |
5 from weakref import proxy | 5 from weakref import proxy |
6 | 6 |
| 7 import io |
| 8 import _pyio as pyio |
| 9 |
7 from test.support import TESTFN, findfile, run_unittest | 10 from test.support import TESTFN, findfile, run_unittest |
8 from collections import UserList | 11 from collections import UserList |
9 | 12 |
10 class AutoFileTests(unittest.TestCase): | 13 class AutoFileTests(unittest.TestCase): |
11 # file tests for which a test file is automatically set up | 14 # file tests for which a test file is automatically set up |
12 | 15 |
13 def setUp(self): | 16 def setUp(self): |
14 self.f = open(TESTFN, 'wb') | 17 self.f = self.open(TESTFN, 'wb') |
15 | 18 |
16 def tearDown(self): | 19 def tearDown(self): |
17 if self.f: | 20 if self.f: |
18 self.f.close() | 21 self.f.close() |
19 os.remove(TESTFN) | 22 os.remove(TESTFN) |
20 | 23 |
21 def testWeakRefs(self): | 24 def testWeakRefs(self): |
22 # verify weak references | 25 # verify weak references |
23 p = proxy(self.f) | 26 p = proxy(self.f) |
24 p.write(b'teststring') | 27 p.write(b'teststring') |
25 self.assertEquals(self.f.tell(), p.tell()) | 28 self.assertEquals(self.f.tell(), p.tell()) |
26 self.f.close() | 29 self.f.close() |
27 self.f = None | 30 self.f = None |
28 self.assertRaises(ReferenceError, getattr, p, 'tell') | 31 self.assertRaises(ReferenceError, getattr, p, 'tell') |
29 | 32 |
30 def testAttributes(self): | 33 def testAttributes(self): |
31 # verify expected attributes exist | 34 # verify expected attributes exist |
32 f = self.f | 35 f = self.f |
33 f.name # merely shouldn't blow up | 36 f.name # merely shouldn't blow up |
34 f.mode # ditto | 37 f.mode # ditto |
35 f.closed # ditto | 38 f.closed # ditto |
36 | 39 |
37 def testReadinto(self): | 40 def testReadinto(self): |
38 # verify readinto | 41 # verify readinto |
39 self.f.write(b'12') | 42 self.f.write(b'12') |
40 self.f.close() | 43 self.f.close() |
41 a = array('b', b'x'*10) | 44 a = array('b', b'x'*10) |
42 self.f = open(TESTFN, 'rb') | 45 self.f = self.open(TESTFN, 'rb') |
43 n = self.f.readinto(a) | 46 n = self.f.readinto(a) |
44 self.assertEquals(b'12', a.tostring()[:n]) | 47 self.assertEquals(b'12', a.tostring()[:n]) |
45 | 48 |
46 def testReadinto_text(self): | 49 def testReadinto_text(self): |
47 # verify readinto refuses text files | 50 # verify readinto refuses text files |
48 a = array('b', b'x'*10) | 51 a = array('b', b'x'*10) |
49 self.f.close() | 52 self.f.close() |
50 self.f = open(TESTFN, 'r') | 53 self.f = self.open(TESTFN, 'r') |
51 if hasattr(self.f, "readinto"): | 54 if hasattr(self.f, "readinto"): |
52 self.assertRaises(TypeError, self.f.readinto, a) | 55 self.assertRaises(TypeError, self.f.readinto, a) |
53 | 56 |
54 def testWritelinesUserList(self): | 57 def testWritelinesUserList(self): |
55 # verify writelines with instance sequence | 58 # verify writelines with instance sequence |
56 l = UserList([b'1', b'2']) | 59 l = UserList([b'1', b'2']) |
57 self.f.writelines(l) | 60 self.f.writelines(l) |
58 self.f.close() | 61 self.f.close() |
59 self.f = open(TESTFN, 'rb') | 62 self.f = self.open(TESTFN, 'rb') |
60 buf = self.f.read() | 63 buf = self.f.read() |
61 self.assertEquals(buf, b'12') | 64 self.assertEquals(buf, b'12') |
62 | 65 |
63 def testWritelinesIntegers(self): | 66 def testWritelinesIntegers(self): |
64 # verify writelines with integers | 67 # verify writelines with integers |
65 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) | 68 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) |
66 | 69 |
67 def testWritelinesIntegersUserList(self): | 70 def testWritelinesIntegersUserList(self): |
68 # verify writelines with integers in UserList | 71 # verify writelines with integers in UserList |
69 l = UserList([1,2,3]) | 72 l = UserList([1,2,3]) |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
119 self.assertEquals(self.f.__exit__(None, None, None), None) | 122 self.assertEquals(self.f.__exit__(None, None, None), None) |
120 # it must also return None if an exception was given | 123 # it must also return None if an exception was given |
121 try: | 124 try: |
122 1/0 | 125 1/0 |
123 except: | 126 except: |
124 self.assertEquals(self.f.__exit__(*sys.exc_info()), None) | 127 self.assertEquals(self.f.__exit__(*sys.exc_info()), None) |
125 | 128 |
126 def testReadWhenWriting(self): | 129 def testReadWhenWriting(self): |
127 self.assertRaises(IOError, self.f.read) | 130 self.assertRaises(IOError, self.f.read) |
128 | 131 |
| 132 class CAutoFileTests(AutoFileTests): |
| 133 open = io.open |
| 134 |
| 135 class PyAutoFileTests(AutoFileTests): |
| 136 open = staticmethod(pyio.open) |
| 137 |
| 138 |
129 class OtherFileTests(unittest.TestCase): | 139 class OtherFileTests(unittest.TestCase): |
130 | 140 |
131 def testModeStrings(self): | 141 def testModeStrings(self): |
132 # check invalid mode strings | 142 # check invalid mode strings |
133 for mode in ("", "aU", "wU+"): | 143 for mode in ("", "aU", "wU+"): |
134 try: | 144 try: |
135 f = open(TESTFN, mode) | 145 f = self.open(TESTFN, mode) |
136 except ValueError: | 146 except ValueError: |
137 pass | 147 pass |
138 else: | 148 else: |
139 f.close() | 149 f.close() |
140 self.fail('%r is an invalid file mode' % mode) | 150 self.fail('%r is an invalid file mode' % mode) |
141 | 151 |
142 def testStdin(self): | 152 def testStdin(self): |
143 # This causes the interpreter to exit on OSF1 v5.1. | 153 # This causes the interpreter to exit on OSF1 v5.1. |
144 if sys.platform != 'osf1V5': | 154 if sys.platform != 'osf1V5': |
145 self.assertRaises((IOError, ValueError), sys.stdin.seek, -1) | 155 self.assertRaises((IOError, ValueError), sys.stdin.seek, -1) |
146 else: | 156 else: |
147 print(( | 157 print(( |
148 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' | 158 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' |
149 ' Test manually.'), file=sys.__stdout__) | 159 ' Test manually.'), file=sys.__stdout__) |
150 self.assertRaises((IOError, ValueError), sys.stdin.truncate) | 160 self.assertRaises((IOError, ValueError), sys.stdin.truncate) |
151 | 161 |
152 def testBadModeArgument(self): | 162 def testBadModeArgument(self): |
153 # verify that we get a sensible error message for bad mode argument | 163 # verify that we get a sensible error message for bad mode argument |
154 bad_mode = "qwerty" | 164 bad_mode = "qwerty" |
155 try: | 165 try: |
156 f = open(TESTFN, bad_mode) | 166 f = self.open(TESTFN, bad_mode) |
157 except ValueError as msg: | 167 except ValueError as msg: |
158 if msg.args[0] != 0: | 168 if msg.args[0] != 0: |
159 s = str(msg) | 169 s = str(msg) |
160 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1: | 170 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1: |
161 self.fail("bad error message for invalid mode: %s" % s) | 171 self.fail("bad error message for invalid mode: %s" % s) |
162 # if msg.args[0] == 0, we're probably on Windows where there may be | 172 # if msg.args[0] == 0, we're probably on Windows where there may be |
163 # no obvious way to discover why open() failed. | 173 # no obvious way to discover why open() failed. |
164 else: | 174 else: |
165 f.close() | 175 f.close() |
166 self.fail("no error for invalid mode: %s" % bad_mode) | 176 self.fail("no error for invalid mode: %s" % bad_mode) |
167 | 177 |
168 def testSetBufferSize(self): | 178 def testSetBufferSize(self): |
169 # make sure that explicitly setting the buffer size doesn't cause | 179 # make sure that explicitly setting the buffer size doesn't cause |
170 # misbehaviour especially with repeated close() calls | 180 # misbehaviour especially with repeated close() calls |
171 for s in (-1, 0, 1, 512): | 181 for s in (-1, 0, 1, 512): |
172 try: | 182 try: |
173 f = open(TESTFN, 'wb', s) | 183 f = self.open(TESTFN, 'wb', s) |
174 f.write(str(s).encode("ascii")) | 184 f.write(str(s).encode("ascii")) |
175 f.close() | 185 f.close() |
176 f.close() | 186 f.close() |
177 f = open(TESTFN, 'rb', s) | 187 f = self.open(TESTFN, 'rb', s) |
178 d = int(f.read().decode("ascii")) | 188 d = int(f.read().decode("ascii")) |
179 f.close() | 189 f.close() |
180 f.close() | 190 f.close() |
181 except IOError as msg: | 191 except IOError as msg: |
182 self.fail('error setting buffer size %d: %s' % (s, str(msg))) | 192 self.fail('error setting buffer size %d: %s' % (s, str(msg))) |
183 self.assertEquals(d, s) | 193 self.assertEquals(d, s) |
184 | 194 |
185 def testTruncateOnWindows(self): | 195 def testTruncateOnWindows(self): |
186 # SF bug <http://www.python.org/sf/801631> | 196 # SF bug <http://www.python.org/sf/801631> |
187 # "file.truncate fault on windows" | 197 # "file.truncate fault on windows" |
188 | 198 |
189 os.unlink(TESTFN) | 199 os.unlink(TESTFN) |
190 f = open(TESTFN, 'wb') | 200 f = self.open(TESTFN, 'wb') |
191 | 201 |
192 try: | 202 try: |
193 f.write(b'12345678901') # 11 bytes | 203 f.write(b'12345678901') # 11 bytes |
194 f.close() | 204 f.close() |
195 | 205 |
196 f = open(TESTFN,'rb+') | 206 f = self.open(TESTFN,'rb+') |
197 data = f.read(5) | 207 data = f.read(5) |
198 if data != b'12345': | 208 if data != b'12345': |
199 self.fail("Read on file opened for update failed %r" % data) | 209 self.fail("Read on file opened for update failed %r" % data) |
200 if f.tell() != 5: | 210 if f.tell() != 5: |
201 self.fail("File pos after read wrong %d" % f.tell()) | 211 self.fail("File pos after read wrong %d" % f.tell()) |
202 | 212 |
203 f.truncate() | 213 f.truncate() |
204 if f.tell() != 5: | 214 if f.tell() != 5: |
205 self.fail("File pos after ftruncate wrong %d" % f.tell()) | 215 self.fail("File pos after ftruncate wrong %d" % f.tell()) |
206 | 216 |
(...skipping 19 matching lines...) Expand all Loading... |
226 b"saussages, spam, spam and eggs\n", | 236 b"saussages, spam, spam and eggs\n", |
227 b"spam, ham, spam and eggs\n", | 237 b"spam, ham, spam and eggs\n", |
228 b"spam, spam, spam, spam, spam, ham, spam\n", | 238 b"spam, spam, spam, spam, spam, ham, spam\n", |
229 b"wonderful spaaaaaam.\n" | 239 b"wonderful spaaaaaam.\n" |
230 ] | 240 ] |
231 methods = [("readline", ()), ("read", ()), ("readlines", ()), | 241 methods = [("readline", ()), ("read", ()), ("readlines", ()), |
232 ("readinto", (array("b", b" "*100),))] | 242 ("readinto", (array("b", b" "*100),))] |
233 | 243 |
234 try: | 244 try: |
235 # Prepare the testfile | 245 # Prepare the testfile |
236 bag = open(TESTFN, "wb") | 246 bag = self.open(TESTFN, "wb") |
237 bag.write(filler * nchunks) | 247 bag.write(filler * nchunks) |
238 bag.writelines(testlines) | 248 bag.writelines(testlines) |
239 bag.close() | 249 bag.close() |
240 # Test for appropriate errors mixing read* and iteration | 250 # Test for appropriate errors mixing read* and iteration |
241 for methodname, args in methods: | 251 for methodname, args in methods: |
242 f = open(TESTFN, 'rb') | 252 f = self.open(TESTFN, 'rb') |
243 if next(f) != filler: | 253 if next(f) != filler: |
244 self.fail, "Broken testfile" | 254 self.fail, "Broken testfile" |
245 meth = getattr(f, methodname) | 255 meth = getattr(f, methodname) |
246 meth(*args) # This simply shouldn't fail | 256 meth(*args) # This simply shouldn't fail |
247 f.close() | 257 f.close() |
248 | 258 |
249 # Test to see if harmless (by accident) mixing of read* and | 259 # Test to see if harmless (by accident) mixing of read* and |
250 # iteration still works. This depends on the size of the internal | 260 # iteration still works. This depends on the size of the internal |
251 # iteration buffer (currently 8192,) but we can test it in a | 261 # iteration buffer (currently 8192,) but we can test it in a |
252 # flexible manner. Each line in the bag o' ham is 4 bytes | 262 # flexible manner. Each line in the bag o' ham is 4 bytes |
253 # ("h", "a", "m", "\n"), so 4096 lines of that should get us | 263 # ("h", "a", "m", "\n"), so 4096 lines of that should get us |
254 # exactly on the buffer boundary for any power-of-2 buffersize | 264 # exactly on the buffer boundary for any power-of-2 buffersize |
255 # between 4 and 16384 (inclusive). | 265 # between 4 and 16384 (inclusive). |
256 f = open(TESTFN, 'rb') | 266 f = self.open(TESTFN, 'rb') |
257 for i in range(nchunks): | 267 for i in range(nchunks): |
258 next(f) | 268 next(f) |
259 testline = testlines.pop(0) | 269 testline = testlines.pop(0) |
260 try: | 270 try: |
261 line = f.readline() | 271 line = f.readline() |
262 except ValueError: | 272 except ValueError: |
263 self.fail("readline() after next() with supposedly empty " | 273 self.fail("readline() after next() with supposedly empty " |
264 "iteration-buffer failed anyway") | 274 "iteration-buffer failed anyway") |
265 if line != testline: | 275 if line != testline: |
266 self.fail("readline() after next() with empty buffer " | 276 self.fail("readline() after next() with empty buffer " |
(...skipping 21 matching lines...) Expand all Loading... |
288 "failed. Got %r, expected %r" % (line, testline)) | 298 "failed. Got %r, expected %r" % (line, testline)) |
289 try: | 299 try: |
290 lines = f.readlines() | 300 lines = f.readlines() |
291 except ValueError: | 301 except ValueError: |
292 self.fail("readlines() after next() with supposedly empty " | 302 self.fail("readlines() after next() with supposedly empty " |
293 "iteration-buffer failed anyway") | 303 "iteration-buffer failed anyway") |
294 if lines != testlines: | 304 if lines != testlines: |
295 self.fail("readlines() after next() with empty buffer " | 305 self.fail("readlines() after next() with empty buffer " |
296 "failed. Got %r, expected %r" % (line, testline)) | 306 "failed. Got %r, expected %r" % (line, testline)) |
297 # Reading after iteration hit EOF shouldn't hurt either | 307 # Reading after iteration hit EOF shouldn't hurt either |
298 f = open(TESTFN, 'rb') | 308 f = self.open(TESTFN, 'rb') |
299 try: | 309 try: |
300 for line in f: | 310 for line in f: |
301 pass | 311 pass |
302 try: | 312 try: |
303 f.readline() | 313 f.readline() |
304 f.readinto(buf) | 314 f.readinto(buf) |
305 f.read() | 315 f.read() |
306 f.readlines() | 316 f.readlines() |
307 except ValueError: | 317 except ValueError: |
308 self.fail("read* failed after next() consumed file") | 318 self.fail("read* failed after next() consumed file") |
309 finally: | 319 finally: |
310 f.close() | 320 f.close() |
311 finally: | 321 finally: |
312 os.unlink(TESTFN) | 322 os.unlink(TESTFN) |
313 | 323 |
| 324 class COtherFileTests(OtherFileTests): |
| 325 open = io.open |
| 326 |
| 327 class PyOtherFileTests(OtherFileTests): |
| 328 open = staticmethod(pyio.open) |
| 329 |
314 | 330 |
315 def test_main(): | 331 def test_main(): |
316 # Historically, these tests have been sloppy about removing TESTFN. | 332 # Historically, these tests have been sloppy about removing TESTFN. |
317 # So get rid of it no matter what. | 333 # So get rid of it no matter what. |
318 try: | 334 try: |
319 run_unittest(AutoFileTests, OtherFileTests) | 335 run_unittest(CAutoFileTests, PyAutoFileTests, |
| 336 COtherFileTests, PyOtherFileTests) |
320 finally: | 337 finally: |
321 if os.path.exists(TESTFN): | 338 if os.path.exists(TESTFN): |
322 os.unlink(TESTFN) | 339 os.unlink(TESTFN) |
323 | 340 |
324 if __name__ == '__main__': | 341 if __name__ == '__main__': |
325 test_main() | 342 test_main() |
OLD | NEW |