Commits

Anonymous committed 7d58361

[svn r6069] XmlRpcPlugin: Improved error handling.
* Moved `XML_RPC` permission check into each protocol handler in order to return valid responses.
* General improvements to error reporting and handling.
* Tests cases for general errors for both protocols ('not allowed', 'no such method' and 'wrong args'.

  • Participants
  • Parent commits b70ee79

Comments (0)

Files changed (4)

trunk/tracrpc/tests/__init__.py

             # Add a delay to make sure server comes up...
             time.sleep(1)
 
+        def _tracadmin(self, *args, **kwargs):
+            SvnFunctionalTestEnvironment._tracadmin(self, *args, **kwargs)
+            # Delay to make sure command executes and cache resets
+            time.sleep(5)
+
     rpc_testenv = RpcTestEnvironment(os.path.realpath(os.path.join(
                 os.path.realpath(__file__), '..', '..', '..', 'rpctestenv')),
                 '8765', 'http://127.0.0.1')
 
     def suite():
         suite = unittest.TestSuite()
+        import tracrpc.tests.xml
+        suite.addTest(tracrpc.tests.xml.suite())
         import tracrpc.tests.json
         suite.addTest(tracrpc.tests.json.suite())
         import tracrpc.tests.ticket

trunk/tracrpc/tests/json.py

                     result['result']['__jsonclass__'][1].decode("base64"))
             self.assertEquals(image_in.getvalue(), image_out.getvalue())
 
+        def test_xmlrpc_permission(self):
+            # Test returned response if not XML_RPC permission
+            rpc_testenv._tracadmin('permission', 'remove', 'anonymous',
+                                    'XML_RPC')
+            try:
+                result = self._anon_req({'method': 'system.listMethods',
+                                         'id': 'no-perm'})
+                self.assertEquals(None, result['result'])
+                self.assertEquals('no-perm', result['id'])
+                self.assertEquals(-32600, result['error']['code'])
+                self.assertTrue('XML_RPC' in result['error']['message'])
+            finally:
+                # Add back the default permission for further tests
+                rpc_testenv._tracadmin('permission', 'add', 'anonymous',
+                                            'XML_RPC')
+
+        def test_method_not_found(self):
+            result = self._anon_req({'method': 'system.doesNotExist',
+                                     'id': 'no-method'})
+            self.assertTrue(result['error'])
+            self.assertEquals(result['id'], 'no-method')
+            self.assertEquals(None, result['result'])
+            self.assertEquals(-32603, result['error']['code'])
+            self.assertTrue('not found' in result['error']['message'])
+
+        def test_wrong_argspec(self):
+            result = self._anon_req({'method': 'system.listMethods',
+                            'params': ['hello'], 'id': 'wrong-args'})
+            self.assertTrue(result['error'])
+            self.assertEquals(result['id'], 'wrong-args')
+            self.assertEquals(None, result['result'])
+            self.assertEquals(-32603, result['error']['code'])
+            self.assertTrue('listMethods() takes exactly 2 arguments' \
+                                in result['error']['message'])
+
     def suite():
         return unittest.makeSuite(JsonTestCase)
 else:

trunk/tracrpc/tests/xml.py

+# -*- coding: utf-8 -*-
+"""
+License: BSD
+
+(c) 2009      ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no)
+"""
+
+import unittest
+
+import xmlrpclib
+
+from tracrpc.tests import rpc_testenv
+
+class RpcXmlTestCase(unittest.TestCase):
+    
+    def setUp(self):
+        self.anon = xmlrpclib.ServerProxy(rpc_testenv.url_anon)
+        self.user = xmlrpclib.ServerProxy(rpc_testenv.url_user)
+        self.admin = xmlrpclib.ServerProxy(rpc_testenv.url_admin)
+
+    def test_xmlrpc_permission(self):
+        # Test returned response if not XML_RPC permission
+        rpc_testenv._tracadmin('permission', 'remove', 'anonymous',
+                                'XML_RPC')
+        try:
+            self.anon.system.listMethods()
+            rpc_testenv._tracadmin('permission', 'add', 'anonymous',
+                                        'XML_RPC')
+            self.fail("Revoked permissions not taken effect???")
+        except xmlrpclib.Fault, e:
+            self.assertEquals(1, e.faultCode)
+            self.assertTrue('XML_RPC' in e.faultString)
+            rpc_testenv._tracadmin('permission', 'add', 'anonymous',
+                                        'XML_RPC')
+
+    def test_method_not_found(self):
+        try:
+            self.admin.system.doesNotExist()
+            self.fail("What? Method exists???")
+        except xmlrpclib.Fault, e:
+            self.assertEquals(1, e.faultCode)
+            self.assertTrue("not found" in e.faultString)
+
+    def test_wrong_argspec(self):
+        try:
+            self.admin.system.listMethods("hello")
+            self.fail("Oops. Wrong argspec accepted???")
+        except xmlrpclib.Fault, e:
+            self.assertEquals(2, e.faultCode)
+            self.assertTrue("listMethods() takes exactly 2 arguments" \
+                                    in e.faultString)
+
+def suite():
+    return unittest.makeSuite(RpcXmlTestCase)
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='suite')

trunk/tracrpc/web_ui.py

 import genshi
 
 from trac.core import *
+from trac.perm import PermissionError
 from trac.util.datefmt import utc
 from trac.util.text import to_unicode, exception_to_unicode
 from trac.web.main import IRequestHandler
         req.write(response)
 
     def process_request(self, req):
-        # Need at least XML_RPC
-        req.perm.assert_permission('XML_RPC')
 
-        # Dump RPC functions
         content_type = req.get_header('Content-Type') or 'text/html'
         if not self.content_type_re.match(content_type):
+            # Dump RPC functions
+            req.perm.require('XML_RPC') # Need at least XML_RPC
             namespaces = {}
             for method in XMLRPCSystem(self.env).all_methods(req):
                 namespace = method.namespace.replace('.', '_')
                                     % (req.authname, method, repr(args)))
         args = self._normalize_xml_input(args)
         try:
+            req.perm.require('XML_RPC') # Need at least XML_RPC
             result = XMLRPCSystem(self.env).get_method(method)(req, args)
             self.env.log.debug("RPC(xml) '%s' result: %s" % (method, repr(result)))
             result = tuple(self._normalize_xml_output(result))
             self._send_response(req, xmlrpclib.dumps(result, methodresponse=True), content_type)
+        except PermissionError, e:
+            self._send_response(req, xmlrpclib.dumps(xmlrpclib.Fault(1, to_unicode(e))),
+                                    content_type)
         except xmlrpclib.Fault, e:
             self.log.error(e)
             self._send_response(req, xmlrpclib.dumps(e), content_type)
 
     def process_json_request(self, req, content_type):
         """ Handles JSON-RPC requests """
-        try: # Catch-all safety
-            try:
-                data = json.load(req, cls=TracRpcJSONDecoder)
-            except Exception, e:
-                # Abort with exception - no data can be read
-                self.log.error("RPC(json) decode error %s" % \
-                        exception_to_unicode(e, traceback=True))
-                response = json.dumps({'result': None, 'id': None,
-                    'error': self._json_error(e, -32700)},
-                    cls=TracRpcJSONEncoder)
-                self._send_response(req, response + '\n', content_type)
-                return
-            self.log.debug("RPC(json) call by '%s': %s" % (req.authname, data))
-            args = data.get('params') or []
-            r_id = data.get('id', None)
-            method = data.get('method', '')
+        try:
+            data = json.load(req, cls=TracRpcJSONDecoder)
+        except Exception, e:
+            # Abort with exception - no data can be read
+            self.log.error("RPC(json) decode error %s" % \
+                    exception_to_unicode(e, traceback=True))
+            response = json.dumps(self._json_error(e, -32700),
+                                    cls=TracRpcJSONEncoder)
+            self._send_response(req, response + '\n', content_type)
+            return
+        self.log.debug("RPC(json) call by '%s': %s" % (req.authname, data))
+        args = data.get('params') or []
+        r_id = data.get('id', None)
+        method = data.get('method', '')
+        try:
+            req.perm.require('XML_RPC') # Need at least XML_RPC
             if method == 'system.multicall': # Custom multicall
-                result = []
+                results = []
                 for mc in args:
-                    result.append(self._json_call(req, mc.get('method', ''),
+                    results.append(self._json_call(req, mc.get('method', ''),
                         mc.get('params') or [], mc.get('id') or r_id))
-                response = {'result': result, 'error': None, 'id': r_id}
+                response = {'result': results, 'error': None, 'id': r_id}
             else:
                 response = self._json_call(req, method, args, r_id)
             try: # JSON encoding
                 self.log.debug("RPC(json) result: %s" % repr(response))
                 response = json.dumps(response, cls=TracRpcJSONEncoder)
             except Exception, e:
-                response = json.dumps({'result': None, 'id': r_id,
-                    'error': self._json_error(e)}, cls=TracRpcJSONEncoder)
+                response = json.dumps(self._json_error(e, r_id=r_id),
+                                        cls=TracRpcJSONEncoder)
+        except PermissionError, e:
+            response = json.dumps(self._json_error(e, -32600, r_id=r_id),
+                cls=TracRpcJSONEncoder)
         except Exception, e:
             self.log.error("RPC(json) error %s" % exception_to_unicode(e,
                                                     traceback=True))
-            response = json.dumps({'result': None, 'id': None,
-                    'error': self._json_error(e)}, cls=TracRpcJSONEncoder)
+            response = json.dumps(self._json_error(e), cls=TracRpcJSONEncoder)
         self.log.debug("RPC(json) encoded result: %s" % response)
         self._send_response(req, response + '\n', content_type)
 
     def _json_call(self, req, method, args, r_id=None):
-        """ Create response dictionary 'result', 'error' and 'id' keys. """
+        """ Call method and create response dictionary. """
         try:
             result = (XMLRPCSystem(self.env).get_method(method)(req, args))[0]
             return {'result': result, 'error': None, 'id': r_id}
         except Exception, e:
-            error = self._json_error(e)
-            return {'result': None, 'error': error, 'id': r_id}
+            return self._json_error(e, r_id=r_id)
 
-    def _json_error(self, msg, c=-32603):
-        return {'name': 'JSONRPCError', 'code': c, 'message': to_unicode(msg)}
+    def _json_error(self, e, c=-32603, r_id=None):
+        """ Makes a response dictionary that is an error. """
+        return {'result': None, 'id': r_id, 'error': {
+                'name': 'JSONRPCError', 'code': c, 'message': to_unicode(e)}}
 
     def _normalize_xml_input(self, args):
         """ Normalizes arguments (at any level - traversing dicts and lists):