1 | # This Python file uses the following encoding: utf-8 |
---|
2 | """ |
---|
3 | header set implementation tests |
---|
4 | |
---|
5 | $Id: test_header_set.py 64214 2008-05-03 02:49:17Z newbery $ |
---|
6 | """ |
---|
7 | |
---|
8 | from base import CacheFuTestCase |
---|
9 | |
---|
10 | from AccessControl import Unauthorized |
---|
11 | from Interface.Verify import verifyObject |
---|
12 | from Products.Archetypes.interfaces.base import IBaseContent |
---|
13 | from Products.CMFCore.utils import getToolByName |
---|
14 | from Products.CMFPlone.utils import _createObjectByType |
---|
15 | |
---|
16 | from Products.CacheSetup.config import * |
---|
17 | from Products.CacheSetup.content.nocatalog import NoCatalog |
---|
18 | |
---|
class TestHeaderSet(CacheFuTestCase):
    """Checks on a freshly created HeaderSet object: its base class, the
    IBaseContent interface, its type information and its 'object/view'
    action.
    """

    def afterSetUp(self):
        # Create a HeaderSet with id 'hs' in the test folder and keep a
        # reference to it for the individual tests.
        _createObjectByType('HeaderSet', self.folder, 'hs')
        self.hs = self.folder.hs

    def testIsNoCatalog(self):
        # The header set must be a NoCatalog instance.
        is_no_catalog = isinstance(self.hs, NoCatalog)
        self.failUnless(is_no_catalog)

    def testImplementsBaseContent(self):
        # The object both declares and actually satisfies IBaseContent.
        self.failUnless(IBaseContent.isImplementedBy(self.hs))
        self.failUnless(verifyObject(IBaseContent, self.hs))

    def testTypeInfo(self):
        # Every method alias is expected to point at the same template.
        expected_aliases = {
            '(Default)': 'cache_policy_item_config',
            'view': 'cache_policy_item_config',
            'edit': 'cache_policy_item_config',
        }
        type_info = self.hs.getTypeInfo()
        self.assertEqual(type_info.Title(), 'Response Header Set')
        self.assertEqual(type_info.getId(), 'HeaderSet')
        self.assertEqual(type_info.Metatype(), 'HeaderSet')
        self.assertEqual(type_info.globalAllow(), 0)
        self.assertEqual(type_info.getMethodAliases(), expected_aliases)

    def testActions(self):
        # NOTE: the original author marked this test as uncertain.
        action_chain = ('object/view',)
        types_tool = getToolByName(self.portal, 'portal_types')
        fti = types_tool['HeaderSet']
        # actions have ManagePortal permission set, so the lookup is
        # refused until the Manager role is granted below
        self.assertRaises(Unauthorized, fti.getActionInfo, action_chain)
        self.setRoles(['Manager', 'Member'])
        action_info = fti.getActionInfo(action_chain)
        self.failUnless(action_info is not None)
        self.assertEqual(action_info['url'], '')
---|
54 | |
---|
class TestHeaderSetMethods(CacheFuTestCase):
    """Placeholders for per-method HeaderSet tests.

    Every placeholder name starts with an underscore, so the default
    'test' prefix used by unittest's makeSuite does not collect it; each
    one simply fails with 'not yet implemented...' if called directly.
    """

    def afterSetUp(self):
        # Create a HeaderSet with id 'hs' in the test folder for the tests.
        _createObjectByType('HeaderSet', self.folder, 'hs')
        self.hs = getattr(self.folder, 'hs')

    def _test_validate_expression(self):
        self.fail('not yet implemented...')

    def _test_getLastModifiedValue(self):
        self.fail('not yet implemented...')

    def _test_getVaryValue(self):
        self.fail('not yet implemented...')

    def _test_getPageCacheKey(self):
        self.fail('not yet implemented...')

    # This placeholder used to be defined twice with identical bodies; the
    # second definition silently replaced the first, so only one is kept.
    def _test_getEtagValue(self):
        self.fail('not yet implemented...')

    def _test_getHeaders(self):
        self.fail('not yet implemented...')
---|
81 | |
---|
82 | from App.Common import rfc1123_date |
---|
83 | |
---|
84 | # util for making content in a container |
---|
def makeContent(container, id, portal_type, title=None):
    """Create an object of ``portal_type`` named ``id`` in ``container``.

    The object is created through ``container.invokeFactory`` and then
    looked up again as an attribute of the container. When ``title`` is
    not None it is applied with ``setTitle``. Returns the new object.
    """
    container.invokeFactory(id=id, type_name=portal_type)
    created = getattr(container, id)
    if title is None:
        return created
    created.setTitle(title)
    return created
---|
91 | |
---|
class TestHeaderSetOld(CacheFuTestCase):
    """Header tests run against objects living in portal_cache_settings.

    afterSetUp creates a 'doc' Document in the test folder plus a 'my_hs'
    HeaderSet and a 'my_rule' ContentCacheRule inside the cache settings
    tool; every test works on those three objects.
    """

    USER1 = 'user1'

    def afterSetUp(self):
        CacheFuTestCase.afterSetUp(self)

        # Add a couple of users
        self.portal.acl_users._doAddUser('manager', 'secret', ['Manager'], [])
        self.portal.acl_users._doAddUser(self.USER1, 'secret', ['Member'], [])
        self.login('manager')

        self.portal.portal_quickinstaller.installProducts(['CacheSetup'])

        # We have added a skin so we need to rebuild the skin object
        # (since the object is cached in the current request)
        self._refreshSkinData()

        self.folder.invokeFactory(id='doc', type_name='Document')
        pcs = self.portal.portal_cache_settings
        pcs.setEnabled(True)

        headers = pcs.getHeaderSets()
        headers.invokeFactory(id='my_hs', type_name='HeaderSet')
        rules = pcs.getRules()
        rules.invokeFactory(id='my_rule', type_name='ContentCacheRule')

    def _getFixture(self, **context_kw):
        """Look up the objects shared by every test in this class.

        Returns a tuple ``(pcs, h, rule, doc, expr_context)`` where
        ``expr_context`` is built by ``rule._getExpressionContext`` for the
        'document_view' template of ``doc``. Extra keyword arguments (for
        example ``time``) are forwarded to ``_getExpressionContext``.
        """
        pcs = self.portal.portal_cache_settings
        h = getattr(pcs.getHeaderSets(), 'my_hs')
        rule = getattr(pcs.getRules(), 'my_rule')
        doc = getattr(self.folder, 'doc')
        member = pcs.getMember()
        expr_context = rule._getExpressionContext(
            doc.REQUEST, doc, 'document_view', member, **context_kw)
        return pcs, h, rule, doc, expr_context

    def _getCacheControl(self, hdrs):
        """Parse the 'Cache-control' header into a token -> value dict.

        ``hdrs`` is the ``(headers_to_add, headers_to_remove)`` pair
        returned by ``getHeaders``; only the first element is inspected.
        Tokens without an '=' map to None. Fails the test when a token
        appears more than once.
        """
        header_map = self._parseHeaders(hdrs[0])
        directives = {}
        for part in header_map['Cache-control'].split(','):
            # Split only on the first '=' so a value that itself contains
            # '=' is kept whole instead of being truncated.
            sp = part.strip().split('=', 1)
            token = sp[0].strip()
            if len(sp) > 1:
                value = sp[1]
            else:
                value = None
            self.failIf(token in directives)
            directives[token] = value
        return directives

    def _parseHeaders(self, hdrs):
        """Turn a sequence of (name, value) pairs into a dict.

        Fails the test when the same header name occurs twice.
        """
        d = {}
        for (k, v) in hdrs:
            self.failIf(k in d)
            d[k] = v
        return d

    def test_get_etag(self):
        pcs, h, rule, doc, expr_context = self._getFixture()

        rule.setEtagComponents(['expression'])
        rule.setEtagExpression('string:my_etag')
        rule.setEtagRequestValues([])
        rule.setEtagTimeout(None)

        h.setEtag(True)
        self.assertEqual(h.getEtagValue(expr_context), '|my_etag')
        h.setEtag(False)
        self.assertEqual(h.getEtagValue(expr_context), None)

    def test_headers_last_modified(self):
        pcs, h, rule, doc, expr_context = self._getFixture()

        h.setLastModified('delete')
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        self.assertEqual(headers_to_remove, ['Last-modified'])

        h.setLastModified('no')
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        self.assertEqual(headers_to_remove, [])
        added_names = [hdr[0] for hdr in headers_to_add]
        self.failIf('Last-modified' in added_names)

        h.setLastModified('yes')
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        self.assertEqual(headers_to_remove, [])
        d = self._parseHeaders(headers_to_add)
        self.failUnless('Last-modified' in d)
        mod_time = rfc1123_date(doc.modified().timeTime())
        self.assertEqual(d['Last-modified'], mod_time)

    def test_headers_etag(self):
        pcs, h, rule, doc, expr_context = self._getFixture()

        rule.setEtagComponents(['expression'])
        rule.setEtagExpression('string:my_etag')
        rule.setEtagRequestValues([])
        rule.setEtagTimeout(None)

        h.setEtag(True)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._parseHeaders(headers_to_add)
        self.assertEqual(d['ETag'], '|my_etag')
        h.setEtag(False)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._parseHeaders(headers_to_add)
        self.failIf('ETag' in d)

    def test_headers_vary(self):
        pcs, h, rule, doc, expr_context = self._getFixture()

        #h.setVary('vary_str')
        pcs.setVaryHeader('vary_str')
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._parseHeaders(headers_to_add)
        self.assertEqual(d['Vary'], 'vary_str')

        # NOTE(review): the value is set on pcs above but cleared on the
        # header set here -- confirm this asymmetry is intended.
        h.setVary('')
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._parseHeaders(headers_to_add)
        self.failIf('Vary' in d)

    def test_headers_cache_control(self):
        from DateTime import DateTime
        now = DateTime()
        # Pin the context time so the expected Expires values below can be
        # computed from the same instant.
        pcs, h, rule, doc, expr_context = self._getFixture(time=now)

        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._parseHeaders(headers_to_add)
        self.failIf('Cache-control' in d)

        h.setPublic(True)  # add a token to ensure we have a cache-control header

        h.setMaxAge(30)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._getCacheControl((headers_to_add, headers_to_remove))
        self.assertEqual(d['max-age'], '30')
        d = self._parseHeaders(headers_to_add)
        texpires = rfc1123_date(now.timeTime()+30)
        self.assertEqual(d['Expires'], texpires)

        h.setMaxAge(0)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._getCacheControl((headers_to_add, headers_to_remove))
        self.assertEqual(d['max-age'], '0')
        d = self._parseHeaders(headers_to_add)
        # With max-age 0 the Expires header is expected ten years in the past.
        texpires = rfc1123_date(now.timeTime()-10*365*24*3600)
        self.assertEqual(d['Expires'], texpires)

        h.setMaxAge(None)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._getCacheControl((headers_to_add, headers_to_remove))
        # The directive is spelled 'max-age'; the former check for
        # 'max_age' could never fail.
        self.failIf('max-age' in d)
        d = self._parseHeaders(headers_to_add)
        self.failIf('Expires' in d)

        # The s-maxage tests (with and without a purgeable proxy) used to
        # live here. The purgeable "magic overrides" were removed from the
        # code, so those tests were dropped.

        h.setNoCache(True)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._getCacheControl((headers_to_add, headers_to_remove))
        self.assertEqual(d['no-cache'], None)
        d = self._parseHeaders(headers_to_add)
        self.assertEqual(d['Pragma'], 'no-cache')
        h.setNoCache(False)
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._getCacheControl((headers_to_add, headers_to_remove))
        self.failIf('no-cache' in d)
        d = self._parseHeaders(headers_to_add)
        self.failIf('Pragma' in d)

        h.setNoStore(True)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['no-store'], None)
        h.setNoStore(False)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.failIf('no-store' in d)

        h.setPrivate(True)  # set a new token to make sure we have the header
        h.setMaxAge(0)

        h.setPublic(True)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['public'], None)
        h.setPublic(False)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.failIf('public' in d)

        h.setPrivate(True)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['private'], None)
        h.setPrivate(False)
        d = self._getCacheControl(h.getHeaders(expr_context))
        # Check the token that was just switched off; this used to
        # re-check 'public' by mistake.
        self.failIf('private' in d)

        h.setMustRevalidate(True)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['must-revalidate'], None)
        h.setMustRevalidate(False)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.failIf('must-revalidate' in d)

        # The proxy-revalidate tests (with and without a purgeable proxy)
        # used to live here. They were dropped together with the purgeable
        # "magic overrides".

        h.setNoTransform(True)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['no-transform'], None)
        h.setNoTransform(False)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.failIf('no-transform' in d)

        h.setPreCheck(5)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['pre-check'], '5')
        h.setPreCheck(None)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.failIf('pre-check' in d)

        h.setPostCheck(5)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.assertEqual(d['post-check'], '5')
        h.setPostCheck(None)
        d = self._getCacheControl(h.getHeaders(expr_context))
        self.failIf('post-check' in d)

    def test_headers_debug(self):
        pcs, h, rule, doc, expr_context = self._getFixture()
        (headers_to_add, headers_to_remove) = h.getHeaders(expr_context)
        d = self._parseHeaders(headers_to_add)
        # The debug headers carry the ids of the rule and the header set.
        self.assertEqual(d['X-Caching-Rule-Id'], rule.getId())
        self.assertEqual(d['X-Header-Set-Id'], h.getId())
---|
390 | |
---|
def test_suite():
    """Build one suite holding the three test case classes of this module."""
    from unittest import TestSuite, makeSuite
    collected = TestSuite()
    # Same order as before: type checks, placeholders, then the old tests.
    for case_class in (TestHeaderSet, TestHeaderSetMethods, TestHeaderSetOld):
        collected.addTest(makeSuite(case_class))
    return collected
---|