source: products/quintagroup.catalogupdater/trunk/quintagroup/catalogupdater/tests.py @ 3142

Last change on this file since 3142 was 3142, checked in by vmaksymiv, 13 years ago

pep8 fixes

  • Property svn:eol-style set to native
File size: 5.0 KB
Line 
1import unittest
2import transaction
3
4from zope.interface import Interface
5from zope.component import queryUtility
6from zope.component import provideAdapter
7
8from Products.Five import zcml
9from Products.Five import fiveconfigure
10from Products.CMFCore.utils import getToolByName
11from Products.PloneTestCase import PloneTestCase as ptc
12from Products.PloneTestCase.layer import PloneSite
13from Products.Archetypes.tests.utils import makeContent
14
15from quintagroup.catalogupdater.utility import ICatalogUpdater
16
17try:
18    from plone.indexer.decorator import indexer
19except ImportError:
20    IS_NEW = False
21    from Products.CMFPlone.CatalogTool import registerIndexableAttribute
22else:
23    IS_NEW = True
24
25
class TestCase(ptc.PloneTestCase):
    """Base Plone test case whose layer loads this package's ZCML."""

    class layer(PloneSite):
        """Test layer that loads ``quintagroup.catalogupdater``'s ZCML."""

        @classmethod
        def setUp(cls):
            """Load the package ``configure.zcml`` with Five debug mode on."""
            # Imported locally so the package is only imported when the
            # layer is actually set up.
            import quintagroup.catalogupdater
            fiveconfigure.debug_mode = True
            try:
                zcml.load_config('configure.zcml',
                                 quintagroup.catalogupdater)
            finally:
                # Always switch debug mode back off, even when loading the
                # configuration fails, so the flag does not leak out.
                fiveconfigure.debug_mode = False
34
# Module-level call into PloneTestCase's site setup helper; as a top-level
# statement it runs when this module is imported.
ptc.setupPloneSite()
36
37
class TestUtility(TestCase):
    """Tests for the ``catalog_updater`` utility (``ICatalogUpdater``)."""

    def afterSetUp(self):
        """Create a document, register the indexer and add the column.

        The ``my_doc`` Document is created before the ``test_column``
        metadata column is added to the catalog; the tests below expect
        that its catalog record does not carry the indexer value yet.
        """
        self.loginAsPortalOwner()
        self.my_doc = makeContent(self.portal, portal_type='Document',
                                  id='my_doc')
        self.catalog = getToolByName(self.portal, 'portal_catalog')
        self.logout()

        # Pick the registration API matching what could be imported at
        # module level (plone.indexer vs. registerIndexableAttribute).
        if IS_NEW:
            self.addIndexerNew()
        else:
            self.addIndexerOld()
        self.catalog.addColumn('test_column')

    def addIndexerNew(self):
        """Register ``test_column`` as a ``plone.indexer`` adapter."""
        @indexer(Interface)
        def test_column(obj):
            return obj.id
        provideAdapter(test_column, name='test_column')

    def addIndexerOld(self):
        """Register ``test_column`` via ``registerIndexableAttribute``."""
        def test_column(obj, portal, **kwargs):
            return obj.id
        registerIndexableAttribute("test_column", test_column)

    def testSingleColumnUpdate(self):
        """Test that the metadata column is updated by the utility."""
        mydoc = self.catalog.unrestrictedSearchResults(id='my_doc')[0]
        # Kept as ``assertFalse(a == b)`` on purpose: the value stored
        # before the update may be a non-string placeholder, so the exact
        # comparison operator of the original check is preserved.
        self.assertFalse(
            mydoc.test_column == 'my_doc',
            "'test_column' metadata updated in catalog "
            "before utility call: '%s'" % mydoc.test_column)

        cu = queryUtility(ICatalogUpdater, name="catalog_updater")
        cu.updateMetadata4All(self.catalog, 'test_column')

        mydoc = self.catalog.unrestrictedSearchResults(id='my_doc')[0]
        self.assertEqual(
            mydoc.test_column, 'my_doc',
            "'test_column' metadata has wrong metadata in catalog: "
            "'%s'" % mydoc.test_column)

    def testOnlyPointedColumnUpdate(self):
        """Test that only the requested column is updated.

        Change the title of ``my_doc`` without reindexing, run the utility
        for ``test_column`` and check that the ``Title`` metadata in the
        catalog is left unchanged.
        """
        self.loginAsPortalOwner()
        self.my_doc.update(title="My document")
        self.logout()

        mydoc = self.catalog.unrestrictedSearchResults(id='my_doc')[0]
        self.assertEqual(mydoc.Title, "My document", mydoc.Title)

        self.my_doc.setTitle('New my document')  # catalog not updated
        cu = queryUtility(ICatalogUpdater, name="catalog_updater")
        cu.updateMetadata4All(self.catalog, 'test_column')

        mydoc = self.catalog.unrestrictedSearchResults(id='my_doc')[0]
        self.assertEqual(
            mydoc.Title, 'My document',
            "Other metadata updated: Title='%s'" % mydoc.Title)

    def testAllRecordsUpdate(self):
        """Test that every record in the catalog is updated."""
        cu = queryUtility(ICatalogUpdater, name="catalog_updater")
        cu.updateMetadata4All(self.catalog, 'test_column')

        num_recs = len(self.catalog._catalog.data)
        allcat = self.catalog.unrestrictedSearchResults(path='/')
        # The indexer returns ``obj.id``, so an updated record has its
        # ``test_column`` equal to its ``id``.
        num_updated = sum(1 for brain in allcat
                          if brain.test_column == brain.id)

        self.assertEqual(
            num_updated, num_recs,
            "Only %d records updated, "
            "must be - %d" % (num_updated, num_recs))

    def testTransaction(self):
        """Test that the expected number of savepoints is issued.

        ``transaction.savepoint`` is temporarily wrapped with a counting
        function while the utility runs, with the catalog threshold set to
        a third of the number of records.
        """
        # Starts from 1 to count the last commit.  A one-element list is
        # used so the nested function can update the counter without a
        # module-level global.
        sp_commits = [1]
        orig_savepoint = transaction.savepoint

        def counting_savepoint(*args, **kwargs):
            sp_commits[0] += 1
            # Hand the real savepoint back so callers that use the
            # returned object keep working while the patch is active.
            return orig_savepoint(*args, **kwargs)

        transaction.savepoint = counting_savepoint
        try:
            # set threshold for catalog
            num_recs = len(self.catalog.unrestrictedSearchResults(path='/'))
            num_subcommits = 3
            # Floor division keeps the threshold an integer regardless of
            # the division semantics in effect.
            self.catalog.threshold = num_recs // num_subcommits

            cu = queryUtility(ICatalogUpdater, name="catalog_updater")
            cu.updateMetadata4All(self.catalog, 'test_column')
        finally:
            # Undo the patch even if the utility raises, so other tests
            # never see the counting wrapper.
            transaction.savepoint = orig_savepoint

        self.assertEqual(
            sp_commits[0], num_subcommits,
            "Wrong number of "
            "transaction subcommits: actual:%d, must be: %d" % (
                sp_commits[0], num_subcommits))
140
141
def test_suite():
    """Return a suite containing all ``TestUtility`` tests."""
    # ``unittest.makeSuite`` is deprecated; the default loader collects the
    # same ``test*`` methods from the test case class.
    loader = unittest.defaultTestLoader
    return unittest.TestSuite([
        loader.loadTestsFromTestCase(TestUtility),
        ])
146
# Allow running this module directly as a script; unittest is pointed at
# the test_suite() function by name.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.