1 | import unittest |
---|
2 | import transaction |
---|
3 | |
---|
4 | from zope.interface import Interface |
---|
5 | from zope.component import queryUtility |
---|
6 | from zope.component import provideAdapter |
---|
7 | |
---|
8 | from Testing import ZopeTestCase as ztc |
---|
9 | |
---|
10 | from Products.Five import zcml |
---|
11 | from Products.Five import fiveconfigure |
---|
12 | from Products.CMFCore.utils import getToolByName |
---|
13 | from Products.PloneTestCase import PloneTestCase as ptc |
---|
14 | from Products.PloneTestCase.layer import PloneSite |
---|
15 | from Products.Archetypes.tests.utils import makeContent |
---|
16 | |
---|
17 | from quintagroup.catalogupdater.utility import ICatalogUpdater |
---|
18 | |
---|
# Select the indexer registration API: plone.indexer when it is importable,
# otherwise the registerIndexableAttribute hook from CMFPlone's CatalogTool.
# IS_NEW records which of the two was found.
try:
    from plone.indexer.decorator import indexer
    IS_NEW = True
except ImportError:
    IS_NEW = False
    from Products.CMFPlone.CatalogTool import registerIndexableAttribute
---|
26 | |
---|
27 | |
---|
class TestCase(ptc.PloneTestCase):
    """Base test case whose layer loads quintagroup.catalogupdater's ZCML."""

    class layer(PloneSite):
        """Layer that loads the package's ``configure.zcml`` in ``setUp``."""

        @classmethod
        def setUp(cls):
            """Load ``configure.zcml`` of quintagroup.catalogupdater.

            Five's ``debug_mode`` is switched on while the configuration is
            loaded and switched off afterwards, even when loading raises, so
            a broken configuration cannot leave debug mode enabled.
            """
            import quintagroup.catalogupdater
            fiveconfigure.debug_mode = True
            try:
                zcml.load_config('configure.zcml',
                                 quintagroup.catalogupdater)
            finally:
                fiveconfigure.debug_mode = False
---|
36 | |
---|
# Module-level call into PloneTestCase; it runs when this module is imported.
# NOTE(review): presumably prepares the Plone site used by the tests below --
# confirm against the PloneTestCase documentation.
ptc.setupPloneSite()
---|
38 | |
---|
class TestUtility(TestCase):
    """Tests for the ``catalog_updater`` utility (``ICatalogUpdater``)."""

    def afterSetUp(self):
        """Create the ``my_doc`` document and register ``test_column``."""
        self.loginAsPortalOwner()
        self.my_doc = makeContent(self.portal, portal_type='Document',
                                  id='my_doc')
        self.catalog = getToolByName(self.portal, 'portal_catalog')
        self.logout()

        # Register the indexer through whichever API was importable.
        if IS_NEW:
            self.addIndexerNew()
        else:
            self.addIndexerOld()
        # Every test reads ``test_column`` from catalog brains, so the
        # metadata column is added regardless of the indexer API in use.
        self.catalog.addColumn('test_column')

    def addIndexerNew(self):
        """Register ``test_column`` as a named plone.indexer adapter.

        The indexer is declared for ``Interface`` and returns ``obj.id``.
        """
        @indexer(Interface)
        def test_column(obj):
            return obj.id
        provideAdapter(test_column, name='test_column')

    def addIndexerOld(self):
        """Register ``test_column`` through ``registerIndexableAttribute``.

        The registered callable returns ``obj.id``.
        """
        def test_column(obj, portal, **kwargs):
            return obj.id
        registerIndexableAttribute("test_column", test_column)

    def _myDocBrain(self):
        """Return the first catalog brain found for the id ``my_doc``."""
        return self.catalog.unrestrictedSearchResults(id='my_doc')[0]

    def _updateTestColumn(self):
        """Run ``updateMetadata4All`` for ``test_column`` on the catalog.

        Fails the test with a clear message when the utility is not
        registered, instead of an AttributeError on ``None``.
        """
        cu = queryUtility(ICatalogUpdater, name="catalog_updater")
        self.assertTrue(cu is not None,
            "'catalog_updater' utility is not registered")
        cu.updateMetadata4All(self.catalog, 'test_column')

    def testSingleColumnUpdate(self):
        """ Check that the utility updates the requested metadata column.
        """
        mydoc = self._myDocBrain()
        self.assertFalse(mydoc.test_column == 'my_doc',
            "'test_column' metadata updated in catalog " \
            "before utility call: '%s'" % mydoc.test_column)

        self._updateTestColumn()

        mydoc = self._myDocBrain()
        self.assertTrue(mydoc.test_column == 'my_doc',
            "'test_column' metadata has wrong metadata in catalog: " \
            "'%s'" % mydoc.test_column)

    def testOnlyPointedColumnUpdate(self):
        """ Check that only the requested column is updated.

            The title of my_doc is changed without reindexing; after the
            utility has run for 'test_column', the Title metadata in the
            catalog must still hold the previously indexed value.
        """
        self.loginAsPortalOwner()
        self.my_doc.update(title="My document")
        self.logout()

        mydoc = self._myDocBrain()
        self.assertTrue(mydoc.Title == "My document", mydoc.Title)

        self.my_doc.setTitle('New my document')  # catalog not updated
        self._updateTestColumn()

        mydoc = self._myDocBrain()
        self.assertTrue(mydoc.Title == 'My document',
            "Other metadata updated: Title='%s'" % mydoc.Title)

    def testAllRecordsUpdate(self):
        """ Check that the utility updates every record in the catalog.
        """
        self._updateTestColumn()

        num_recs = len(self.catalog._catalog.data)
        allcat = self.catalog.unrestrictedSearchResults(path='/')
        num_updated = sum(1 for b in allcat if b.test_column == b.id)

        self.assertTrue(num_updated == num_recs, "Only %d records updated, " \
            "must be - %d" % (num_updated, num_recs))

    def testTransaction(self):
        """ Check the number of savepoints made while updating.

            ``transaction.savepoint`` is wrapped with a counting function
            for the duration of the update and is always restored.
        """
        # One-element list instead of a module-level global, so the nested
        # function can update the counter; starts from 1 to count the last
        # commit.
        sp_commits = [1]
        orig_trsp = transaction.savepoint

        def dummy_savepoint(*args, **kwargs):
            sp_commits[0] += 1
            # Hand the real savepoint back to the caller.
            return orig_trsp(*args, **kwargs)

        transaction.savepoint = dummy_savepoint
        try:
            # set threshold for catalog
            num_recs = len(self.catalog.unrestrictedSearchResults(path='/'))
            num_subcommits = 3
            # Floor division keeps the threshold an integer under true
            # division as well.
            self.catalog.threshold = num_recs // num_subcommits

            self._updateTestColumn()
        finally:
            # Undo the patch even if the update raises, so that other tests
            # never see the counting wrapper.
            transaction.savepoint = orig_trsp

        self.assertTrue(sp_commits[0] == num_subcommits,
            "Wrong number of transaction subcommits: actual:%d, must be: %d" % (
            sp_commits[0], num_subcommits))
---|
144 | |
---|
145 | |
---|
146 | |
---|
def test_suite():
    """Return a ``unittest.TestSuite`` with the tests of ``TestUtility``."""
    # ``unittest.makeSuite`` was deprecated in Python 3.11 and removed in
    # 3.13; the default loader collects the same ``test*`` methods.
    loader = unittest.defaultTestLoader
    return unittest.TestSuite([
        loader.loadTestsFromTestCase(TestUtility),
    ])
---|
151 | |
---|
# When this file is executed directly, run the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
---|