source: products/quintagroup.transmogrifier/trunk/quintagroup/transmogrifier/tests.py @ 1588

Last change on this file since 1588 was 1588, checked in by mylan, 10 years ago

Added tests of blacklists migration in importing/exporting portlets

File size: 38.7 KB
Line 
1import unittest
2import pprint
3import os
4
5from zope.testing import doctest, cleanup
6from zope.component import provideUtility, provideAdapter, adapts
7from zope.interface import classProvides, implements
8
9from collective.transmogrifier.interfaces import ISectionBlueprint, ISection
10from collective.transmogrifier.tests import tearDown
11from collective.transmogrifier.sections.tests import sectionsSetUp
12from collective.transmogrifier.sections.tests import SampleSource
13
14from Products.Five import zcml
15
16import quintagroup.transmogrifier
17from quintagroup.transmogrifier.xslt import stylesheet_registry
18
19class DataPrinter(object):
20    classProvides(ISectionBlueprint)
21    implements(ISection)
22
23    def __init__(self, transmogrifier, name, options, previous):
24        self.previous = previous
25        self.printkey = [i.strip() for i in options['print'].splitlines() if i.strip()]
26        if 'prettyprint' in options:
27            self.pprint = pprint.PrettyPrinter().pprint
28
29    def __iter__(self):
30        for item in self.previous:
31            if self.printkey:
32                data = item
33                for i in self.printkey:
34                    if i in data:
35                        data = data[i]
36                    else:
37                        data = None
38                        break
39                if data is not None:
40                    if hasattr(self, 'pprint'):
41                        self.pprint(data)
42                    else:
43                        print data
44            yield item
45
46ctSectionsSetup = sectionsSetUp
47def sectionsSetUp(test):
48    ctSectionsSetup(test)
49    import Products.Five
50    import Products.GenericSetup
51    import zope.annotation
52    zcml.load_config('meta.zcml', Products.Five)
53    zcml.load_config('permissions.zcml', Products.Five)
54    zcml.load_config('meta.zcml', Products.GenericSetup)
55    zcml.load_config('configure.zcml', zope.annotation)
56    zcml.load_config('configure.zcml', quintagroup.transmogrifier)
57
58    from Products.CMFCore import utils
59    def getToolByName(context, tool_id, default=None):
60        return context
61    utils.getToolByName = getToolByName
62
63    import Acquisition
64    def aq_base(obj):
65        return obj
66    Acquisition.aq_base = aq_base
67
68    provideUtility(DataPrinter,
69        name=u'quintagroup.transmogrifier.tests.dataprinter')
70
71def siteWalkerSetUp(test):
72    sectionsSetUp(test)
73
74    from Products.CMFCore.interfaces import IFolderish
75    from Products.Archetypes.interfaces import IBaseFolder
76
77    class MockContent(object):
78        path = ()
79
80        def getPhysicalPath(self):
81            return self.path
82
83        def getPortalTypeName(self):
84            return self.__class__.__name__
85
86    class Document(MockContent):
87        pass
88
89    class Folder(MockContent, dict):
90        implements(IBaseFolder)
91
92        contentItems = dict.items
93        contentValues = dict.values
94
95    class MockPortal(MockContent, dict):
96        implements(IFolderish)
97
98        contentItems = dict.items
99        contentValues = dict.values
100
101    portal = MockPortal()
102
103    test.globs['plone'] = portal
104    test.globs['transmogrifier'].context = test.globs['plone']
105
106    portal.path = ('', 'plone')
107    portal['document1'] = Document()
108    portal['document1'].path = ('', 'plone', 'document1')
109    portal['folder1'] = Folder()
110    portal['folder1'].path = ('', 'plone', 'folder1')
111    portal['folder1']['document2'] = Document()
112    portal['folder1']['document2'].path = ('', 'plone', 'folder1', 'document2')
113    portal['folder1']['folder2'] = Folder()
114    portal['folder1']['folder2'].path = ('', 'plone', 'folder1', 'folder2')
115    portal['document3'] = Document()
116    portal['document3'].path = ('', 'plone', 'document3')
117
118def manifestSetUp(test):
119    sectionsSetUp(test)
120
121    root = dict(
122        _path='',
123        _entries=(
124            ('news', 'Folder'),
125            ('events', 'Folder'),
126            ('front-page', 'Document'),
127            ('only-in-manifest', 'Document')
128        )
129    )
130
131    news = dict(
132        _path='news',
133        _entries=(
134            ('aggregator', 'Topic'),
135            ('once-more', 'File')
136        )
137    )
138
139    aggregator = dict(
140        _path='news/aggregator',
141    )
142
143    events = dict(
144        _path='events'
145    )
146
147    front_page = dict(
148        _path='front-page',
149    )
150
151    members = dict(
152        _path='Members'
153    )
154
155    class ManifestSource(SampleSource):
156        classProvides(ISectionBlueprint)
157        implements(ISection)
158
159        def __init__(self, *args, **kw):
160            super(ManifestSource, self).__init__(*args, **kw)
161            self.sample = (root, dict(), news, aggregator, events, front_page, members)
162
163    provideUtility(ManifestSource,
164        name=u'quintagroup.transmogrifier.tests.manifestsource')
165
166def marshallSetUp(test):
167    sectionsSetUp(test)
168
169    from Products.Archetypes.interfaces import IBaseObject
170
171    class Field(object):
172        def __init__(self, name):
173            self.name = name
174            self.obj = None
175
176        def getAccessor(self, obj):
177            self.obj = obj
178            return self
179
180        def getMutator(self, obj):
181            self.obj = obj
182            return self
183
184        def __call__(self, value=None):
185            if value is None:
186                return self.obj.fields[self.name]
187            else:
188                self.obj.fields[self.name] = value
189
190    class MockBase(object):
191        def __init__(self, effective=None):
192            self.fields = {
193                'effectiveDate': effective,
194                'modification_date': 'changed',
195            }
196
197        def checkCreationFlag(self):
198            return True
199
200        def unmarkCreationFlag(self):
201            pass
202
203        def at_post_create_script(self):
204            pass
205
206        def at_post_edit_script(self):
207            pass
208
209        indexed = ()
210        def indexObject(self):
211            self.indexed += (self._last_path,)
212
213        def getField(self, fname):
214            return Field(fname)
215
216    class MockCriterion(MockBase):
217        implements(IBaseObject)
218        _last_path = None
219        indexed = ()
220        def indexObject(self):
221            self.indexed += (self._last_path,)
222
223    class MockPortal(MockBase):
224        implements(IBaseObject)
225
226        criterion = MockCriterion('not changed')
227
228        _last_path = None
229        def unrestrictedTraverse(self, path, default):
230            if path[0] == '/':
231                return default # path is absolute
232            if isinstance(path, unicode):
233                return default
234            if path == 'not/existing/bar':
235                return default
236            if path == 'topic/criterion':
237                self._last_path = path
238                self.criterion._last_path = path
239                return self.criterion
240            if path.endswith('/notatcontent'):
241                return object()
242            self._last_path = path
243            return self
244
245        def getId(self):
246            return "plone"
247
248        indexed = ()
249        def indexObject(self):
250            self.indexed += (self._last_path,)
251
252        updatedRoles = False
253        def updateRoleMappings(self):
254            self.updatedRoles = True
255
256        reindexed = False
257        def reindexIndex(self, name, extra=None):
258            self.reindexed = True
259
260        marshalled = ()
261        def marshall(self, instance, **kwargs):
262            self.marshalled += ((self._last_path, kwargs.get('atns_exclude')),)
263            # Marshall often fails to export topic criteria
264            if isinstance(instance, MockCriterion):
265                return None, None, None
266            else:
267                return None, None, "marshalled"
268
269        demarshalled = ()
270        def demarshall(self, instance, data):
271            # we don't need to test Marshall product, only check if we call it's components
272            self.demarshalled += (self._last_path,)
273
274    portal = MockPortal()
275    test.globs['plone'] = portal
276    test.globs['transmogrifier'].context = test.globs['plone']
277
278    from Products.Marshall import registry
279    def getComponent(name):
280        return portal
281    registry.getComponent = getComponent
282
283    class MarshallSource(SampleSource):
284        classProvides(ISectionBlueprint)
285        implements(ISection)
286
287        def __init__(self, *args, **kw):
288            super(MarshallSource, self).__init__(*args, **kw)
289            self.sample = (
290                dict(),
291                dict(_path='spam/eggs/foo', _excluded_fields=('file', 'image')),
292                dict(_path='topic/criterion'),
293                dict(_path='not/existing/bar'),
294                dict(_path='spam/eggs/notatcontent', 
295                     _files=dict(marshall=dict(data='xml', name='.marshall.xml'))),
296            )
297    provideUtility(MarshallSource,
298        name=u'quintagroup.transmogrifier.tests.marshallsource')
299
300def propertyManagerSetUp(test):
301    sectionsSetUp(test)
302
303    from OFS.interfaces import IPropertyManager
304
305    class MockPortal(object):
306        implements(IPropertyManager)
307
308        _properties = (
309            {'id':'title', 'type': 'string', 'mode': 'w'},
310            {'id':'description', 'type': 'string', 'mode': 'w'},
311            {'id':'encoding', 'type': 'string', 'mode': 'w'},
312            {'id':'author', 'type': 'string', 'mode': 'w'}
313        )
314
315        _last_path = None
316        def unrestrictedTraverse(self, path, default):
317            if path[0] == '/':
318                return default # path is absolute
319            if isinstance(path, unicode):
320                return default
321            if path == 'not/existing/bar':
322                return default
323            if path.endswith('/notatcontent'):
324                return object()
325            self._last_path = path
326            return self
327
328        def _propertyMap(self):
329            return self._properties
330
331        def getProperty(self, id, d=None):
332            return 'value'
333
334        def propdict(self):
335            d={}
336            for p in self._properties:
337                d[p['id']]=p
338            return d
339
340        updated = ()
341        def _updateProperty(self, id, value):
342            self.updated += ((self._last_path, id, value))
343
344    portal = MockPortal()
345    test.globs['plone'] = portal
346    test.globs['transmogrifier'].context = test.globs['plone']
347
348    class PropertyManagerSource(SampleSource):
349        classProvides(ISectionBlueprint)
350        implements(ISection)
351
352        def __init__(self, *args, **kw):
353            super(PropertyManagerSource, self).__init__(*args, **kw)
354            self.sample = (
355                dict(),
356                dict(_path='not/existing/bar'),
357                dict(_path='spam/eggs/notatcontent'),
358                dict(_path='spam/eggs/foo', _excluded_properties=('encoding',)),
359            )
360
361    provideUtility(PropertyManagerSource,
362        name=u'quintagroup.transmogrifier.tests.propertymanagersource')
363
364def commentsSetUp(test):
365    sectionsSetUp(test)
366
367    class MockDiscussionItem(object):
368        creator = 'creator'
369        modified = 'date'
370
371        def __init__(self, reply, text=""):
372            self.in_reply_to = reply
373            self.text = text
374
375        def __of__(self, container):
376            return self
377
378        def getMetadataHeaders(self):
379            return []
380
381        def setMetadata(self, headers):
382            pass
383
384        def Creator(self):
385            return self.creator
386
387        def addCreator(self, creator):
388            self.creator = creator
389
390        def ModificationDate(self):
391            return self.modified
392
393        def setModificationDate(self, date):
394            self.modified = date
395
396        def setFormat(self, format):
397            pass
398
399        def _edit(self, text=None):
400            self.text = text
401
402        def indexObject(self):
403            pass
404
405        def __repr__(self):
406            return "<DicussionItem %s %s %s %s>" % (
407                self.Creator(),
408                self.ModificationDate(),
409                self.in_reply_to,
410                self.text
411                )
412
413    from Products.CMFDefault import DiscussionItem
414    DiscussionItem.DiscussionItem = MockDiscussionItem
415
416    class MockPortal(object):
417        _discussion = {
418            '1': MockDiscussionItem(None, 'comment to content'),
419            '2': MockDiscussionItem('1', 'reply to first comment'),
420            '3': MockDiscussionItem(None, 'other comment to content')
421        }
422        _container = {}
423
424        @property
425        def talkback(self):
426            return self
427
428        def objectItems(self):
429            l = self._discussion.items()
430            l.sort(key=lambda x: int(x[0]))
431            return l
432
433        def unrestrictedTraverse(self, path, default):
434            if path[0] == '/':
435                return default # path is absolute
436            if isinstance(path, unicode):
437                return default
438            if path == 'not/existing/bar':
439                return default
440            if path.endswith('/notdiscussable'):
441                return object()
442            return self
443
444        def getDiscussionFor(self, obj):
445            return self
446
447    portal = MockPortal()
448    test.globs['plone'] = portal
449    test.globs['transmogrifier'].context = test.globs['plone']
450
451    class CommentsSource(SampleSource):
452        classProvides(ISectionBlueprint)
453        implements(ISection)
454
455        def __init__(self, *args, **kw):
456            super(CommentsSource, self).__init__(*args, **kw)
457            self.sample = (
458                dict(),
459                dict(_path='not/existing/bar'),
460                dict(_path='spam/eggs/notdiscussable'),
461                dict(_path='spam/eggs/foo'),
462            )
463
464    provideUtility(CommentsSource,
465        name=u'quintagroup.transmogrifier.tests.commentssource')
466
467
468def dataCorrectorSetUp(test):
469    sectionsSetUp(test)
470
471    class MockPortal(object):
472        def unrestrictedTraverse(self, path, default):
473            if path[0] == '/':
474                return default # path is absolute
475            if isinstance(path, unicode):
476                return default
477            if path == 'not/existing/bar':
478                return default
479            if path.endswith('/notadaptable'):
480                return object()
481            return self
482
483    portal = MockPortal()
484    test.globs['plone'] = portal
485    test.globs['transmogrifier'].context = test.globs['plone']
486
487    from collective.transmogrifier.interfaces import ITransmogrifier
488    from quintagroup.transmogrifier.interfaces import IExportDataCorrector, \
489        IImportDataCorrector
490
491    class MockExportAdapter(object):
492        implements(IExportDataCorrector)
493        adapts(MockPortal, ITransmogrifier)
494        def __init__(self, context, transmogrifier):
495            self.context = context
496            self.transmogrifier = transmogrifier
497
498        def __call__(self, data):
499            return "modified export data"
500
501    provideAdapter(MockExportAdapter, name="marshall")
502
503    class MockImportAdapter(object):
504        implements(IImportDataCorrector)
505        adapts(MockPortal, ITransmogrifier)
506        def __init__(self, context, transmogrifier):
507            self.context = context
508            self.transmogrifier = transmogrifier
509
510        def __call__(self, data):
511            return "modified import data"
512
513    provideAdapter(MockImportAdapter, name="manifest")
514
515    class DataCorrectorSource(SampleSource):
516        classProvides(ISectionBlueprint)
517        implements(ISection)
518
519        def __init__(self, *args, **kw):
520            super(DataCorrectorSource, self).__init__(*args, **kw)
521            self.sample = (
522                dict(),
523                dict(_files=dict(marshall="item hasn't path")),
524                dict(_path='spam/eggs/foo'),
525                dict(_path='not/existing/bar'),
526                dict(_path='spam/eggs/notadaptable', _files=dict(marshall="object isn't adaptable")),
527                dict(_path='spam/eggs/foo',
528                     _files=dict(marshall='marshall data', unchanged='this must be unchanged')),
529                dict(_path='spam/eggs/foo',
530                     _files=dict(manifest='manifest data', unchanged='this must be unchanged')),
531            )
532
533    provideUtility(DataCorrectorSource,
534        name=u'quintagroup.transmogrifier.tests.datacorrectorsource')
535
536def writerSetUp(test):
537    sectionsSetUp(test)
538
539    class MockExportContext(object):
540        def __init__( self, *args, **kwargs):
541            self.args = args
542            for k, v in kwargs.items():
543                setattr(self, k, v)
544            self._wrote = []
545
546        def __getitem__(self, name):
547            return getattr(self, name, None)
548
549        def __contains__(self, name):
550            return hasattr(self, name)
551
552        def writeDataFile(self, filename, text, content_type, subdir=None):
553            filename = '%s/%s' % (subdir, filename)
554            self._wrote.append((filename, text, content_type))
555
556        def __repr__(self):
557            s = " ".join(["%s=%s" % (k,v) for k,v in self.__dict__.items()])
558            return "<%s %s>" % (self.__class__.__name__, s)
559
560
561    from Products.GenericSetup import context
562
563    context.DirectoryExportContext = type('Directory', (MockExportContext,), {})
564    context.TarballExportContext = type('Tarball', (MockExportContext,), {})
565    context.SnapshotExportContext = type('Snapshot', (MockExportContext,), {})
566
567    class WriterSource(SampleSource):
568        classProvides(ISectionBlueprint)
569        implements(ISection)
570
571        def __init__(self, *args, **kw):
572            super(WriterSource, self).__init__(*args, **kw)
573            self.sample = (
574                dict(_path='spam/eggs/foo'),
575                dict(_files=dict(mock=dict(name='.first.xml', data='some data'))),
576                dict(_path='spam/eggs/foo',
577                     _files=dict(mock=dict(name='.first.xml', data='some data'),
578                                 other=dict(name='.second.xml', data='other data'))),
579                dict(_path='other/path',
580                     _files=dict(mock=dict(name='.third.xml', data='some data')))
581            )
582
583    provideUtility(WriterSource,
584        name=u'quintagroup.transmogrifier.tests.writersource')
585
586    class SingleItemSource(SampleSource):
587        classProvides(ISectionBlueprint)
588        implements(ISection)
589
590        def __init__(self, *args, **kw):
591            super(SingleItemSource, self).__init__(*args, **kw)
592            self.sample = (
593                dict(_path='', _files={}),
594            )
595
596    provideUtility(SingleItemSource,
597        name=u"quintagroup.transmogrifier.tests.singleitemsource")
598
599def readerSetUp(test):
600    sectionsSetUp(test)
601
602    class MockImportContext(object):
603
604        _dirs = [
605            'structure',
606            'structure/news', 'structure/news/recent',
607            'structure/pages', 'structure/pages/front-page',
608        ]
609        _files = [
610            'structure/.properties.xml',
611            'structure/other.file',
612            'structure/news/.objects.xml',
613            'structure/pages/.objects.xml',
614            'structure/pages/front-page/.marshall.xml',
615            'structure/pages/front-page/.comments.xml',
616        ]
617
618        def __init__( self, *args, **kwargs):
619            self.args = args
620            for k, v in kwargs.items():
621                setattr(self, k, v)
622
623        def __repr__(self):
624            s = " ".join(["%s=%s" % (k,v) for k,v in self.__dict__.items()])
625            return "<%s %s>" % (self.__class__.__name__, s)
626
627        def readDataFile(self, filename, subdir=None):
628            return 'some data'
629
630        def isDirectory(self, path):
631            return path == '' or path in self._dirs
632
633        def listDirectory(self, path):
634            all_names = self._dirs + self._files
635            if path:
636                pfx_len = len(path)+1
637            else:
638                pfx_len = 0
639            names = []
640            for name in all_names:
641                if name == path:
642                    continue
643                if not name.startswith(path):
644                    continue
645                name = name[pfx_len:]
646                if '/' in name:
647                    continue
648                names.append(name)
649            return names
650
651    from Products.GenericSetup import context
652
653    context.DirectoryImportContext = type('Directory', (MockImportContext,),
654        {'listDirectory': lambda self, path: []})
655    context.TarballImportContext = type('Tarball', (MockImportContext,), {})
656    context.SnapshotImportContext = type('Snapshot', (MockImportContext,),
657        {'listDirectory': lambda self, path: []})
658
659def substitutionSetUp(test):
660    sectionsSetUp(test)
661
662    class SubstitutionSource(SampleSource):
663        classProvides(ISectionBlueprint)
664        implements(ISection)
665
666        def __init__(self, *args, **kw):
667            super(SubstitutionSource, self).__init__(*args, **kw)
668            self.sample = (
669                {},
670                {'_type': 'Blog'},
671                {'_type': 'PloneFormMailer'},
672                {'_type': 'Document'},
673            )
674
675    provideUtility(SubstitutionSource,
676        name=u'quintagroup.transmogrifier.tests.substitutionsource')
677
678class MetaDirectivesTests(unittest.TestCase):
679    def setUp(self):
680        zcml.load_config('meta.zcml', quintagroup.transmogrifier)
681
682    def tearDown(self):
683        stylesheet_registry.clear()
684        cleanup.cleanUp()
685
686    def testEmptyZCML(self):
687        zcml.load_string('''\
688<configure xmlns:transmogrifier="http://namespaces.plone.org/transmogrifier">
689</configure>''')
690        self.assertEqual(stylesheet_registry.listStylesheetNames(), ())
691
692    def testConfigZCML(self):
693        zcml.load_string('''\
694<configure
695    xmlns:transmogrifier="http://namespaces.plone.org/transmogrifier">
696<transmogrifier:stylesheet
697    source="marshall"
698    from="Blog"
699    to="Weblog"
700    file="blog.xsl"
701    />
702</configure>''')
703        self.assertEqual(stylesheet_registry.listStylesheetNames(),
704                         (u'marshall:Blog:Weblog',))
705        path = os.path.split(quintagroup.transmogrifier.__file__)[0]
706        self.assertEqual(
707            stylesheet_registry.getStylesheet('marshall', 'Blog', 'Weblog'),
708            dict(from_=u'Blog',
709                 to=u'Weblog',
710                 file=os.path.join(path, 'blog.xsl'))
711        )
712
713    def testMultipleZCML(self):
714        zcml.load_string('''\
715<configure
716    xmlns:transmogrifier="http://namespaces.plone.org/transmogrifier">
717<transmogrifier:stylesheet
718    source="marshall"
719    from="Blog"
720    to="Weblog"
721    file="blog.xsl"
722    />
723<transmogrifier:stylesheet
724    source="propertymanager"
725    from="BlogEntry"
726    to="WeblogEntry"
727    file="blogentry.xsl"
728    />
729</configure>''')
730        self.assertEqual(stylesheet_registry.listStylesheetNames(),
731                         (u'marshall:Blog:Weblog', u'propertymanager:BlogEntry:WeblogEntry'))
732
733def xsltSetUp(test):
734    sectionsSetUp(test)
735
736    class XSLTSource(SampleSource):
737        classProvides(ISectionBlueprint)
738        implements(ISection)
739
740        def __init__(self, *args, **kw):
741            super(XSLTSource, self).__init__(*args, **kw)
742            self.sample = (
743                {},
744                {'_type': 'Weblog'},
745                {'_old_type': 'Blog'},
746                {'_old_type': 'Blog',
747                 '_type': 'Weblog',
748                 '_files': {'manifest': {'data': 'xml', 'name': 'manifest.xml'}}},
749                {'_old_type': 'Blog',
750                 '_type': 'Weblog',
751                 '_files': {'marshall': {'data': 'xml', 'name': 'marshall.xml'}}},
752            )
753
754    provideUtility(XSLTSource,
755        name=u'quintagroup.transmogrifier.tests.xsltsource')
756
757    from quintagroup.transmogrifier.xslt import XSLTSection, stylesheet_registry
758
759    XSLTSection.applyTransformations = lambda self, xml, xslt: 'transformed xml'
760    test.globs['stylesheet_registry'] = stylesheet_registry
761
762def binarySetUp(test):
763    sectionsSetUp(test)
764
765    from Products.Archetypes.interfaces import IBaseObject
766
767    class MockPortal(object):
768        implements(IBaseObject)
769
770        _last_path = None
771        def unrestrictedTraverse(self, path, default):
772            if path[0] == '/':
773                return default # path is absolute
774            if isinstance(path, unicode):
775                return default
776            if path == 'not/existing/bar':
777                return default
778            if path.endswith('/notatcontent'):
779                return object()
780            self._last_path = path
781            return self
782
783        fields = ['id', 'title', 'file', 'image']
784
785        def Schema(self):
786            return dict.fromkeys(self.fields)
787
788        def isBinary(self, field):
789            return field in ('file', 'image')
790
791        _current_field = None
792        def getField(self, field):
793            self._current_field = field
794            return self
795
796        def getBaseUnit(self, obj):
797            return self
798
799        def getFilename(self):
800            if self._current_field == 'file':
801                return 'archive.tar.gz'
802            else:
803                return ''
804
805        def getContentType(self):
806            if self._current_field == 'file':
807                return 'application/x-tar'
808            else:
809                return 'image/png'
810
811        def getRaw(self):
812            if self._current_field == 'file':
813                return 'binary data'
814            else:
815                return 'image'
816
817        def getMutator(self, obj):
818            return self
819
820        updated = ()
821        def __call__(self, data, filename=None, mimetype=None):
822            self.updated += (filename, mimetype, data)
823
824    portal = MockPortal()
825    test.globs['plone'] = portal
826    test.globs['transmogrifier'].context = test.globs['plone']
827
828    class BinarySource(SampleSource):
829        classProvides(ISectionBlueprint)
830        implements(ISection)
831
832        def __init__(self, *args, **kw):
833            super(BinarySource, self).__init__(*args, **kw)
834            self.sample = (
835                dict(),
836                dict(_path='not/existing/bar'),
837                dict(_path='spam/eggs/notatcontent'),
838                dict(_path='spam/eggs/foo'),
839            )
840
841    provideUtility(BinarySource,
842        name=u'quintagroup.transmogrifier.tests.binarysource')
843
844from DateTime import DateTime
845
846def catalogSourceSetUp(test):
847    sectionsSetUp(test)
848
849    class MockContent(dict):
850        def __init__(self, **kw):
851            self.update(kw)
852            self['id'] = self.getId
853
854        def getPath(self):
855            return self['path']
856
857        @property
858        def getId(self):
859            path = self.getPath()
860            return path.rsplit('/', 1)[-1]
861
862        @property
863        def portal_type(self):
864            return self['portal_type']
865
866        @property
867        def is_folderish(self):
868            return self['portal_type'] == 'Folder' and True or False
869
870    class MockPortal(dict):
871
872        content = ()
873        def __call__(self, **kw):
874            res = []
875            for obj in self.content:
876                matched = True
877                for index, query in kw.items():
878                    if index not in obj:
879                        matched = False
880                        break
881                    if matched and index == 'modified':
882                        if isinstance(query, dict):
883                            value = query['query']
884                            range_ = query['range']
885                            if range_ == 'min' and DateTime(obj[index]) >= DateTime(value):
886                                matched = True
887                            elif range_ == 'max' and DateTime(obj[index]) <= DateTime(value):
888                                matched = True
889                            else:
890                                matched = False
891                        else:
892                            if DateTime(obj[index]) == DateTime(query):
893                                matched = True
894                            else:
895                                matched = False
896                    elif matched and index == 'path':
897                        if obj[index].startswith(query):
898                            matched = True
899                        else:
900                            matched = False
901                    elif matched:
902                        if obj[index] == query:
903                            matched = True
904                        else:
905                            matched = False
906                if matched:
907                    res.append(obj)
908
909            return res
910
911    portal = MockPortal()
912    doc1 = MockContent(path='/plone/document1', portal_type='Document',
913        modified='2008-11-01T12:00:00Z')
914    folder1 = MockContent(path='/plone/folder1', portal_type='Folder',
915        modified='2008-11-01T12:00:00Z')
916    doc2 = MockContent(path='/plone/folder1/document2', portal_type='Document',
917        modified='2008-11-02T12:00:00Z')
918    doc3 = MockContent(path='/plone/folder1/document3', portal_type='Document',
919        modified='2008-11-02T12:00:00Z')
920    folder2 = MockContent(path='/plone/folder2', portal_type='Folder',
921        modified='2008-11-02T12:00:00Z')
922    doc4 = MockContent(path='/plone/folder2/document4', portal_type='Document',
923        modified='2008-11-01T12:00:00Z')
924    comment = MockContent(path='/plone/folder2/document4/talkback/1234567890', portal_type='Discussion Item',
925        modified='2008-11-02T12:00:00Z')
926    # items are sorted on their modification date
927    portal.content = (doc1, folder1, folder2, doc2, doc3, doc4, comment)
928
929    test.globs['plone'] = portal
930    test.globs['transmogrifier'].context = test.globs['plone']
931
932def flushCacheSetUp(test):
933    sectionsSetUp(test)
934   
935    class DataBase(object):
936        def __init__(self, context):
937            self.context = context
938       
939        def cacheMinimize(self):
940            self.context.cacheMinimized += 1
941
942        def _getDB(self):
943            return self
944   
945    class DataBasePanel(object):
946        def __init__(self, context):
947            self.context = context
948       
949        def getDatabaseNames(self):
950            return ('main',)
951       
952        def __getitem__(self, key):
953            return DataBase(self.context)
954   
955    class ControlPanel(object):
956        def __init__(self, context):
957            self.Database = DataBasePanel(context)
958   
959    class MockPortal(object):
960        def __init__(self):
961            self.cacheMinimized = 0
962            self.Control_Panel = ControlPanel(self)
963       
964    test.globs['plone'] = MockPortal()
965    test.globs['transmogrifier'].context = test.globs['plone']
966
967
968def interfaceManagerSetUp(test):
969    sectionsSetUp(test)
970
971    from zope.interface import Interface
972    from zope.annotation.interfaces import IAttributeAnnotatable
973    from zope.interface import alsoProvides as orig_alsoProvides
974    from Products.Archetypes.interfaces import IBaseObject
975
976    class MockPortal(object):
977
978        implements(
979            IBaseObject,
980        )
981
982        _last_path = None
983        def unrestrictedTraverse(self, path, default):
984            if path[0] == '/':
985                return default # path is absolute
986            if isinstance(path, unicode):
987                return default
988            if path == 'not/existing/bar':
989                return default
990            if path.endswith('/notatcontent'):
991                return object()
992            self._last_path = path
993            return self
994
995        # implement portal_catalog reindex method
996        def reindexIndex(self, *args, **kwargs):
997            pass
998
999
1000    updated = []
1001    test.globs['updated'] = updated
1002    def patch_alsoProvides(object, *interfaces):
1003        updated.extend([i.__identifier__ for i in interfaces])
1004        orig_alsoProvides(object, *interfaces)
1005    quintagroup.transmogrifier.interfacemanager.alsoProvides = patch_alsoProvides
1006
1007    portal = MockPortal()
1008    orig_alsoProvides(portal, IAttributeAnnotatable, Interface)
1009    test.globs['plone'] = portal
1010    test.globs['transmogrifier'].context = test.globs['plone']
1011
1012    class InterfaceManagerSource(SampleSource):
1013        classProvides(ISectionBlueprint)
1014        implements(ISection)
1015
1016        def __init__(self, *args, **kw):
1017            super(InterfaceManagerSource, self).__init__(*args, **kw)
1018            self.sample = (
1019                dict(),
1020                dict(_path='not/existing/bar'),
1021                dict(_path='spam/eggs/notatcontent'),
1022                dict(_path='spam/eggs/foo'),
1023            )
1024
1025    provideUtility(InterfaceManagerSource,
1026        name=u'quintagroup.transmogrifier.tests.interfacemanagersource')
1027
1028def portletsSetUp(test):
1029    sectionsSetUp(test)
1030
1031    from zope.interface import Interface
1032    from zope.annotation.interfaces import IAttributeAnnotatable
1033    from plone.portlets.interfaces import ILocalPortletAssignable
1034
1035    # this bases class adds __of__ method
1036    from Acquisition import Implicit
1037
1038    class MockPortal(Implicit):
1039        implements(IAttributeAnnotatable, ILocalPortletAssignable)
1040
1041        _last_path = None
1042        def unrestrictedTraverse(self, path, default):
1043            if path[0] == '/':
1044                return default # path is absolute
1045            if isinstance(path, unicode):
1046                return default
1047            if path == 'not/existing/bar':
1048                return default
1049            if path.endswith('/notassignable'):
1050                return object()
1051            self._last_path = path
1052            return self
1053
1054        def getPhysicalPath(self):
1055            return [''] + self._last_path.split('/')
1056
1057    portal = MockPortal()
1058    test.globs['plone'] = portal
1059    test.globs['transmogrifier'].context = test.globs['plone']
1060
1061    class PortletsSource(SampleSource):
1062        classProvides(ISectionBlueprint)
1063        implements(ISection)
1064
1065        def __init__(self, *args, **kw):
1066            super(PortletsSource, self).__init__(*args, **kw)
1067            self.sample = (
1068                dict(),
1069                dict(_path='not/existing/bar'),
1070                dict(_path='spam/eggs/notassignable'),
1071                dict(_path='assignable'),
1072                dict(_path='other-assignable', 
1073                    files=dict(portlets=dict(
1074                        name='.portlets.xml',
1075                        data="""<?xml version="1.0" encoding="utf-8"?>
1076<portlets>
1077  <assignment category="context" key="/other-assignable" manager="plone.leftcolumn" name="habra-rss" type="portlets.rss">
1078    <property name="count">
1079      20
1080    </property>
1081    <property name="url">
1082      http://habrahabr.ru/rss/
1083    </property>
1084    <property name="portlet_title">
1085      Habrahabr RSS feed
1086    </property>
1087    <property name="timeout">
1088      120
1089    </property>
1090  </assignment>
1091  <blacklist category="user" manager="plone.leftcolumn" status="block"/>
1092  <blacklist category="group" manager="plone.leftcolumn" status="acquire"/>
1093  <blacklist category="content_type" manager="plone.leftcolumn" status="acquire"/>
1094  <blacklist category="context" manager="plone.leftcolumn" status="acquire"/>
1095</portlets>
1096""")
1097                    )
1098                )
1099            )
1100
1101    provideUtility(PortletsSource,
1102        name=u'quintagroup.transmogrifier.tests.portletssource')
1103
1104    # prepare the one portlet for testing
1105    from zope.interface import alsoProvides
1106    from zope.component import getUtility, getMultiAdapter
1107    from zope.component.interfaces import IFactory
1108    from zope.component.factory import Factory
1109
1110    from plone.portlets.manager import PortletManager
1111    from plone.portlets.interfaces import IPortletManager, IPortletType, \
1112        IPortletAssignmentMapping
1113    from plone.portlets.registration import PortletType
1114
1115    from plone.app.portlets.assignable import localPortletAssignmentMappingAdapter
1116    from plone.portlets.assignable import LocalPortletAssignmentManager
1117    from plone.app.portlets.interfaces import IPortletTypeInterface
1118    from plone.portlets.interfaces import  ILocalPortletAssignmentManager
1119    from plone.portlets.constants import USER_CATEGORY
1120    # from plone.app.portlets.browser.interfaces import IPortletAdding
1121    from plone.app.portlets.portlets.rss import IRSSPortlet, Assignment #, Renderer, AddForm, EditForm
1122
1123    # register portlet manager and assignment mapping adapter
1124    manager = PortletManager()
1125    provideUtility(manager, IPortletManager, name='plone.leftcolumn')
1126    provideAdapter(localPortletAssignmentMappingAdapter)
1127    provideAdapter(LocalPortletAssignmentManager)
1128    mapping = getMultiAdapter((portal, manager), IPortletAssignmentMapping)
1129    assignable = getMultiAdapter((portal, manager), ILocalPortletAssignmentManager)
1130    test.globs['mapping'] = mapping
1131    test.globs['assignable'] = assignable
1132
1133    # register portlet (this is what plone:portlet zcml directive does)
1134    PORTLET_NAME = 'portlets.rss'
1135    alsoProvides(IRSSPortlet, IPortletTypeInterface)
1136    provideUtility(provides=IPortletTypeInterface, name=PORTLET_NAME, component=IRSSPortlet)
1137    provideUtility(provides=IFactory, name=PORTLET_NAME, component=Factory(Assignment))
1138
1139    # register a portlet type (this is what <portlet /> element in portlets.xml
1140    # does)
1141    portlet = PortletType()
1142    portlet.title = 'RSS Feed'
1143    portlet.description = 'A portlet which can receive and render an RSS feed.'
1144    portlet.addview = PORTLET_NAME
1145    portlet.for_ = [Interface]
1146    provideUtility(component=portlet, provides=IPortletType, 
1147        name=PORTLET_NAME)
1148
1149    # add a portlet and configure it (this is done on @@manage-portlets view)
1150    assignment = getUtility(IFactory, name=PORTLET_NAME)()
1151    mapping['rss'] = assignment
1152    portlet_interface = getUtility(IPortletTypeInterface, name=PORTLET_NAME)
1153    data = {
1154        'portlet_title': u'RSS feed',
1155        'count': 10,
1156        'url': u'http://sumno.com/feeds/main-page/',
1157        'timeout': 60
1158    }
1159    for k, v in data.items():
1160        field = portlet_interface.get(k)
1161        field = field.bind(assignment)
1162        field.validate(v)
1163        field.set(assignment, v)
1164    # set blacklists for user category to 'block'
1165    assignable.setBlacklistStatus(USER_CATEGORY, True)
1166
1167def test_suite():
1168    import sys
1169    suite = unittest.findTestCases(sys.modules[__name__])
1170    suite.addTests((
1171        doctest.DocFileSuite(
1172            'sitewalker.txt',
1173            setUp=siteWalkerSetUp, tearDown=tearDown),
1174        doctest.DocFileSuite(
1175            'manifest.txt',
1176            setUp=manifestSetUp, tearDown=tearDown),
1177        doctest.DocFileSuite(
1178            'marshall.txt',
1179            setUp=marshallSetUp, tearDown=tearDown),
1180        doctest.DocFileSuite(
1181            'propertymanager.txt',
1182            setUp=propertyManagerSetUp, tearDown=tearDown),
1183        doctest.DocFileSuite(
1184            'comments.txt',
1185            setUp=commentsSetUp, tearDown=tearDown),
1186        doctest.DocFileSuite(
1187            'datacorrector.txt',
1188            setUp=dataCorrectorSetUp, tearDown=tearDown),
1189        doctest.DocFileSuite(
1190            'writer.txt',
1191            setUp=writerSetUp, tearDown=tearDown),
1192        doctest.DocFileSuite(
1193            'reader.txt',
1194            setUp=readerSetUp, tearDown=tearDown),
1195        doctest.DocFileSuite(
1196            'substitution.txt',
1197            setUp=substitutionSetUp, tearDown=tearDown),
1198        doctest.DocFileSuite(
1199            'xslt.txt',
1200            setUp=xsltSetUp, tearDown=tearDown),
1201        doctest.DocFileSuite(
1202            'binary.txt',
1203            setUp=binarySetUp, tearDown=tearDown),
1204        doctest.DocFileSuite(
1205            'catalogsource.txt',
1206            setUp=catalogSourceSetUp, tearDown=tearDown),
1207        doctest.DocFileSuite(
1208            'flushcache.txt',
1209            setUp=flushCacheSetUp, tearDown=tearDown),
1210        doctest.DocFileSuite(
1211            'interfacemanager.txt',
1212            setUp=interfaceManagerSetUp, tearDown=tearDown),
1213        doctest.DocFileSuite(
1214            'portlets.txt',
1215            setUp=portletsSetUp, tearDown=tearDown),
1216    ))
1217    return suite
Note: See TracBrowser for help on using the repository browser.