Source code for pyramid.session

from hashlib import sha1
import base64
import binascii
import hmac
import time
import os

from zope.interface import implementer

from pyramid.compat import (

from pyramid.httpexceptions import HTTPBadRequest
from pyramid.interfaces import ISession
from pyramid.util import strings_differ

def manage_accessed(wrapped):
    """ Decorator which causes a cookie to be set when a wrapped
    method is called"""
    def accessed(session, *arg, **kw):
        session.accessed = int(time.time())
        if not session._dirty:
            session._dirty = True
            def set_cookie_callback(request, response):
                session.request = None # explicitly break cycle for gc
        return wrapped(session, *arg, **kw)
    accessed.__doc__ = wrapped.__doc__
    return accessed

[docs]def signed_serialize(data, secret): """ Serialize any pickleable structure (``data``) and sign it using the ``secret`` (must be a string). Return the serialization, which includes the signature as its first 40 bytes. The ``signed_deserialize`` method will deserialize such a value. This function is useful for creating signed cookies. For example: .. code-block:: python cookieval = signed_serialize({'a':1}, 'secret') response.set_cookie('signed_cookie', cookieval) """ pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL) sig =, pickled, sha1).hexdigest() return sig + native_(base64.b64encode(pickled))
[docs]def signed_deserialize(serialized, secret, hmac=hmac): """ Deserialize the value returned from ``signed_serialize``. If the value cannot be deserialized for any reason, a :exc:`ValueError` exception will be raised. This function is useful for deserializing a signed cookie value created by ``signed_serialize``. For example: .. code-block:: python cookieval = request.cookies['signed_cookie'] data = signed_deserialize(cookieval, 'secret') """ # hmac parameterized only for unit tests try: input_sig, pickled = (serialized[:40], base64.b64decode(bytes_(serialized[40:]))) except (binascii.Error, TypeError) as e: # Badly formed data can make base64 die raise ValueError('Badly formed base64 data: %s' % e) sig =, pickled, sha1).hexdigest() # Avoid timing attacks (see # if strings_differ(sig, input_sig): raise ValueError('Invalid signature') return pickle.loads(pickled)
[docs]def check_csrf_token(request, token='csrf_token', raises=True): """ Check the CSRF token in the request's session against the value in ``request.params.get(token)``. If a ``token`` keyword is not supplied to this function, the string ``csrf_token`` will be used to look up the token within ``request.params``. If the value in ``request.params.get(token)`` doesn't match the value supplied by ``request.session.get_csrf_token()``, and ``raises`` is ``True``, this function will raise an :exc:`pyramid.httpexceptions.HTTPBadRequest` exception. If the check does succeed and ``raises`` is ``False``, this function will return ``False``. If the CSRF check is successful, this function will return ``True`` unconditionally. Note that using this function requires that a :term:`session factory` is configured. .. versionadded:: 1.4a2 """ if request.params.get(token) != request.session.get_csrf_token(): if raises: raise HTTPBadRequest('incorrect CSRF token') return False return True
[docs]def UnencryptedCookieSessionFactoryConfig( secret, timeout=1200, cookie_name='session', cookie_max_age=None, cookie_path='/', cookie_domain=None, cookie_secure=False, cookie_httponly=False, cookie_on_exception=True, signed_serialize=signed_serialize, signed_deserialize=signed_deserialize, ): """ Configure a :term:`session factory` which will provide unencrypted (but signed) cookie-based sessions. The return value of this function is a :term:`session factory`, which may be provided as the ``session_factory`` argument of a :class:`pyramid.config.Configurator` constructor, or used as the ``session_factory`` argument of the :meth:`pyramid.config.Configurator.set_session_factory` method. The session factory returned by this function will create sessions which are limited to storing fewer than 4000 bytes of data (as the payload must fit into a single cookie). Parameters: ``secret`` A string which is used to sign the cookie. ``timeout`` A number of seconds of inactivity before a session times out. ``cookie_name`` The name of the cookie used for sessioning. Default: ``session``. ``cookie_max_age`` The maximum age of the cookie used for sessioning (in seconds). Default: ``None`` (browser scope). ``cookie_path`` The path used for the session cookie. Default: ``/``. ``cookie_domain`` The domain used for the session cookie. Default: ``None`` (no domain). ``cookie_secure`` The 'secure' flag of the session cookie. Default: ``False``. ``cookie_httponly`` The 'httpOnly' flag of the session cookie. Default: ``False``. ``cookie_on_exception`` If ``True``, set a session cookie even if an exception occurs while rendering a view. Default: ``True``. ``signed_serialize`` A callable which takes more or less arbitrary python data structure and a secret and returns a signed serialization in bytes. Default: ``signed_serialize`` (using pickle). ``signed_deserialize`` A callable which takes a signed and serialized data structure in bytes and a secret and returns the original data structure if the signature is valid. Default: ``signed_deserialize`` (using pickle). """ @implementer(ISession) class UnencryptedCookieSessionFactory(dict): """ Dictionary-like session object """ # configuration parameters _cookie_name = cookie_name _cookie_max_age = cookie_max_age _cookie_path = cookie_path _cookie_domain = cookie_domain _cookie_secure = cookie_secure _cookie_httponly = cookie_httponly _cookie_on_exception = cookie_on_exception _secret = secret _timeout = timeout # dirty flag _dirty = False def __init__(self, request): self.request = request now = time.time() created = accessed = now new = True value = None state = {} cookieval = request.cookies.get(self._cookie_name) if cookieval is not None: try: value = signed_deserialize(cookieval, self._secret) except ValueError: value = None if value is not None: accessed, created, state = value new = False if now - accessed > self._timeout: state = {} self.created = created self.accessed = accessed = new dict.__init__(self, state) # ISession methods def changed(self): """ This is intentionally a noop; the session is serialized on every access, so unnecessary""" pass def invalidate(self): self.clear() # XXX probably needs to unset cookie # non-modifying dictionary methods get = manage_accessed(dict.get) __getitem__ = manage_accessed(dict.__getitem__) items = manage_accessed(dict.items) values = manage_accessed(dict.values) keys = manage_accessed(dict.keys) __contains__ = manage_accessed(dict.__contains__) __len__ = manage_accessed(dict.__len__) __iter__ = manage_accessed(dict.__iter__) if not PY3: iteritems = manage_accessed(dict.iteritems) itervalues = manage_accessed(dict.itervalues) iterkeys = manage_accessed(dict.iterkeys) has_key = manage_accessed(dict.has_key) # modifying dictionary methods clear = manage_accessed(dict.clear) update = manage_accessed(dict.update) setdefault = manage_accessed(dict.setdefault) pop = manage_accessed(dict.pop) popitem = manage_accessed(dict.popitem) __setitem__ = manage_accessed(dict.__setitem__) __delitem__ = manage_accessed(dict.__delitem__) # flash API methods @manage_accessed def flash(self, msg, queue='', allow_duplicate=True): storage = self.setdefault('_f_' + queue, []) if allow_duplicate or (msg not in storage): storage.append(msg) @manage_accessed def pop_flash(self, queue=''): storage = self.pop('_f_' + queue, []) return storage @manage_accessed def peek_flash(self, queue=''): storage = self.get('_f_' + queue, []) return storage # CSRF API methods @manage_accessed def new_csrf_token(self): token = text_(binascii.hexlify(os.urandom(20))) self['_csrft_'] = token return token @manage_accessed def get_csrf_token(self): token = self.get('_csrft_', None) if token is None: token = self.new_csrf_token() return token # non-API methods def _set_cookie(self, response): if not self._cookie_on_exception: exception = getattr(self.request, 'exception', None) if exception is not None: # dont set a cookie during exceptions return False cookieval = signed_serialize( (self.accessed, self.created, dict(self)), self._secret ) if len(cookieval) > 4064: raise ValueError( 'Cookie value is too long to store (%s bytes)' % len(cookieval) ) response.set_cookie( self._cookie_name, value=cookieval, max_age = self._cookie_max_age, path = self._cookie_path, domain = self._cookie_domain, secure = self._cookie_secure, httponly = self._cookie_httponly, ) return True return UnencryptedCookieSessionFactory