Source code for pyramid_rpc.jsonrpc

import json
import logging
import copy

import venusian
from pyramid.exceptions import ConfigurationError
from pyramid.httpexceptions import HTTPForbidden
from pyramid.httpexceptions import HTTPNotFound
from pyramid.renderers import null_renderer
from pyramid.renderers import render
from pyramid.request import Request
from pyramid.response import Response
from pyramid.security import NO_PERMISSION_REQUIRED

from pyramid_rpc.compat import is_nonstr_iter
from pyramid_rpc.mapper import MapplyViewMapper
from pyramid_rpc.mapper import ViewMapperArgsInvalid
from pyramid_rpc.util import combine


log = logging.getLogger(__name__)

DEFAULT_RENDERER = 'pyramid_rpc:jsonrpc'

_marker = object()


[docs]class JsonRpcError(Exception): code = -32603 # sane default message = 'internal error' # sane default data = None def __init__(self, code=None, message=None, data=None): if code is not None: self.code = code if message is not None: self.message = message if data is not None: self.data = data def as_dict(self): """Return a dictionary representation of this object for serialization in a JSON-RPC response.""" error = dict(code=self.code, message=self.message) if self.data is not None: error['data'] = self.data return error
[docs]class JsonRpcParseError(JsonRpcError): code = -32700 message = 'parse error'
[docs]class JsonRpcRequestInvalid(JsonRpcError): code = -32600 message = 'invalid request'
[docs]class JsonRpcMethodNotFound(JsonRpcError): code = -32601 message = 'method not found'
[docs]class JsonRpcParamsInvalid(JsonRpcError): code = -32602 message = 'invalid params'
[docs]class JsonRpcInternalError(JsonRpcError): code = -32603 message = 'internal error'
def make_error_response(request, error, id=None): """ Marshal a Python Exception into a ``Response`` object with a body that is a JSON string suitable for use as a JSON-RPC response with a content-type of ``application/json`` and return the response. """ # we may need to render a parse error, at which point we don't know # much about the request renderer = getattr(request, 'rpc_renderer', DEFAULT_RENDERER) out = { 'jsonrpc': '2.0', 'id': id, 'error': error.as_dict(), } body = render(renderer, out, request=request).encode('utf-8') response = Response(body, charset='utf-8') response.content_type = 'application/json' return response def exception_view(exc, request): rpc_id = getattr(request, 'rpc_id', None) if isinstance(exc, JsonRpcError): fault = exc log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, exc.message) elif isinstance(exc, HTTPNotFound): fault = JsonRpcMethodNotFound() log.debug('json-rpc method not found rpc_id:%s "%s"', rpc_id, request.rpc_method) elif isinstance(exc, HTTPForbidden): fault = JsonRpcRequestInvalid() log.debug('json-rpc method forbidden rpc_id:%s "%s"', rpc_id, request.rpc_method) elif isinstance(exc, ViewMapperArgsInvalid): fault = JsonRpcParamsInvalid() log.debug('json-rpc invalid method params') else: fault = JsonRpcInternalError() log.exception('json-rpc exception rpc_id:%s "%s"', rpc_id, exc) return make_error_response(request, fault, rpc_id) def make_response(request, result): rpc_id = getattr(request, 'rpc_id', None) response = request.response # store content_type before render is called ct = response.content_type out = { 'jsonrpc': '2.0', 'id': rpc_id, 'result': result, } if request.rpc_id is not None else '' response.body = render( request.rpc_renderer, out, request=request ).encode(response.charset) if ct == response.default_content_type: response.content_type = 'application/json' return response def _render(value, system): return json.dumps(value) def jsonrpc_renderer(info): return _render class jsonrpc_view(object): """ Decorator that wraps a view and converts the result into a valid JSON-RPC Response object. """ def __init__(self, renderer=DEFAULT_RENDERER): self.renderer = renderer def __call__(self, wrapped): def wrapper(context, request): request.rpc_renderer = self.renderer result = wrapped(context, request) if not request.is_response(result): result = make_response(request, result) return result return wrapper def parse_request_GET(request): """ Parse JSON-RPC parameters from the request query string.""" args = request.GET.get('params') if args is not None: try: request.rpc_args = json.loads(args) except ValueError: raise JsonRpcParseError else: request.rpc_args = () request.rpc_method = request.GET.get('method') request.rpc_id = request.GET.get('id') request.rpc_version = request.GET.get('jsonrpc') def parse_request_POST(request): """ Parse JSON-RPC parameters from the request body.""" try: body = request.json_body except ValueError: raise JsonRpcParseError try: batched = body[:] except TypeError: batched = None if batched is not None: request.batched_rpc_requests = batched else: request.rpc_id = body.get('id') request.rpc_args = body.get('params', ()) request.rpc_method = body.get('method') request.rpc_version = body.get('jsonrpc') def setup_request(endpoint, request): """ Parse a JSON-RPC request body.""" if request.method == 'GET': parse_request_GET(request) elif request.method == 'POST': parse_request_POST(request) else: log.debug('unsupported request method "%s"', request.method) raise JsonRpcRequestInvalid if hasattr(request, 'batched_rpc_requests'): log.debug('handling batched rpc request') # the checks below will look at the subrequests return if request.rpc_version != '2.0': log.debug('id:%s invalid rpc version %s', request.rpc_id, request.rpc_version) raise JsonRpcRequestInvalid if request.rpc_method is None: log.debug('id:%s invalid rpc method', request.rpc_id) raise JsonRpcRequestInvalid log.debug('handling id:%s method:%s', request.rpc_id, request.rpc_method) class EndpointPredicate(object): def __init__(self, val, config): self.val = val def text(self): return 'jsonrpc endpoint = %s' % self.val phash = text def __call__(self, info, request): if self.val: # find the endpoint info key = info['route'].name endpoint = request.registry.jsonrpc_endpoints[key] # potentially setup either rpc v1 or v2 from the parsed body setup_request(endpoint, request) # update request with endpoint information request.rpc_endpoint = endpoint # Always return True so that even if it isn't a valid RPC it # will fall through to the notfound_view which will still # return a valid JSON-RPC response. return True class MethodPredicate(object): def __init__(self, val, config): self.method = val def text(self): return 'jsonrpc method = %s' % self.method phash = text def __call__(self, context, request): return getattr(request, 'rpc_method', None) == self.method class BatchedRequestPredicate(object): def __init__(self, val, config): self.val = val def text(self): return 'jsonrpc batched request = %s' % self.val phash = text def __call__(self, context, request): if self.val: return hasattr(request, 'batched_rpc_requests') def batched_request_view(request): json_response = [] response = request.response for rpc_request in request.batched_rpc_requests: body = json.dumps(rpc_request).encode(request.charset) subrequest_headers = copy.copy(request.headers) subrequest_headers.pop('Content-Length', None) subrequest = Request.blank(path=request.path, environ=request.environ, base_url=request.application_url, headers=subrequest_headers, POST=body, charset=request.charset) subresponse = request.invoke_subrequest(subrequest, use_tweens=True) if subresponse.json_body != '': json_response.append(subresponse.json_body) if json_response: # use charset and content-type from last subresponse response.charset = subresponse.charset response.content_type = subresponse.content_type # will automatically be encoded response.json_body = json_response else: # if we would send an empty list, instead send nothing # per JSON-RPC: http://www.jsonrpc.org/specification#batch response.content_type = 'text/plain' response.body = b'' return response class Endpoint(object): def __init__(self, name, default_mapper, default_renderer): self.name = name self.default_mapper = default_mapper self.default_renderer = default_renderer
[docs]def add_jsonrpc_endpoint(config, name, *args, **kw): """Add an endpoint for handling JSON-RPC. ``name`` The name of the endpoint. ``default_mapper`` A default view mapper that will be passed as the ``mapper`` argument to each of the endpoint's methods. ``default_renderer`` A default renderer that will be passed as the ``renderer`` argument to each of the endpoint's methods. This should be the string name of the renderer, registered via :meth:`pyramid.config.Configurator.add_renderer`. A JSON-RPC method also accepts all of the arguments supplied to :meth:`pyramid.config.Configurator.add_route`. """ default_mapper = kw.pop('default_mapper', MapplyViewMapper) default_renderer = kw.pop('default_renderer', DEFAULT_RENDERER) endpoint = Endpoint( name, default_mapper=default_mapper, default_renderer=default_renderer, ) config.registry.jsonrpc_endpoints[name] = endpoint kw['jsonrpc_endpoint'] = True config.add_route(name, *args, **kw) kw = {} kw['jsonrpc_batched'] = True kw['renderer'] = null_renderer config.add_view(batched_request_view, route_name=name, permission=NO_PERMISSION_REQUIRED, **kw) config.add_view(exception_view, route_name=name, context=Exception, permission=NO_PERMISSION_REQUIRED)
[docs]def add_jsonrpc_method(config, view, **kw): """Add a method to a JSON-RPC endpoint. ``endpoint`` The name of the endpoint. ``method`` The name of the method. A JSON-RPC method also accepts all of the arguments supplied to :meth:`pyramid.config.Configurator.add_view`. A view mapper is registered by default which will match the ``request.rpc_args`` to parameters on the view. To override this behavior simply set the ``mapper`` argument to None or another view mapper. .. note:: An endpoint **must** be defined before methods may be added. """ endpoint_name = kw.pop('endpoint', kw.pop('route_name', None)) if endpoint_name is None: raise ConfigurationError( 'Cannot register a JSON-RPC endpoint without specifying the ' 'name of the endpoint.') endpoint = config.registry.jsonrpc_endpoints.get(endpoint_name) if endpoint is None: raise ConfigurationError( 'Could not find an endpoint with the name "%s".' % endpoint_name) # pop the method name method = kw.pop('method', None) if method is None: raise ConfigurationError( 'Cannot register a JSON-RPC method without specifying the ' '"method"') mapper = kw.pop('mapper', _marker) if mapper is _marker: # only override mapper if not supplied mapper = endpoint.default_mapper kw['mapper'] = mapper renderer = kw.pop('renderer', None) if renderer is None: renderer = endpoint.default_renderer kw['renderer'] = null_renderer kw['jsonrpc_method'] = method rpc_decorator = jsonrpc_view(renderer) decorator = kw.get('decorator', None) if decorator is None: decorator = rpc_decorator else: if not is_nonstr_iter(decorator): decorator = (decorator,) # we want to apply the view_wrapper first, then the other decorators # and combine() reverses the order, so ours goes last decorators = list(decorator) + [rpc_decorator] decorator = combine(*decorators) kw['decorator'] = decorator config.add_view(view, route_name=endpoint_name, **kw)
[docs]class jsonrpc_method(object): """This decorator may be used with pyramid view callables to enable them to respond to JSON-RPC method calls. If ``method`` is not supplied, then the callable name will be used for the method name. ``_depth`` may be specified when wrapping ``jsonrpc_method`` in another decorator. The value should reflect how many stack frames are between the wrapped target and ``jsonrpc_method``. Thus a decorator one level deep would pass in ``_depth=1``. This is the lazy analog to the :func:`~pyramid_rpc.jsonrpc.add_jsonrpc_method`` and accepts all of the same arguments. """ def __init__(self, method=None, **kw): self.method = method self.kw = kw def __call__(self, wrapped): kw = self.kw.copy() kw['method'] = self.method or wrapped.__name__ depth = kw.pop('_depth', 0) def callback(context, name, ob): config = context.config.with_package(info.module) config.add_jsonrpc_method(view=ob, **kw) info = venusian.attach(wrapped, callback, category='pyramid', depth=depth + 1) if info.scope == 'class': # ensure that attr is set if decorating a class method kw.setdefault('attr', wrapped.__name__) kw['_info'] = info.codeinfo # fbo action_method return wrapped
[docs]def includeme(config): """ Set up standard configurator registrations. Use via: .. code-block:: python config = Configurator() config.include('pyramid_rpc.jsonrpc') Once this function has been invoked, two new directives will be available on the configurator: - ``add_jsonrpc_endpoint``: Add an endpoint for handling JSON-RPC. - ``add_jsonrpc_method``: Add a method to a JSON-RPC endpoint. """ if not hasattr(config.registry, 'jsonrpc_endpoints'): config.registry.jsonrpc_endpoints = {} config.add_view_predicate('jsonrpc_method', MethodPredicate) config.add_view_predicate('jsonrpc_batched', BatchedRequestPredicate) config.add_route_predicate('jsonrpc_endpoint', EndpointPredicate) config.add_renderer(DEFAULT_RENDERER, jsonrpc_renderer) config.add_directive('add_jsonrpc_endpoint', add_jsonrpc_endpoint) config.add_directive('add_jsonrpc_method', add_jsonrpc_method) config.add_view(exception_view, context=JsonRpcError, permission=NO_PERMISSION_REQUIRED)