source: products/quintagroup.catalogupdater/trunk/quintagroup/catalogupdater/tests.py @ 2683

Last change on this file since 2683 was 1866, checked in by mylan, 14 years ago

#163: Fix dependency from plone.indexer for tests

  • Property svn:eol-style set to native
File size: 5.0 KB
Line 
1import unittest
2import transaction
3
4from zope.interface import Interface
5from zope.component import queryUtility
6from zope.component import provideAdapter
7
8from Testing import ZopeTestCase as ztc
9
10from Products.Five import zcml
11from Products.Five import fiveconfigure
12from Products.CMFCore.utils import getToolByName
13from Products.PloneTestCase import PloneTestCase as ptc
14from Products.PloneTestCase.layer import PloneSite
15from Products.Archetypes.tests.utils import makeContent
16
17from quintagroup.catalogupdater.utility import ICatalogUpdater
18
19try:
20    from plone.indexer.decorator import indexer
21except ImportError:
22    IS_NEW = False
23    from Products.CMFPlone.CatalogTool import registerIndexableAttribute
24else:
25    IS_NEW = True
26
27
class TestCase(ptc.PloneTestCase):
    """Base test case for the quintagroup.catalogupdater tests.

    Runs on a layer derived from ``PloneSite`` that additionally loads
    the package's ``configure.zcml``.
    """

    class layer(PloneSite):
        """Test layer that loads the quintagroup.catalogupdater ZCML."""

        @classmethod
        def setUp(cls):
            """Load ``configure.zcml`` of the package with Five debug mode on.

            Debug mode is switched back off even when loading the
            configuration fails, so a broken ZCML file cannot leave the
            flag enabled for the rest of the test run.
            """
            import quintagroup.catalogupdater
            fiveconfigure.debug_mode = True
            try:
                zcml.load_config('configure.zcml', quintagroup.catalogupdater)
            finally:
                fiveconfigure.debug_mode = False


# Called with default arguments at import time.
# NOTE(review): presumably this schedules creation of the default Plone
# site fixture used by the test cases below - confirm against the
# Products.PloneTestCase documentation.
ptc.setupPloneSite()
38
class TestUtility(TestCase):
    """Tests for the ``catalog_updater`` named ``ICatalogUpdater`` utility."""

    def afterSetUp(self):
        """Create the ``my_doc`` document and the ``test_column`` metadata.

        The document is created first; the indexer for ``test_column`` is
        registered and the catalog column added afterwards. The tests rely
        on the column not yet holding the document id at this point.
        """
        self.loginAsPortalOwner()
        self.my_doc = makeContent(self.portal, portal_type='Document',
                                  id='my_doc')
        self.catalog = getToolByName(self.portal, 'portal_catalog')
        self.logout()

        # Pick the registration API that matches the installed Plone:
        # plone.indexer when it is importable, the old CatalogTool
        # registry otherwise.
        if IS_NEW:
            self.addIndexerNew()
        else:
            self.addIndexerOld()
        self.catalog.addColumn('test_column')

    def addIndexerNew(self):
        """Register the ``test_column`` indexer through plone.indexer."""
        @indexer(Interface)
        def test_column(obj):
            return obj.id
        provideAdapter(test_column, name='test_column')

    def addIndexerOld(self):
        """Register the ``test_column`` indexer through CatalogTool."""
        def test_column(obj, portal, **kwargs):
            return obj.id
        registerIndexableAttribute("test_column", test_column)

    def _getUpdater(self):
        """Return the ``catalog_updater`` utility.

        Fails the running test with an explicit message when the utility
        is not registered, instead of letting a later attribute access on
        ``None`` raise an unrelated-looking ``AttributeError``.
        """
        cu = queryUtility(ICatalogUpdater, name="catalog_updater")
        self.assertTrue(cu is not None,
            "ICatalogUpdater utility 'catalog_updater' is not registered")
        return cu

    def _getMyDocBrain(self):
        """Return the first catalog record found for id ``my_doc``."""
        return self.catalog.unrestrictedSearchResults(id='my_doc')[0]

    def testSingleColumnUpdate(self):
        """ Test that a metadata column is updated by the utility
        """
        mydoc = self._getMyDocBrain()
        self.assertNotEqual(mydoc.test_column, 'my_doc',
            "'test_column' metadata updated in catalog "
            "before utility call: '%s'" % mydoc.test_column)

        cu = self._getUpdater()
        cu.updateMetadata4All(self.catalog, 'test_column')

        mydoc = self._getMyDocBrain()
        self.assertEqual(mydoc.test_column, 'my_doc',
            "'test_column' metadata has wrong metadata in catalog: "
            "'%s'" % mydoc.test_column)

    def testOnlyPointedColumnUpdate(self):
        """ Change the title of my_doc without reindexing, run the
            utility for 'test_column' only, then check that the
            Title metadata is left unchanged.
        """
        self.loginAsPortalOwner()
        self.my_doc.update(title="My document")
        self.logout()

        mydoc = self._getMyDocBrain()
        self.assertEqual(mydoc.Title, "My document", mydoc.Title)

        self.my_doc.setTitle('New my document')  # catalog not updated
        cu = self._getUpdater()
        cu.updateMetadata4All(self.catalog, 'test_column')

        mydoc = self._getMyDocBrain()
        self.assertEqual(mydoc.Title, 'My document',
            "Other metadata updated: Title='%s'" % mydoc.Title)

    def testAllRecordsUpdate(self):
        """ Test that all records in the catalog are updated by the utility
        """
        cu = self._getUpdater()
        cu.updateMetadata4All(self.catalog, 'test_column')

        num_recs = len(self.catalog._catalog.data)
        allcat = self.catalog.unrestrictedSearchResults(path='/')
        num_updated = sum(1 for b in allcat if b.test_column == b.id)

        self.assertEqual(num_updated, num_recs, "Only %d records updated, "
            "must be - %d" % (num_updated, num_recs))

    def testTransaction(self):
        """ Test that subtransactions (savepoints) are committed
        """
        # One-element list used as a mutable counter cell: the nested
        # function can increment it without a module-level global.
        sp_commits = [1]  # Starts from 1 to count last commit
        orig_trsp = transaction.savepoint

        def counting_savepoint(*args, **kwargs):
            sp_commits[0] += 1
            # Hand the real savepoint back so callers that keep it
            # still receive what transaction.savepoint returns.
            return orig_trsp(*args, **kwargs)

        num_subcommits = 3

        # savepoint patch
        transaction.savepoint = counting_savepoint
        try:
            # set threshold for catalog
            num_recs = len(self.catalog.unrestrictedSearchResults(path='/'))
            # Floor division keeps the threshold an integer on every
            # Python version.
            self.catalog.threshold = num_recs // num_subcommits

            cu = self._getUpdater()
            cu.updateMetadata4All(self.catalog, 'test_column')
        finally:
            # Undo the patch even if the update raises, so other tests
            # never run against the counting wrapper.
            transaction.savepoint = orig_trsp

        self.assertEqual(sp_commits[0], num_subcommits,
            "Wrong number of transaction subcommits: actual:%d, must be: %d" % (
            sp_commits[0], num_subcommits))
144       
145
146
def test_suite():
    """Build the suite holding every test method of ``TestUtility``."""
    utility_tests = unittest.defaultTestLoader.loadTestsFromTestCase(
        TestUtility)
    suite = unittest.TestSuite()
    suite.addTest(utility_tests)
    return suite


# Allow running this module directly as a script.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.