Left: | ||
Right: |
LEFT | RIGHT |
---|---|
(no file at all) | |
1 # Copyright: 2011 MoinMoin:ThomasWaldmann | 1 # Copyright: 2011 MoinMoin:ThomasWaldmann |
2 # License: GNU GPL v2 (or any later version), see LICENSE.txt for details. | 2 # License: GNU GPL v2 (or any later version), see LICENSE.txt for details. |
3 | 3 |
4 """ | 4 """ |
5 MoinMoin - indexing middleware tests | 5 MoinMoin - indexing middleware tests |
6 """ | 6 """ |
7 | 7 |
8 | 8 |
9 from __future__ import absolute_import, division | 9 from __future__ import absolute_import, division |
10 | 10 |
11 from StringIO import StringIO | 11 from StringIO import StringIO |
12 import hashlib | 12 import hashlib |
13 | 13 |
14 import pytest | 14 import pytest |
15 | 15 |
16 from flask import g as flaskg | 16 from flask import g as flaskg |
17 | 17 |
18 from MoinMoin.config import NAME, SIZE, ITEMID, REVID, DATAID, HASH_ALGORITHM, CONTENT, COMMENT, \ | 18 from MoinMoin.config import NAME, SIZE, ITEMID, REVID, DATAID, HASH_ALGORITHM, CONTENT, COMMENT, \ |
19 LATEST_REVS, ALL_REVS | 19 ALL_REVS, MASTER_BRANCH, PARENTID, CURRENT, \ |
20 BRANCH, TAG, USERHEAD | |
21 from MoinMoin.user import User | |
20 | 22 |
21 from ..indexing import IndexingMiddleware | 23 from ..indexing import IndexingMiddleware |
22 | 24 |
23 from MoinMoin.auth import GivenAuth | 25 from MoinMoin.auth import GivenAuth |
24 from MoinMoin._tests import wikiconfig | 26 from MoinMoin._tests import wikiconfig |
25 from MoinMoin.storage.backends.stores import MutableBackend | 27 from MoinMoin.storage.backends.stores import MutableBackend |
26 from MoinMoin.storage.stores.memory import BytesStore as MemoryBytesStore | 28 from MoinMoin.storage.stores.memory import BytesStore as MemoryBytesStore |
27 from MoinMoin.storage.stores.memory import FileStore as MemoryFileStore | 29 from MoinMoin.storage.stores.memory import FileStore as MemoryFileStore |
28 from MoinMoin.storage import create_simple_mapping | 30 from MoinMoin.storage import create_simple_mapping |
29 from MoinMoin.storage.middleware import routing | 31 from MoinMoin.storage.middleware import routing |
32 from MoinMoin.util.crypto import make_uuid | |
30 | 33 |
31 | 34 |
32 def dumper(indexer, idx_name): | 35 def dumper(indexer, idx_name): |
33 print "*** %s ***" % idx_name | 36 print "*** %s ***" % idx_name |
34 for kvs in indexer.dump(idx_name=idx_name): | 37 for kvs in indexer.dump(idx_name=idx_name): |
35 for k, v in kvs: | 38 for k, v in kvs: |
36 print k, repr(v)[:70] | 39 print k, repr(v)[:70] |
37 print | 40 print |
38 | 41 |
39 | 42 |
40 class TestIndexingMiddleware(object): | 43 class TestIndexingMiddleware(object): |
41 reinit_storage = True # cleanup after each test method | 44 reinit_storage = True # cleanup after each test method |
42 | 45 |
43 def setup_method(self, method): | 46 def setup_method(self, method): |
44 self.imw = flaskg.unprotected_storage | 47 self.imw = flaskg.unprotected_storage |
45 | 48 |
46 def teardown_method(self, method): | 49 def teardown_method(self, method): |
47 pass | 50 pass |
48 | 51 |
49 def test_nonexisting_item(self): | 52 def test_nonexisting_item(self): |
50 item = self.imw[u'foo'] | 53 item = self.imw[u'foo'] |
51 assert not item # does not exist | 54 assert not item # does not exist |
52 | 55 |
53 def test_store_revision(self): | 56 def test_store_revision(self): |
54 item_name = u'foo' | 57 item_name = u'foo' |
55 data = 'bar' | 58 data = 'bar' |
56 item = self.imw[item_name] | 59 branch = MASTER_BRANCH |
57 rev = item.store_revision(dict(name=item_name), StringIO(data)) | 60 item = self.imw[item_name] |
61 rev = item.store_revision(dict(name=item_name), StringIO(data), branch) | |
58 revid = rev.revid | 62 revid = rev.revid |
59 # check if we have the revision now: | 63 # check if we have the revision now: |
60 item = self.imw[item_name] | 64 item = self.imw[item_name] |
61 assert item # does exist | 65 assert item # does exist |
62 rev = item.get_revision(revid) | 66 rev = item.get_revision(revid) |
63 assert rev.meta[NAME] == item_name | 67 assert rev.meta[NAME] == item_name |
64 assert rev.data.read() == data | 68 assert rev.data.read() == data |
65 revids = [rev.revid for rev in item.iter_revs()] | 69 revids = [rev.revid for rev in item.iter_revs()] |
66 assert revids == [revid] | 70 assert revids == [revid] |
67 | 71 |
72 def test_nonmaster_branch(self): | |
73 item_name = u'foo' | |
74 data1 = 'bar' | |
75 data2 = 'baz' | |
76 branch = u'branch1' | |
77 item = self.imw[item_name] | |
78 rev1 = item.store_revision(dict(name=item_name), StringIO(data1), | |
79 MASTER_BRANCH) | |
80 revid1 = rev1.revid | |
81 rev2 = item.store_revision(dict(name=item_name), StringIO(data2), | |
82 branch) | |
83 revid2 = rev2.revid | |
84 item = self.imw[item_name] | |
85 assert item | |
86 rev1 = item.get_revision(revid1) | |
87 rev2 = item.get_revision(revid2) | |
88 assert rev1.meta[NAME] == rev2.meta[NAME] == item_name | |
89 assert rev1.data.read() == data1 | |
90 assert rev2.data.read() == data2 | |
91 | |
68 def test_overwrite_revision(self): | 92 def test_overwrite_revision(self): |
69 item_name = u'foo' | 93 item_name = u'foo' |
70 data = 'bar' | 94 data = 'bar' |
71 newdata = 'baz' | 95 newdata = 'baz' |
72 item = self.imw[item_name] | 96 item = self.imw[item_name] |
73 rev = item.store_revision(dict(name=item_name, comment=u'spam'), StringIO(data)) | 97 rev = item.store_revision(dict(name=item_name, comment=u'spam'), StringIO(data), MASTER_BRANCH) |
74 revid = rev.revid | 98 revid = rev.revid |
75 # clear revision: | 99 # clear revision: |
76 item.store_revision(dict(name=item_name, revid=revid, comment=u'no spam'), StringIO(newdata), overwrite=True) | 100 item.store_revision(dict(name=item_name, revid=revid, comment=u'no spam'), StringIO(newdata), MASTER_BRANCH, overwrite=True) |
77 # check if the revision was overwritten: | 101 # check if the revision was overwritten: |
78 item = self.imw[item_name] | 102 item = self.imw[item_name] |
79 rev = item.get_revision(revid) | 103 rev = item.get_revision(revid) |
80 assert rev.meta[NAME] == item_name | 104 assert rev.meta[NAME] == item_name |
81 assert rev.meta[COMMENT] == u'no spam' | 105 assert rev.meta[COMMENT] == u'no spam' |
82 assert rev.data.read() == newdata | 106 assert rev.data.read() == newdata |
83 revids = [rev.revid for rev in item.iter_revs()] | 107 revids = [rev.revid for rev in item.iter_revs()] |
84 assert len(revids) == 1 # we still have the revision, cleared | 108 assert len(revids) == 1 # we still have the revision, cleared |
85 assert revid in revids # it is still same revid | 109 assert revid in revids # it is still same revid |
86 | 110 |
87 def test_destroy_revision(self): | 111 def test_destroy_revision(self): |
88 item_name = u'foo' | 112 item_name = u'foo' |
89 item = self.imw[item_name] | 113 item = self.imw[item_name] |
90 rev = item.store_revision(dict(name=item_name, mtime=1), | 114 rev = item.store_revision(dict(name=item_name, mtime=1), |
91 StringIO('bar'), trusted=True) | 115 StringIO('bar'), MASTER_BRANCH, |
116 trusted=True) | |
92 revid0 = rev.revid | 117 revid0 = rev.revid |
93 rev = item.store_revision(dict(name=item_name, mtime=2), | 118 rev = item.store_revision(dict(name=item_name, mtime=2), |
94 StringIO('baz'), trusted=True) | 119 StringIO('baz'), MASTER_BRANCH, |
120 trusted=True) | |
95 revid1 = rev.revid | 121 revid1 = rev.revid |
96 rev = item.store_revision(dict(name=item_name, mtime=3), | 122 rev = item.store_revision(dict(name=item_name, mtime=3), |
97 StringIO('...'), trusted=True) | 123 StringIO('...'), MASTER_BRANCH, |
124 trusted=True) | |
98 revid2 = rev.revid | 125 revid2 = rev.revid |
99 print "revids:", revid0, revid1, revid2 | 126 print "revids:", revid0, revid1, revid2 |
100 # destroy a non-current revision: | 127 # destroy a non-current revision: |
101 item.destroy_revision(revid0) | 128 item.destroy_revision(revid0) |
102 # check if the revision was destroyed: | 129 # check if the revision was destroyed: |
103 item = self.imw[item_name] | 130 item = self.imw[item_name] |
104 with pytest.raises(KeyError): | 131 with pytest.raises(KeyError): |
105 item.get_revision(revid0) | 132 item.get_revision(revid0) |
106 revids = [rev.revid for rev in item.iter_revs()] | 133 revids = [rev.revid for rev in item.iter_revs()] |
107 print "after destroy revid0", revids | 134 print "after destroy revid0", revids |
(...skipping 15 matching lines...) | |
123 item.get_revision(revid1) | 150 item.get_revision(revid1) |
124 revids = [rev.revid for rev in item.iter_revs()] | 151 revids = [rev.revid for rev in item.iter_revs()] |
125 print "after destroy revid1", revids | 152 print "after destroy revid1", revids |
126 assert sorted(revids) == sorted([]) | 153 assert sorted(revids) == sorted([]) |
127 | 154 |
128 def test_destroy_item(self): | 155 def test_destroy_item(self): |
129 revids = [] | 156 revids = [] |
130 item_name = u'foo' | 157 item_name = u'foo' |
131 item = self.imw[item_name] | 158 item = self.imw[item_name] |
132 rev = item.store_revision(dict(name=item_name, mtime=1), | 159 rev = item.store_revision(dict(name=item_name, mtime=1), |
133 StringIO('bar'), trusted=True) | 160 StringIO('bar'), MASTER_BRANCH, |
161 trusted=True) | |
134 revids.append(rev.revid) | 162 revids.append(rev.revid) |
135 rev = item.store_revision(dict(name=item_name, mtime=2), | 163 rev = item.store_revision(dict(name=item_name, mtime=2), |
136 StringIO('baz'), trusted=True) | 164 StringIO('baz'), MASTER_BRANCH, |
165 trusted=True) | |
137 revids.append(rev.revid) | 166 revids.append(rev.revid) |
138 # destroy item: | 167 # destroy item: |
139 item.destroy_all_revisions() | 168 item.destroy_all_revisions() |
140 # check if the item was destroyed: | 169 # check if the item was destroyed: |
141 item = self.imw[item_name] | 170 item = self.imw[item_name] |
142 assert not item # does not exist | 171 assert not item # does not exist |
143 | 172 |
144 def test_all_revisions(self): | 173 def test_all_revisions(self): |
145 item_name = u'foo' | 174 item_name = u'foo' |
146 item = self.imw[item_name] | 175 item = self.imw[item_name] |
147 item.store_revision(dict(name=item_name), StringIO('does not count, different name')) | 176 item.store_revision(dict(name=item_name), |
177 StringIO('does not count, different name'), | |
178 MASTER_BRANCH) | |
148 item_name = u'bar' | 179 item_name = u'bar' |
149 item = self.imw[item_name] | 180 item = self.imw[item_name] |
150 item.store_revision(dict(name=item_name), StringIO('1st')) | 181 item.store_revision(dict(name=item_name), StringIO('1st'), MASTER_BRANCH) |
151 item.store_revision(dict(name=item_name), StringIO('2nd')) | 182 item.store_revision(dict(name=item_name), StringIO('2nd'), MASTER_BRANCH) |
152 item = self.imw[item_name] | 183 item = self.imw[item_name] |
153 revs = [rev.data.read() for rev in item.iter_revs()] | 184 revs = [rev.data.read() for rev in item.iter_revs()] |
154 assert len(revs) == 2 | 185 assert len(revs) == 2 |
155 assert set(revs) == set(['1st', '2nd']) | 186 assert set(revs) == set(['1st', '2nd']) |
156 | 187 |
157 def test_latest_revision(self): | 188 def test_branches_update(self): |
158 item_name = u'foo' | 189 item_name = u'foo' |
159 item = self.imw[item_name] | 190 item = self.imw[item_name] |
160 item.store_revision(dict(name=item_name), StringIO('does not count, different name')) | 191 item.store_revision(dict(name=item_name), StringIO('does not count, different name'), |
192 MASTER_BRANCH) | |
161 item_name = u'bar' | 193 item_name = u'bar' |
162 item = self.imw[item_name] | 194 item = self.imw[item_name] |
163 item.store_revision(dict(name=item_name), StringIO('1st')) | 195 item.store_revision(dict(name=item_name), StringIO('1st'), MASTER_BRANCH) |
164 expected_rev = item.store_revision(dict(name=item_name), StringIO('2nd')) | 196 item.store_revision(dict(name=item_name), StringIO('does not count, different branch'), |
165 revs = list(self.imw.documents(name=item_name)) | 197 u'branch1') |
166 assert len(revs) == 1 # there is only 1 latest revision | 198 expected_rev = item.store_revision(dict(name=item_name), StringIO('2nd'), |
167 assert expected_rev.revid == revs[0].revid # it is really the latest one | 199 MASTER_BRANCH) |
200 item = self.imw[item_name] | |
201 rev = item.get_revision(None, MASTER_BRANCH) | |
202 assert rev | |
203 assert expected_rev.revid == rev.revid # it is really the latest one | |
168 | 204 |
169 def test_auto_meta(self): | 205 def test_auto_meta(self): |
170 item_name = u'foo' | 206 item_name = u'foo' |
171 data = 'bar' | 207 data = 'bar' |
172 item = self.imw[item_name] | 208 item = self.imw[item_name] |
173 rev = item.store_revision(dict(name=item_name), StringIO(data)) | 209 rev = item.store_revision(dict(name=item_name), StringIO(data), |
210 MASTER_BRANCH) | |
174 print repr(rev.meta) | 211 print repr(rev.meta) |
175 assert rev.meta[NAME] == item_name | 212 assert rev.meta[NAME] == item_name |
176 assert rev.meta[SIZE] == len(data) | 213 assert rev.meta[SIZE] == len(data) |
177 assert rev.meta[HASH_ALGORITHM] == hashlib.new(HASH_ALGORITHM, data).hexdigest() | 214 assert rev.meta[HASH_ALGORITHM] == hashlib.new(HASH_ALGORITHM, data).hexdigest() |
178 assert ITEMID in rev.meta | 215 assert ITEMID in rev.meta |
179 assert REVID in rev.meta | 216 assert REVID in rev.meta |
180 assert DATAID in rev.meta | 217 assert DATAID in rev.meta |
181 | 218 |
182 def test_documents(self): | 219 def test_documents(self): |
183 item_name = u'foo' | 220 item_name = u'foo' |
184 item = self.imw[item_name] | 221 item = self.imw[item_name] |
185 rev1 = item.store_revision(dict(name=item_name), StringIO('x')) | 222 rev1 = item.store_revision(dict(name=item_name), StringIO('x'), MASTER_BRANCH) |
186 rev2 = item.store_revision(dict(name=item_name), StringIO('xx')) | 223 rev2 = item.store_revision(dict(name=item_name), StringIO('xx'), MASTER_BRANCH) |
187 rev3 = item.store_revision(dict(name=item_name), StringIO('xxx')) | 224 rev3 = item.store_revision(dict(name=item_name), StringIO('xxx'), MASTER_BRANCH) |
188 rev = self.imw.document(idx_name=ALL_REVS, size=2) | |
189 assert rev | |
190 assert rev.revid == rev2.revid | |
191 revs = list(self.imw.documents(idx_name=ALL_REVS, size=2)) | 225 revs = list(self.imw.documents(idx_name=ALL_REVS, size=2)) |
192 assert len(revs) == 1 | 226 assert len(revs) == 1 |
193 assert revs[0].revid == rev2.revid | 227 assert revs[0].revid == rev2.revid |
194 | 228 |
195 def test_index_rebuild(self): | 229 def test_index_rebuild(self): |
196 # first we index some stuff the slow "on-the-fly" way: | 230 # first we index some stuff the slow "on-the-fly" way: |
197 expected_latest_revids = [] | 231 expected_latest_revids = [] |
198 item_name = u'foo' | 232 item_name = u'foo' |
199 item = self.imw[item_name] | 233 item = self.imw[item_name] |
200 r = item.store_revision(dict(name=item_name, mtime=1), | 234 r = item.store_revision(dict(name=item_name, mtime=1), |
201 StringIO('does not count, different name'), trusted=True) | 235 StringIO('does not count, different name'), |
202 expected_latest_revids.append(r.revid) | 236 MASTER_BRANCH, trusted=True) |
203 item_name = u'bar' | 237 item_name = u'bar' |
204 item = self.imw[item_name] | 238 item = self.imw[item_name] |
205 item.store_revision(dict(name=item_name, mtime=1), | 239 item.store_revision(dict(name=item_name, mtime=1), |
206 StringIO('1st'), trusted=True) | 240 StringIO('1st'), MASTER_BRANCH, trusted=True) |
207 r = item.store_revision(dict(name=item_name, mtime=2), | 241 r = item.store_revision(dict(name=item_name, mtime=2), |
208 StringIO('2nd'), trusted=True) | 242 StringIO('2nd'), MASTER_BRANCH, trusted=True) |
209 expected_latest_revids.append(r.revid) | |
210 | 243 |
211 # now we remember the index contents built that way: | 244 # now we remember the index contents built that way: |
212 expected_latest_revs = list(self.imw.documents()) | |
213 expected_all_revs = list(self.imw.documents(idx_name=ALL_REVS)) | 245 expected_all_revs = list(self.imw.documents(idx_name=ALL_REVS)) |
214 | 246 |
215 print "*** all on-the-fly:" | 247 print "*** all on-the-fly:" |
216 self.imw.dump(idx_name=ALL_REVS) | 248 self.imw.dump(idx_name=ALL_REVS) |
217 print "*** latest on-the-fly:" | |
218 self.imw.dump(idx_name=LATEST_REVS) | |
219 | 249 |
220 # now kill the index and do a full rebuild | 250 # now kill the index and do a full rebuild |
221 self.imw.close() | 251 self.imw.close() |
222 self.imw.destroy() | 252 self.imw.destroy() |
223 self.imw.create() | 253 self.imw.create() |
224 self.imw.rebuild() | 254 self.imw.rebuild() |
225 self.imw.open() | 255 self.imw.open() |
226 | 256 |
227 # read the index contents built that way: | 257 # read the index contents built that way: |
228 all_revs = list(self.imw.documents(idx_name=ALL_REVS)) | 258 all_revs = list(self.imw.documents(idx_name=ALL_REVS)) |
229 latest_revs = list(self.imw.documents()) | |
230 latest_revids = [rev.revid for rev in latest_revs] | |
231 | 259 |
232 print "*** all rebuilt:" | 260 print "*** all rebuilt:" |
233 self.imw.dump(idx_name=ALL_REVS) | 261 self.imw.dump(idx_name=ALL_REVS) |
234 print "*** latest rebuilt:" | |
235 self.imw.dump(idx_name=LATEST_REVS) | |
236 | 262 |
237 # should be all the same, order does not matter: | 263 # should be all the same, order does not matter: |
238 assert sorted(expected_all_revs) == sorted(all_revs) | 264 assert sorted(expected_all_revs) == sorted(all_revs) |
239 assert sorted(expected_latest_revs) == sorted(latest_revs) | |
240 assert sorted(latest_revids) == sorted(expected_latest_revids) | |
241 | 265 |
242 def test_index_update(self): | 266 def test_index_update(self): |
243 # first we index some stuff the slow "on-the-fly" way: | 267 # first we index some stuff the slow "on-the-fly" way: |
244 expected_all_revids = [] | 268 expected_all_revids = [] |
245 expected_latest_revids = [] | 269 expected_latest_revids = [] |
246 missing_revids = [] | 270 missing_revids = [] |
247 item_name = u'updated' | 271 item_name = u'updated' |
248 item = self.imw[item_name] | 272 item = self.imw[item_name] |
249 r = item.store_revision(dict(name=item_name, mtime=1), | 273 r = item.store_revision(dict(name=item_name, mtime=1), |
250 StringIO('updated 1st'), trusted=True) | 274 StringIO('updated 1st'), |
275 MASTER_BRANCH, trusted=True) | |
251 expected_all_revids.append(r.revid) | 276 expected_all_revids.append(r.revid) |
252 # we update this item below, so we don't add it to expected_latest_revids | 277 |
253 item_name = u'destroyed' | 278 item_name = u'destroyed' |
254 item = self.imw[item_name] | 279 item = self.imw[item_name] |
255 r = item.store_revision(dict(name=item_name, mtime=1), | 280 r = item.store_revision(dict(name=item_name, mtime=1), |
256 StringIO('destroyed 1st'), trusted=True) | 281 StringIO('destroyed 1st'), |
282 MASTER_BRANCH, trusted=True) | |
257 destroy_revid = r.revid | 283 destroy_revid = r.revid |
258 # we destroy this item below, so we don't add it to expected_all_revids | 284 # we destroy this item below, so we don't add it to expected_all_revids |
259 # we destroy this item below, so we don't add it to expected_latest_revids | 285 |
260 item_name = u'stayssame' | 286 item_name = u'stayssame' |
261 item = self.imw[item_name] | 287 item = self.imw[item_name] |
262 r = item.store_revision(dict(name=item_name, mtime=1), | 288 r = item.store_revision(dict(name=item_name, mtime=1), |
263 StringIO('stayssame 1st'), trusted=True) | 289 StringIO('stayssame 1st'), |
290 MASTER_BRANCH, trusted=True) | |
264 expected_all_revids.append(r.revid) | 291 expected_all_revids.append(r.revid) |
265 # we update this item below, so we don't add it to expected_latest_revids | 292 |
266 r = item.store_revision(dict(name=item_name, mtime=2), | 293 r = item.store_revision(dict(name=item_name, mtime=2), |
267 StringIO('stayssame 2nd'), trusted=True) | 294 StringIO('stayssame 2nd'), |
295 MASTER_BRANCH, trusted=True) | |
268 expected_all_revids.append(r.revid) | 296 expected_all_revids.append(r.revid) |
269 expected_latest_revids.append(r.revid) | |
270 | 297 |
271 dumper(self.imw, ALL_REVS) | 298 dumper(self.imw, ALL_REVS) |
272 dumper(self.imw, LATEST_REVS) | |
273 | 299 |
274 # now build a fresh index at tmp location: | 300 # now build a fresh index at tmp location: |
275 self.imw.create(tmp=True) | 301 self.imw.create(tmp=True) |
276 self.imw.rebuild(tmp=True) | 302 self.imw.rebuild(tmp=True) |
277 | 303 |
278 # while the fresh index still sits at the tmp location, we update and add some items. | 304 # while the fresh index still sits at the tmp location, we update and add some items. |
279 # this will not change the fresh index, but the old index we are still using. | 305 # this will not change the fresh index, but the old index we are still using. |
280 item_name = u'updated' | 306 item_name = u'updated' |
281 item = self.imw[item_name] | 307 item = self.imw[item_name] |
282 r = item.store_revision(dict(name=item_name, mtime=2), | 308 r = item.store_revision(dict(name=item_name, mtime=2), |
283 StringIO('updated 2nd'), trusted=True) | 309 StringIO('updated 2nd'), |
310 MASTER_BRANCH, trusted=True) | |
284 expected_all_revids.append(r.revid) | 311 expected_all_revids.append(r.revid) |
285 expected_latest_revids.append(r.revid) | |
286 missing_revids.append(r.revid) | 312 missing_revids.append(r.revid) |
287 item_name = u'added' | 313 item_name = u'added' |
288 item = self.imw[item_name] | 314 item = self.imw[item_name] |
289 r = item.store_revision(dict(name=item_name, mtime=1), | 315 r = item.store_revision(dict(name=item_name, mtime=1), |
290 StringIO('added 1st'), trusted=True) | 316 StringIO('added 1st'), |
317 MASTER_BRANCH, trusted=True) | |
291 expected_all_revids.append(r.revid) | 318 expected_all_revids.append(r.revid) |
292 expected_latest_revids.append(r.revid) | |
293 missing_revids.append(r.revid) | 319 missing_revids.append(r.revid) |
294 item_name = u'destroyed' | 320 item_name = u'destroyed' |
295 item = self.imw[item_name] | 321 item = self.imw[item_name] |
296 item.destroy_revision(destroy_revid) | 322 item.destroy_revision(destroy_revid) |
297 | 323 |
298 # now switch to the not-quite-fresh-any-more index we have built: | 324 # now switch to the not-quite-fresh-any-more index we have built: |
299 self.imw.close() | 325 self.imw.close() |
300 self.imw.move_index() | 326 self.imw.move_index() |
301 self.imw.open() | 327 self.imw.open() |
302 | 328 |
303 dumper(self.imw, ALL_REVS) | 329 dumper(self.imw, ALL_REVS) |
304 dumper(self.imw, LATEST_REVS) | |
305 | 330 |
306 # read the index contents we have now: | 331 # read the index contents we have now: |
307 all_revids = [doc[REVID] for doc in self.imw._documents(idx_name=ALL_REVS)] | 332 all_revids = [doc[REVID] for doc in self.imw._documents(idx_name=ALL_REVS)] |
308 latest_revids = [doc[REVID] for doc in self.imw._documents()] | |
309 | 333 |
310 # this index is outdated: | 334 # this index is outdated: |
311 for missing_revid in missing_revids: | 335 for missing_revid in missing_revids: |
312 assert missing_revid not in all_revids | 336 assert missing_revid not in all_revids |
313 assert missing_revid not in latest_revids | |
314 | 337 |
315 # update the index: | 338 # update the index: |
316 self.imw.close() | 339 self.imw.close() |
317 self.imw.update() | 340 self.imw.update() |
318 self.imw.open() | 341 self.imw.open() |
319 | 342 |
320 dumper(self.imw, ALL_REVS) | 343 dumper(self.imw, ALL_REVS) |
321 dumper(self.imw, LATEST_REVS) | |
322 | 344 |
323 # read the index contents we have now: | 345 # read the index contents we have now: |
324 all_revids = [doc[REVID] for doc in self.imw._documents(idx_name=ALL_REVS)] | 346 all_revids = [doc[REVID] for doc in self.imw._documents(idx_name=ALL_REVS)] |
325 latest_revids = [doc[REVID] for doc in self.imw._documents()] | |
326 | 347 |
327 # now it should have the previously missing rev and all should be as expected: | 348 # now it should have the previously missing rev and all should be as expected: |
328 for missing_revid in missing_revids: | 349 for missing_revid in missing_revids: |
329 assert missing_revid in all_revids | 350 assert missing_revid in all_revids |
330 assert missing_revid in latest_revids | |
331 assert sorted(all_revids) == sorted(expected_all_revids) | 351 assert sorted(all_revids) == sorted(expected_all_revids) |
332 assert sorted(latest_revids) == sorted(expected_latest_revids) | |
333 | 352 |
334 def test_revision_contextmanager(self): | 353 def test_revision_contextmanager(self): |
335 # check if rev.data is closed after leaving the with-block | 354 # check if rev.data is closed after leaving the with-block |
336 item_name = u'foo' | 355 item_name = u'foo' |
337 meta = dict(name=item_name) | 356 meta = dict(name=item_name) |
338 data = 'some test content' | 357 data = 'some test content' |
339 item = self.imw[item_name] | 358 item = self.imw[item_name] |
340 data_file = StringIO(data) | 359 data_file = StringIO(data) |
341 with item.store_revision(meta, data_file) as rev: | 360 with item.store_revision(meta, data_file, MASTER_BRANCH) as rev: |
342 assert rev.data.read() == data | 361 assert rev.data.read() == data |
343 revid = rev.revid | 362 revid = rev.revid |
344 with pytest.raises(ValueError): | 363 with pytest.raises(ValueError): |
345 rev.data.read() | 364 rev.data.read() |
346 with item.get_revision(revid) as rev: | 365 with item.get_revision(revid) as rev: |
347 assert rev.data.read() == data | 366 assert rev.data.read() == data |
348 with pytest.raises(ValueError): | 367 with pytest.raises(ValueError): |
349 rev.data.read() | 368 rev.data.read() |
350 | 369 |
370 def test_get_head_revid_by_branch(self): | |
371 # first test with normal (named) branch | |
372 item_name = u'foo' | |
373 branch = u'branch' | |
374 item = self.imw[item_name] | |
375 item.store_revision(dict(name=item_name), StringIO(u'data2'), branch) | |
376 expected_revid = item.store_revision(dict(name=item_name), | |
377 StringIO(u'data3'), branch).revid | |
378 item.store_revision(dict(name=item_name), StringIO(u'data1'), | |
379 MASTER_BRANCH) | |
380 revid = item.get_head_revid_by_branch(branch) | |
381 assert revid == expected_revid | |
382 | |
383 # now test with pointer to the revid ($-started) | |
384 revid = item.get_head_revid_by_branch(u'$' + expected_revid) | |
385 assert revid == expected_revid | |
351 | 386 |
352 def test_indexed_content(self): | 387 def test_indexed_content(self): |
353 # TODO: this is a very simple check that assumes that data is put 1:1 | 388 # TODO: this is a very simple check that assumes that data is put 1:1 |
354 # into index' CONTENT field. | 389 # into index' CONTENT field. |
355 item_name = u'foo' | 390 item_name = u'foo' |
356 meta = dict(name=item_name, contenttype=u'text/plain') | 391 meta = dict(name=item_name, contenttype=u'text/plain') |
357 data = 'some test content\n' | 392 data = 'some test content\n' |
358 item = self.imw[item_name] | 393 item = self.imw[item_name] |
359 data_file = StringIO(data) | 394 data_file = StringIO(data) |
360 with item.store_revision(meta, data_file) as rev: | 395 with item.store_revision(meta, data_file, MASTER_BRANCH) as rev: |
361 expected_revid = rev.revid | 396 expected_revid = rev.revid |
362 doc = self.imw._document(content=u'test') | 397 doc = self.imw._document(content=u'test') |
363 assert doc is not None | 398 assert doc is not None |
364 assert expected_revid == doc[REVID] | 399 assert expected_revid == doc[REVID] |
365 assert unicode(data) == doc[CONTENT] | 400 assert unicode(data) == doc[CONTENT] |
366 | 401 |
402 def test_current_user_get(self): | |
ThomasJWaldmann
2012/08/07 20:29:51
for non-trivial tests add a short docstring and de
| |
403 item_name = u'foo' | |
404 meta = dict(name=item_name, contenttype=u'text/plain') | |
405 flaskg.user.name = u'anonymous' | |
406 branch = u'branch1' | |
407 item = self.imw[item_name] | |
408 rev1 = item.store_revision(meta, StringIO('data1'), MASTER_BRANCH) | |
409 meta[PARENTID] = rev1.revid | |
410 rev2 = item.store_revision(meta, StringIO('data2'), MASTER_BRANCH) | |
411 # switch the user and create his branch | |
412 flaskg.user.name = u'non-anon' | |
413 flaskg.user.valid = True | |
414 uuid = flaskg.user.itemid = make_uuid() | |
415 rev3 = item.store_revision(meta, StringIO('data3'), branch) | |
416 # now switch back to anonymous; he should receive rev2 | |
417 flaskg.user.name = u'anonymous' | |
418 flaskg.user.valid = False | |
419 flaskg.user.itemid = make_uuid() # anon users have random uuid in Moin | |
420 rev = self.imw[item_name][CURRENT] | |
421 assert rev.revid == rev2.revid | |
422 assert rev.data.read() == 'data2' | |
423 # switch the known user; he should receive his rev3 | |
424 flaskg.user.name = u'non-anon' | |
425 flaskg.user.valid = True | |
426 flaskg.user.itemid = uuid | |
427 rev = self.imw[item_name][CURRENT] | |
428 assert rev.revid == rev3.revid | |
429 assert rev.data.read() == 'data3' | |
430 | |
367 class TestProtectedIndexingMiddleware(object): | 431 class TestProtectedIndexingMiddleware(object): |
368 reinit_storage = True # cleanup after each test method | 432 reinit_storage = True # cleanup after each test method |
369 | 433 |
434 def __init__(self, **kwargs): | |
435 flaskg.user = User() | |
ThomasJWaldmann
2012/08/07 20:29:51
why is this needed?
| |
436 return super(TestProtectedIndexingMiddleware, self).__init__(**kwargs) | |
437 | |
370 class Config(wikiconfig.Config): | 438 class Config(wikiconfig.Config): |
371 auth = [GivenAuth(user_name=u'joe', autocreate=True), ] | 439 auth = [GivenAuth(user_name=u'joe', autocreate=True), ] |
372 | 440 |
373 def setup_method(self, method): | 441 def setup_method(self, method): |
374 self.imw = flaskg.storage | 442 self.imw = flaskg.storage |
375 | 443 |
376 def teardown_method(self, method): | 444 def teardown_method(self, method): |
377 pass | 445 pass |
378 | 446 |
379 def test_documents(self): | 447 def test_documents(self): |
380 item_name = u'public' | 448 item_name = u'public' |
381 item = self.imw[item_name] | 449 item = self.imw[item_name] |
382 r = item.store_revision(dict(name=item_name, acl=u'joe:read'), StringIO( 'public content')) | 450 r = item.store_revision(dict(name=item_name, acl=u'joe:read'), |
451 StringIO('public content'), MASTER_BRANCH) | |
383 revid_public = r.revid | 452 revid_public = r.revid |
384 revids = [rev.revid for rev in self.imw.documents() | 453 revids = [rev.revid for rev in self.imw.documents() |
385 if rev.meta[NAME] != u'joe'] # the user profile is a revision in the backend | 454 if rev.meta[NAME] != u'joe'] # the user profile is a revision in the backend |
386 assert revids == [revid_public] | 455 assert revids == [revid_public] |
387 | 456 |
388 def test_getitem(self): | 457 def test_getitem(self): |
389 item_name = u'public' | 458 item_name = u'public' |
390 item = self.imw[item_name] | 459 item = self.imw[item_name] |
391 r = item.store_revision(dict(name=item_name, acl=u'joe:read'), StringIO( 'public content')) | 460 r = item.store_revision(dict(name=item_name, acl=u'joe:read'), |
461 StringIO('public content'), MASTER_BRANCH) | |
392 revid_public = r.revid | 462 revid_public = r.revid |
393 # now testing: | 463 # now testing: |
394 item_name = u'public' | 464 item_name = u'public' |
395 item = self.imw[item_name] | 465 item = self.imw[item_name] |
396 r = item[revid_public] | 466 r = item[revid_public] |
397 assert r.data.read() == 'public content' | 467 assert r.data.read() == 'public content' |
398 | 468 |
399 def test_perf_create_only(self): | 469 def test_perf_create_only(self): |
400 pytest.skip("usually we do no performance tests") | 470 pytest.skip("usually we do no performance tests") |
401 # determine create revisions performance | 471 # determine create revisions performance |
(...skipping 10 matching lines...) | |
412 # doing index lookups name -> itemid, itemid -> revids list | 482 # doing index lookups name -> itemid, itemid -> revids list |
413 item_name = u'foo' | 483 item_name = u'foo' |
414 item = self.imw[item_name] | 484 item = self.imw[item_name] |
415 for i in xrange(100): | 485 for i in xrange(100): |
416 item.store_revision(dict(name=item_name, acl=u'joe:create joe:read'), StringIO('rev number {0}'.format(i))) | 486 item.store_revision(dict(name=item_name, acl=u'joe:create joe:read'), StringIO('rev number {0}'.format(i))) |
417 for r in item.iter_revs(): | 487 for r in item.iter_revs(): |
418 #print r.meta | 488 #print r.meta |
419 #print r.data.read() | 489 #print r.data.read() |
420 pass | 490 pass |
421 | 491 |
LEFT | RIGHT |