from codecs import utf_8_decode
from codecs import utf_8_encode
import datetime
import re
import time
from paste.auth import auth_tkt
from paste.request import get_cookies
from zope.interface import implements
from pyramid.interfaces import IAuthenticationPolicy
from pyramid.request import add_global_response_headers
from pyramid.security import Authenticated
from pyramid.security import Everyone
VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
class CallbackAuthenticationPolicy(object):
""" Abstract class """
def authenticated_userid(self, request):
userid = self.unauthenticated_userid(request)
if userid is None:
return None
if self.callback is None:
return userid
if self.callback(userid, request) is not None: # is not None!
return userid
def effective_principals(self, request):
effective_principals = [Everyone]
userid = self.unauthenticated_userid(request)
if userid is None:
return effective_principals
if self.callback is None:
groups = []
else:
groups = self.callback(userid, request)
if groups is None: # is None!
return effective_principals
effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
return effective_principals
[docs]class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from the :mod:`repoze.who` 1.X WSGI 'API' (the
``repoze.who.identity`` key in the WSGI environment).
Constructor Arguments
``identifier_name``
Default: ``auth_tkt``. The :mod:`repoze.who` plugin name that
performs remember/forget. Optional.
``callback``
Default: ``None``. A callback passed the :mod:`repoze.who`
identity and the :term:`request`, expected to return ``None``
if the user represented by the identity doesn't exist or a
sequence of group identifiers (possibly empty) if the user
does exist. If ``callback`` is None, the userid will be
assumed to exist with no groups.
"""
implements(IAuthenticationPolicy)
def __init__(self, identifier_name='auth_tkt', callback=None):
self.identifier_name = identifier_name
self.callback = callback
def _get_identity(self, request):
return request.environ.get('repoze.who.identity')
def _get_identifier(self, request):
plugins = request.environ.get('repoze.who.plugins')
if plugins is None:
return None
identifier = plugins[self.identifier_name]
return identifier
def authenticated_userid(self, request):
identity = self._get_identity(request)
if identity is None:
return None
if self.callback is None:
return identity['repoze.who.userid']
if self.callback(identity, request) is not None: # is not None!
return identity['repoze.who.userid']
def unauthenticated_userid(self, request):
identity = self._get_identity(request)
if identity is None:
return None
return identity['repoze.who.userid']
def effective_principals(self, request):
effective_principals = [Everyone]
identity = self._get_identity(request)
if identity is None:
return effective_principals
if self.callback is None:
groups = []
else:
groups = self.callback(identity, request)
if groups is None: # is None!
return effective_principals
userid = identity['repoze.who.userid']
effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
return effective_principals
def remember(self, request, principal, **kw):
identifier = self._get_identifier(request)
if identifier is None:
return []
environ = request.environ
identity = {'repoze.who.userid':principal}
return identifier.remember(environ, identity)
def forget(self, request):
identifier = self._get_identifier(request)
if identifier is None:
return []
identity = self._get_identity(request)
return identifier.forget(request.environ, identity)
[docs]class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from the ``REMOTE_USER`` WSGI environment variable.
Constructor Arguments
``environ_key``
Default: ``REMOTE_USER``. The key in the WSGI environ which
provides the userid.
``callback``
Default: ``None``. A callback passed the userid and the request,
expected to return None if the userid doesn't exist or a sequence
of group identifiers (possibly empty) if the user does exist.
If ``callback`` is None, the userid will be assumed to exist with no
groups.
"""
implements(IAuthenticationPolicy)
def __init__(self, environ_key='REMOTE_USER', callback=None):
self.environ_key = environ_key
self.callback = callback
def unauthenticated_userid(self, request):
return request.environ.get(self.environ_key)
def remember(self, request, principal, **kw):
return []
def forget(self, request):
return []
[docs]class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from an :class:`paste.auth.auth_tkt` cookie.
Constructor Arguments
``secret``
The secret (a string) used for auth_tkt cookie signing.
Required.
``callback``
Default: ``None``. A callback passed the userid and the
request, expected to return ``None`` if the userid doesn't
exist or a sequence of group identifiers (possibly empty) if
the user does exist. If ``callback`` is ``None``, the userid
will be assumed to exist with no groups. Optional.
``cookie_name``
Default: ``auth_tkt``. The cookie name used
(string). Optional.
``secure``
Default: ``False``. Only send the cookie back over a secure
conn. Optional.
``include_ip``
Default: ``False``. Make the requesting IP address part of
the authentication data in the cookie. Optional.
``timeout``
Default: ``None``. Maximum number of seconds which a newly
issued ticket will be considered valid. After this amount of
time, the ticket will expire (effectively logging the user
out). If this value is ``None``, the ticket never expires.
Optional.
``reissue_time``
Default: ``None``. If this parameter is set, it represents the
number of seconds that must pass before an authentication token
cookie is reissued. The duration is measured as the number of
seconds since the last auth_tkt cookie was issued and 'now'.
If the ``timeout`` value is ``None``, this parameter has no
effect. If this parameter is provided, and the value of
``timeout`` is not ``None``, the value of ``reissue_time`` must
be smaller than value of ``timeout``. A good rule of thumb: if
you want auto-reissued cookies: set this to the ``timeout``
value divided by ten. If this value is ``0``, a new ticket
cookie will be reissued on every request which needs
authentication. Optional.
``max_age``
Default: ``None``. The max age of the auth_tkt cookie, in
seconds. This differs from ``timeout`` inasmuch as ``timeout``
represents the lifetime of the ticket contained in the cookie,
while this value represents the lifetime of the cookie itself.
When this value is set, the cookie's ``Max-Age`` and
``Expires`` settings will be set, allowing the auth_tkt cookie
to last between browser sessions. It is typically nonsensical
to set this to a value that is lower than ``timeout`` or
``reissue_time``, although it is not explicitly prevented.
Optional.
``path``
Default: ``/``. The path for which the auth_tkt cookie is valid.
May be desirable if the application only serves part of a domain.
Optional.
``http_only``
Default: ``False``. Hide cookie from JavaScript by setting the
HttpOnly flag. Not honored by all browsers.
Optional.
``wild_domain``
Default: ``True``. An auth_tkt cookie will be generated for the
wildcard domain.
Optional.
"""
implements(IAuthenticationPolicy)
def __init__(self,
secret,
callback=None,
cookie_name='auth_tkt',
secure=False,
include_ip=False,
timeout=None,
reissue_time=None,
max_age=None,
path="/",
http_only=False,
wild_domain=True,
):
self.cookie = AuthTktCookieHelper(
secret,
cookie_name=cookie_name,
secure=secure,
include_ip=include_ip,
timeout=timeout,
reissue_time=reissue_time,
max_age=max_age,
http_only=http_only,
path=path,
wild_domain=wild_domain,
)
self.callback = callback
def unauthenticated_userid(self, request):
result = self.cookie.identify(request)
if result:
return result['userid']
def remember(self, request, principal, **kw):
""" Accepts the following kw args: ``max_age``."""
return self.cookie.remember(request, principal, **kw)
def forget(self, request):
return self.cookie.forget(request)
def b64encode(v):
return v.encode('base64').strip().replace('\n', '')
def b64decode(v):
return v.decode('base64')
EXPIRE = object()
[docs]class AuthTktCookieHelper(object):
"""
A helper class for use in third-party authentication policy
implementations. See
:class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the
meanings of the constructor arguments.
"""
auth_tkt = auth_tkt # for tests
now = None # for tests
userid_type_decoders = {
'int':int,
'unicode':lambda x: utf_8_decode(x)[0], # bw compat for old cookies
'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
'b64str': lambda x: b64decode(x),
}
userid_type_encoders = {
int: ('int', str),
long: ('int', str),
unicode: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
str: ('b64str', lambda x: b64encode(x)),
}
def __init__(self, secret, cookie_name='auth_tkt', secure=False,
include_ip=False, timeout=None, reissue_time=None,
max_age=None, http_only=False, path="/", wild_domain=True):
self.secret = secret
self.cookie_name = cookie_name
self.include_ip = include_ip
self.secure = secure
self.timeout = timeout
if reissue_time is not None and timeout is not None:
if reissue_time > timeout:
raise ValueError('reissue_time must be lower than timeout')
self.reissue_time = reissue_time
self.max_age = max_age
self.http_only = http_only
self.path = path
self.wild_domain = wild_domain
static_flags = []
if self.secure:
static_flags.append('; Secure')
if self.http_only:
static_flags.append('; HttpOnly')
self.static_flags = "".join(static_flags)
def _get_cookies(self, environ, value, max_age=None):
if max_age is EXPIRE:
max_age = "; Max-Age=0; Expires=Wed, 31-Dec-97 23:59:59 GMT"
elif max_age is not None:
later = datetime.datetime.utcnow() + datetime.timedelta(
seconds=int(max_age))
# Wdy, DD-Mon-YY HH:MM:SS GMT
expires = later.strftime('%a, %d %b %Y %H:%M:%S GMT')
# the Expires header is *required* at least for IE7 (IE7 does
# not respect Max-Age)
max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
else:
max_age = ''
cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
cookies = [
('Set-Cookie', '%s="%s"; Path=%s%s%s' % (
self.cookie_name, value, self.path, max_age, self.static_flags)),
('Set-Cookie', '%s="%s"; Path=%s; Domain=%s%s%s' % (
self.cookie_name, value, self.path, cur_domain, max_age,
self.static_flags)),
]
if self.wild_domain:
wild_domain = '.' + cur_domain
cookies.append(('Set-Cookie', '%s="%s"; Path=%s; Domain=%s%s%s' % (
self.cookie_name, value, self.path, wild_domain, max_age,
self.static_flags)))
return cookies
def identify(self, request):
""" Return a dictionary with authentication information, or ``None``
if no valid auth_tkt is attached to ``request``"""
environ = request.environ
cookies = get_cookies(environ)
cookie = cookies.get(self.cookie_name)
if cookie is None or not cookie.value:
return None
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
try:
timestamp, userid, tokens, user_data = self.auth_tkt.parse_ticket(
self.secret, cookie.value, remote_addr)
except self.auth_tkt.BadTicket:
return None
now = self.now # service tests
if now is None:
now = time.time()
if self.timeout and ( (timestamp + self.timeout) < now ):
return None
userid_typename = 'userid_type:'
user_data_info = user_data.split('|')
for datum in filter(None, user_data_info):
if datum.startswith(userid_typename):
userid_type = datum[len(userid_typename):]
decoder = self.userid_type_decoders.get(userid_type)
if decoder:
userid = decoder(userid)
reissue = self.reissue_time is not None
if not hasattr(request, '_authtkt_reissued'):
if reissue and ( (now - timestamp) > self.reissue_time):
# work around https://github.com/Pylons/pyramid/issues#issue/108
tokens = filter(None, tokens)
headers = self.remember(request, userid, max_age=self.max_age,
tokens=tokens)
add_global_response_headers(request, headers)
request._authtkt_reissued = True
environ['REMOTE_USER_TOKENS'] = tokens
environ['REMOTE_USER_DATA'] = user_data
environ['AUTH_TYPE'] = 'cookie'
identity = {}
identity['timestamp'] = timestamp
identity['userid'] = userid
identity['tokens'] = tokens
identity['userdata'] = user_data
return identity
def forget(self, request):
""" Return a set of expires Set-Cookie headers, which will destroy
any existing auth_tkt cookie when attached to a response"""
environ = request.environ
return self._get_cookies(environ, '', max_age=EXPIRE)
def remember(self, request, userid, max_age=None, tokens=()):
""" Return a set of Set-Cookie headers; when set into a response,
these headers will represent a valid authentication ticket.
``max_age``
The max age of the auth_tkt cookie, in seconds. When this value is
set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
allowing the auth_tkt cookie to last between browser sessions.
Default: ``None``.
``tokens``
A sequence of strings that will be placed into the auth_tkt tokens
field. Each string in the sequence must be of the Python ``str``
type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
Tokens are available in the returned identity when an auth_tkt is
found in the request and unpacked. Default: ``()``.
"""
max_age = max_age or self.max_age
environ = request.environ
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
user_data = ''
encoding_data = self.userid_type_encoders.get(type(userid))
if encoding_data:
encoding, encoder = encoding_data
userid = encoder(userid)
user_data = 'userid_type:%s' % encoding
for token in tokens:
if not (isinstance(token, str) and VALID_TOKEN.match(token)):
raise ValueError("Invalid token %r" % (token,))
ticket = self.auth_tkt.AuthTicket(
self.secret,
userid,
remote_addr,
tokens=tokens,
user_data=user_data,
cookie_name=self.cookie_name,
secure=self.secure)
cookie_value = ticket.cookie_value()
return self._get_cookies(environ, cookie_value, max_age)