from codecs import utf_8_decode
from codecs import utf_8_encode
import datetime
import re
import time
from paste.auth import auth_tkt
from paste.request import get_cookies
from zope.interface import implements
from pyramid.interfaces import IAuthenticationPolicy
from pyramid.request import add_global_response_headers
from pyramid.security import Authenticated
from pyramid.security import Everyone
VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
class CallbackAuthenticationPolicy(object):
""" Abstract class """
def authenticated_userid(self, request):
userid = self.unauthenticated_userid(request)
if userid is None:
return None
if self.callback is None:
return userid
if self.callback(userid, request) is not None: # is not None!
return userid
def effective_principals(self, request):
effective_principals = [Everyone]
userid = self.unauthenticated_userid(request)
if userid is None:
return effective_principals
if self.callback is None:
groups = []
else:
groups = self.callback(userid, request)
if groups is None: # is None!
return effective_principals
effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
return effective_principals
[docs]class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from the :mod:`repoze.who` 1.X WSGI 'API' (the
``repoze.who.identity`` key in the WSGI environment).
Constructor Arguments
``identifier_name``
Default: ``auth_tkt``. The :mod:`repoze.who` plugin name that
performs remember/forget. Optional.
``callback``
Default: ``None``. A callback passed the :mod:`repoze.who` identity
and the :term:`request`, expected to return ``None`` if the user
represented by the identity doesn't exist or a sequence of principal
identifiers (possibly empty) representing groups if the user does
exist. If ``callback`` is None, the userid will be assumed to exist
with no group principals.
Objects of this class implement the interface described by
:class:`pyramid.interfaces.IAuthenticationPolicy`.
"""
implements(IAuthenticationPolicy)
def __init__(self, identifier_name='auth_tkt', callback=None):
self.identifier_name = identifier_name
self.callback = callback
def _get_identity(self, request):
return request.environ.get('repoze.who.identity')
def _get_identifier(self, request):
plugins = request.environ.get('repoze.who.plugins')
if plugins is None:
return None
identifier = plugins[self.identifier_name]
return identifier
def authenticated_userid(self, request):
identity = self._get_identity(request)
if identity is None:
return None
if self.callback is None:
return identity['repoze.who.userid']
if self.callback(identity, request) is not None: # is not None!
return identity['repoze.who.userid']
def unauthenticated_userid(self, request):
identity = self._get_identity(request)
if identity is None:
return None
return identity['repoze.who.userid']
def effective_principals(self, request):
effective_principals = [Everyone]
identity = self._get_identity(request)
if identity is None:
return effective_principals
if self.callback is None:
groups = []
else:
groups = self.callback(identity, request)
if groups is None: # is None!
return effective_principals
userid = identity['repoze.who.userid']
effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
return effective_principals
def remember(self, request, principal, **kw):
identifier = self._get_identifier(request)
if identifier is None:
return []
environ = request.environ
identity = {'repoze.who.userid':principal}
return identifier.remember(environ, identity)
def forget(self, request):
identifier = self._get_identifier(request)
if identifier is None:
return []
identity = self._get_identity(request)
return identifier.forget(request.environ, identity)
[docs]class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from the ``REMOTE_USER`` WSGI environment variable.
Constructor Arguments
``environ_key``
Default: ``REMOTE_USER``. The key in the WSGI environ which
provides the userid.
``callback``
Default: ``None``. A callback passed the userid and the request,
expected to return None if the userid doesn't exist or a sequence of
principal identifiers (possibly empty) representing groups if the
user does exist. If ``callback`` is None, the userid will be assumed
to exist with no group principals.
Objects of this class implement the interface described by
:class:`pyramid.interfaces.IAuthenticationPolicy`.
"""
implements(IAuthenticationPolicy)
def __init__(self, environ_key='REMOTE_USER', callback=None):
self.environ_key = environ_key
self.callback = callback
def unauthenticated_userid(self, request):
return request.environ.get(self.environ_key)
def remember(self, request, principal, **kw):
return []
def forget(self, request):
return []
[docs]class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from an :class:`paste.auth.auth_tkt` cookie.
Constructor Arguments
``secret``
The secret (a string) used for auth_tkt cookie signing.
Required.
``callback``
Default: ``None``. A callback passed the userid and the
request, expected to return ``None`` if the userid doesn't
exist or a sequence of principal identifiers (possibly empty) if
the user does exist. If ``callback`` is ``None``, the userid
will be assumed to exist with no principals. Optional.
``cookie_name``
Default: ``auth_tkt``. The cookie name used
(string). Optional.
``secure``
Default: ``False``. Only send the cookie back over a secure
conn. Optional.
``include_ip``
Default: ``False``. Make the requesting IP address part of
the authentication data in the cookie. Optional.
``timeout``
Default: ``None``. Maximum number of seconds which a newly
issued ticket will be considered valid. After this amount of
time, the ticket will expire (effectively logging the user
out). If this value is ``None``, the ticket never expires.
Optional.
``reissue_time``
Default: ``None``. If this parameter is set, it represents the number
of seconds that must pass before an authentication token cookie is
automatically reissued as the result of a request which requires
authentication. The duration is measured as the number of seconds
since the last auth_tkt cookie was issued and 'now'. If this value is
``0``, a new ticket cookie will be reissued on every request which
requires authentication.
A good rule of thumb: if you want auto-expired cookies based on
inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
``reissue_time`` value to perhaps a tenth of the ``timeout`` value
(120 or 2 mins). It's nonsensical to set the ``timeout`` value lower
than the ``reissue_time`` value, as the ticket will never be reissued
if so. However, such a configuration is not explicitly prevented.
Optional.
``max_age``
Default: ``None``. The max age of the auth_tkt cookie, in
seconds. This differs from ``timeout`` inasmuch as ``timeout``
represents the lifetime of the ticket contained in the cookie,
while this value represents the lifetime of the cookie itself.
When this value is set, the cookie's ``Max-Age`` and
``Expires`` settings will be set, allowing the auth_tkt cookie
to last between browser sessions. It is typically nonsensical
to set this to a value that is lower than ``timeout`` or
``reissue_time``, although it is not explicitly prevented.
Optional.
``path``
Default: ``/``. The path for which the auth_tkt cookie is valid.
May be desirable if the application only serves part of a domain.
Optional.
``http_only``
Default: ``False``. Hide cookie from JavaScript by setting the
HttpOnly flag. Not honored by all browsers.
Optional.
``wild_domain``
Default: ``True``. An auth_tkt cookie will be generated for the
wildcard domain.
Optional.
Objects of this class implement the interface described by
:class:`pyramid.interfaces.IAuthenticationPolicy`.
"""
implements(IAuthenticationPolicy)
def __init__(self,
secret,
callback=None,
cookie_name='auth_tkt',
secure=False,
include_ip=False,
timeout=None,
reissue_time=None,
max_age=None,
path="/",
http_only=False,
wild_domain=True,
):
self.cookie = AuthTktCookieHelper(
secret,
cookie_name=cookie_name,
secure=secure,
include_ip=include_ip,
timeout=timeout,
reissue_time=reissue_time,
max_age=max_age,
http_only=http_only,
path=path,
wild_domain=wild_domain,
)
self.callback = callback
def unauthenticated_userid(self, request):
result = self.cookie.identify(request)
if result:
return result['userid']
def remember(self, request, principal, **kw):
""" Accepts the following kw args: ``max_age``."""
return self.cookie.remember(request, principal, **kw)
def forget(self, request):
return self.cookie.forget(request)
def b64encode(v):
return v.encode('base64').strip().replace('\n', '')
def b64decode(v):
return v.decode('base64')
EXPIRE = object()
[docs]class AuthTktCookieHelper(object):
"""
A helper class for use in third-party authentication policy
implementations. See
:class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the
meanings of the constructor arguments.
"""
auth_tkt = auth_tkt # for tests
now = None # for tests
userid_type_decoders = {
'int':int,
'unicode':lambda x: utf_8_decode(x)[0], # bw compat for old cookies
'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
'b64str': lambda x: b64decode(x),
}
userid_type_encoders = {
int: ('int', str),
long: ('int', str),
unicode: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
str: ('b64str', lambda x: b64encode(x)),
}
def __init__(self, secret, cookie_name='auth_tkt', secure=False,
include_ip=False, timeout=None, reissue_time=None,
max_age=None, http_only=False, path="/", wild_domain=True):
self.secret = secret
self.cookie_name = cookie_name
self.include_ip = include_ip
self.secure = secure
self.timeout = timeout
self.reissue_time = reissue_time
self.max_age = max_age
self.http_only = http_only
self.path = path
self.wild_domain = wild_domain
static_flags = []
if self.secure:
static_flags.append('; Secure')
if self.http_only:
static_flags.append('; HttpOnly')
self.static_flags = "".join(static_flags)
def _get_cookies(self, environ, value, max_age=None):
if max_age is EXPIRE:
max_age = "; Max-Age=0; Expires=Wed, 31-Dec-97 23:59:59 GMT"
elif max_age is not None:
later = datetime.datetime.utcnow() + datetime.timedelta(
seconds=int(max_age))
# Wdy, DD-Mon-YY HH:MM:SS GMT
expires = later.strftime('%a, %d %b %Y %H:%M:%S GMT')
# the Expires header is *required* at least for IE7 (IE7 does
# not respect Max-Age)
max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
else:
max_age = ''
cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
# While Chrome, IE, and Firefox can cope, Opera (at least) cannot
# cope with a port number in the cookie domain when the URL it
# receives the cookie from does not also have that port number in it
# (e.g via a proxy). In the meantime, HTTP_HOST is sent with port
# number, and neither Firefox nor Chrome do anything with the
# information when it's provided in a cookie domain except strip it
# out. So we strip out any port number from the cookie domain
# aggressively to avoid problems. See also
# https://github.com/Pylons/pyramid/issues/131
if ':' in cur_domain:
cur_domain = cur_domain.split(':', 1)[0]
cookies = [
('Set-Cookie', '%s="%s"; Path=%s%s%s' % (
self.cookie_name, value, self.path, max_age, self.static_flags)),
('Set-Cookie', '%s="%s"; Path=%s; Domain=%s%s%s' % (
self.cookie_name, value, self.path, cur_domain, max_age,
self.static_flags)),
]
if self.wild_domain:
wild_domain = '.' + cur_domain
cookies.append(('Set-Cookie', '%s="%s"; Path=%s; Domain=%s%s%s' % (
self.cookie_name, value, self.path, wild_domain, max_age,
self.static_flags)))
return cookies
[docs] def identify(self, request):
""" Return a dictionary with authentication information, or ``None``
if no valid auth_tkt is attached to ``request``"""
environ = request.environ
cookies = get_cookies(environ)
cookie = cookies.get(self.cookie_name)
if cookie is None or not cookie.value:
return None
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
try:
timestamp, userid, tokens, user_data = self.auth_tkt.parse_ticket(
self.secret, cookie.value, remote_addr)
except self.auth_tkt.BadTicket:
return None
now = self.now # service tests
if now is None:
now = time.time()
if self.timeout and ( (timestamp + self.timeout) < now ):
# the auth_tkt data has expired
return None
userid_typename = 'userid_type:'
user_data_info = user_data.split('|')
for datum in filter(None, user_data_info):
if datum.startswith(userid_typename):
userid_type = datum[len(userid_typename):]
decoder = self.userid_type_decoders.get(userid_type)
if decoder:
userid = decoder(userid)
reissue = self.reissue_time is not None
if reissue and not hasattr(request, '_authtkt_reissued'):
if ( (now - timestamp) > self.reissue_time ):
# work around https://github.com/Pylons/pyramid/issues#issue/108
tokens = filter(None, tokens)
headers = self.remember(request, userid, max_age=self.max_age,
tokens=tokens)
add_global_response_headers(request, headers)
request._authtkt_reissued = True
environ['REMOTE_USER_TOKENS'] = tokens
environ['REMOTE_USER_DATA'] = user_data
environ['AUTH_TYPE'] = 'cookie'
identity = {}
identity['timestamp'] = timestamp
identity['userid'] = userid
identity['tokens'] = tokens
identity['userdata'] = user_data
return identity
[docs] def forget(self, request):
""" Return a set of expires Set-Cookie headers, which will destroy
any existing auth_tkt cookie when attached to a response"""
environ = request.environ
return self._get_cookies(environ, '', max_age=EXPIRE)
[docs] def remember(self, request, userid, max_age=None, tokens=()):
""" Return a set of Set-Cookie headers; when set into a response,
these headers will represent a valid authentication ticket.
``max_age``
The max age of the auth_tkt cookie, in seconds. When this value is
set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
allowing the auth_tkt cookie to last between browser sessions. If
this value is ``None``, the ``max_age`` value provided to the
helper itself will be used as the ``max_age`` value. Default:
``None``.
``tokens``
A sequence of strings that will be placed into the auth_tkt tokens
field. Each string in the sequence must be of the Python ``str``
type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
Tokens are available in the returned identity when an auth_tkt is
found in the request and unpacked. Default: ``()``.
"""
if max_age is None:
max_age = self.max_age
environ = request.environ
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
user_data = ''
encoding_data = self.userid_type_encoders.get(type(userid))
if encoding_data:
encoding, encoder = encoding_data
userid = encoder(userid)
user_data = 'userid_type:%s' % encoding
for token in tokens:
if not (isinstance(token, str) and VALID_TOKEN.match(token)):
raise ValueError("Invalid token %r" % (token,))
ticket = self.auth_tkt.AuthTicket(
self.secret,
userid,
remote_addr,
tokens=tokens,
user_data=user_data,
cookie_name=self.cookie_name,
secure=self.secure)
cookie_value = ticket.cookie_value()
return self._get_cookies(environ, cookie_value, max_age)
[docs]class SessionAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` authentication policy which gets its data from the
configured :term:`session`. For this authentication policy to work, you
will have to follow the instructions in the :ref:`sessions_chapter` to
configure a :term:`session factory`.
Constructor Arguments
``prefix``
A prefix used when storing the authentication parameters in the
session. Defaults to 'auth.'. Optional.
``callback``
Default: ``None``. A callback passed the userid and the
request, expected to return ``None`` if the userid doesn't
exist or a sequence of principal identifiers (possibly empty) if
the user does exist. If ``callback`` is ``None``, the userid
will be assumed to exist with no principals. Optional.
"""
implements(IAuthenticationPolicy)
def __init__(self, prefix='auth.', callback=None):
self.callback = callback
self.prefix = prefix or ''
self.userid_key = prefix + 'userid'
def remember(self, request, principal, **kw):
""" Store a principal in the session."""
request.session[self.userid_key] = principal
return []
def forget(self, request):
""" Remove the stored principal from the session."""
if self.userid_key in request.session:
del request.session[self.userid_key]
return []
def unauthenticated_userid(self, request):
return request.session.get(self.userid_key)