[1] | 1 | from Testing import ZopeTestCase |
---|
| 2 | from Products.CMFPlone.tests import PloneTestCase |
---|
| 3 | from Products.CMFCore.utils import getToolByName |
---|
| 4 | from Products.Archetypes.tests import ArchetypesTestCase |
---|
| 5 | from AccessControl.SecurityManagement import newSecurityManager |
---|
| 6 | from AccessControl.SecurityManagement import noSecurityManager |
---|
| 7 | from Products.PloneSMSCommunicator.pyXIAM import SMSsubmitRequest, getAllTextFromTag |
---|
| 8 | from xml.dom.minidom import * |
---|
| 9 | from StringIO import StringIO |
---|
| 10 | import SimpleXiamServer |
---|
| 11 | import BaseHTTPServer |
---|
| 12 | import thread |
---|
| 13 | |
---|
# Make the product under test available to the ZopeTestCase environment
# before any test class below is defined.
ZopeTestCase.installProduct('PloneSMSCommunicator')

# Module-level list of the test case classes defined in this file.
tests = []
---|
| 17 | |
---|
class SmSHandler(SimpleXiamServer.SimpleXiamHandler):
    """Stub XIAM handler that verifies the SMS request it has been sent.

    Every POST is first handled by ``SimpleXiamHandler.do_POST``; afterwards
    the request body is parsed as XML and its ``from``, ``to`` and
    ``content`` tags are compared with fixed expected values.
    """

    server_url = None

    def testCommunicator(self):
        """Read the POST body, parse it as XML and check three of its tags.

        NOTE(review): ``assertEqual`` is not defined in this class; it is
        presumably provided by ``SimpleXiamHandler`` -- confirm.
        NOTE(review): this runs after the parent ``do_POST``; if that call
        already consumed the request body, the read below has nothing left
        to return -- confirm.
        """
        body_length = int(self.headers['content-length'])
        request_xml = self.rfile.read(body_length)
        document = parseString(request_xml)

        # (tag name, expected text of the first matching tag), checked in
        # this order; the first mismatch aborts the remaining checks.
        expected_by_tag = (
            ('from', '+380979312198'),
            ('to', '+380979987348'),
            ('content', 'hello how are you?'),
        )
        for tag_name, expected_text in expected_by_tag:
            self.assertEqual(getAllTextFromTag(document, tag_name)[0], expected_text)

    def do_POST(self):
        """Let the parent handler process the POST, then verify its payload."""
        SimpleXiamServer.SimpleXiamHandler.do_POST(self)
        self.testCommunicator()
---|
| 31 | |
---|
| 32 | |
---|
class TestPloneSMSCommunicator(ArchetypesTestCase.ArcheSiteTestCase, ZopeTestCase.Functional):
    """Functional test that sends an SMS request to a local stub XIAM server.

    The stub server (``SmSHandler``) listens on ``stub_server_port`` and the
    portal's ``portal_smsCommunicator`` tool is pointed at it.
    """

    # Single source of truth for the stub server's TCP port: used for the
    # bind address, the log line and the communicator's server_url, which
    # previously each repeated the literal 10010.
    stub_server_port = 10010

    # Created (and the port bound) once, when this class body is executed.
    httpd = BaseHTTPServer.HTTPServer(("", stub_server_port), SmSHandler)

    def createManager(self, portal):
        """Add the user 'PortalManager' with the 'Manager' role to the portal.

        Returns the result of ``portal.acl_users._doAddUser``.
        """
        acl_users = portal.acl_users
        return acl_users._doAddUser('PortalManager', '', ['Manager'], (), (), )

    def startServer(self):
        """Announce the port and block in ``httpd.serve_forever()``.

        Intended to be run in a separate thread (see ``test_sendRequest``).
        """
        # Parenthesised single-argument form prints the same text as the
        # original ``print 'serving at port', 10010`` statement.
        print('serving at port %d' % self.stub_server_port)
        self.httpd.serve_forever()

    def installProducts(self, portal):
        """Install PloneSMSCommunicator into ``portal`` as a Manager user."""
        # create manager user
        self.createManager(portal)
        user = portal.acl_users.getUserById('PortalManager')

        # login as manager
        newSecurityManager(None, user)

        # install products
        qi = getToolByName(portal, 'portal_quickinstaller')
        qi.installProduct('PloneSMSCommunicator')

        # log out
        noSecurityManager()

    def afterSetUp(self):
        """Install the product into the test portal."""
        self.installProducts(self.portal)

    def test_sendRequest(self):
        """Send one SMS request through the communicator to the stub server.

        The content of the request is checked on the server side by
        ``SmSHandler``.
        """
        # thread.start_new_thread() returns only a thread identifier, not a
        # handle that could stop the thread. The former ``del th`` therefore
        # destroyed nothing and has been removed; the server thread simply
        # keeps running after this test returns.
        thread.start_new_thread(self.startServer, ())

        communicator = self.portal.portal_smsCommunicator
        communicator.setProperties(log_flag = None)
        communicator.setProperties(server_url = 'http://localhost:%d/' % self.stub_server_port)
        communicator.send_Request(originator = '+380979312198', destination = ['+380979987348'], body = 'hello how are you?')
---|
| 78 | |
---|
| 79 | #def test_Response(self): |
---|
| 80 | # portal = self.portal |
---|
| 81 | # communicator = portal.portal_smsCommunicator |
---|
| 82 | # communicator.setProperties(log_flag = 'False') |
---|
| 83 | # response = """<?xml version="1.0" encoding="UTF-8"?> |
---|
| 84 | # <!DOCTYPE xiamSMS SYSTEM "xiamSMSMessage.dtd"> |
---|
| 85 | # <xiamSMS status="OK" statusText="XML contained 1 xir messages"> |
---|
| 86 | # <submitResponse id="betye54"> |
---|
| 87 | # <result status="OK" statusText="">+380979987348</result> |
---|
| 88 | # </submitResponse> |
---|
| 89 | # </xiamSMS>""" |
---|
| 90 | # out = StringIO() |
---|
| 91 | # out.write(response) |
---|
| 92 | # communicator = portal.portal_smsCommunicator |
---|
| 93 | # data = self.publish(portal.id+'/portal_smsCommunicator/Response', 'mgr:mgrpw', env=None, extra=None, request_method='POST', stdin = out) |
---|
| 94 | # response = data.getBody() |
---|
| 95 | # self.assertEqual(response, """<?xml version="1.0" ?>\n<!DOCTYPE xiamSMS SYSTEM "xiamSMSMessage.dtd" >\n<xiamSMS status="OK"><submitRsponse id="betye54">XML contained your response messages</submitRsponse></xiamSMS>""") |
---|
| 96 | |
---|
# Record the test case class in the module-level ``tests`` list.
tests.append(TestPloneSMSCommunicator)


def test_suite():
    """Return a TestSuite wrapping all tests of TestPloneSMSCommunicator."""
    import unittest

    communicator_tests = unittest.makeSuite(TestPloneSMSCommunicator)
    return unittest.TestSuite((communicator_tests,))
---|
| 104 | |
---|
if __name__ == '__main__':
    # ``framework`` is never imported or defined in this module, so calling
    # it raised NameError when the file was run directly. Run the suite with
    # the standard unittest text runner instead.
    import unittest
    unittest.TextTestRunner().run(test_suite())
---|