Source code for fictive.oidc.validator

"""
validate `JSON Web Tokens`_ using `OpenID Connect Discovery`_

.. _`JSON Web Tokens`: https://tools.ietf.org/html/rfc7519

.. _`OpenID Connect Discovery`: https://openid.net/specs/openid-connect-discovery-1_0.html

"""

import collections.abc
import functools
import typing

from authlib.jose.jwk import JsonWebKey
from authlib.oauth2.rfc7523 import JWTBearerTokenValidator
from authlib.oidc.discovery.models import OpenIDProviderMetadata
from authlib.oidc.discovery.well_known import get_well_known_url
import requests


[docs]class OIDCBearerTokenValidator(JWTBearerTokenValidator): """ Use OIDC Discovery conventions to validate JWTs """
[docs] def __init__( self, issuer: str, realm: typing.Union[str, collections.abc.Iterable[str]] = None, claims_options: collections.abc.Mapping = None, **extra_attributes: typing.Any) -> None: self.issuer = issuer super().__init__(self.fetch_public_key, issuer, realm, **extra_attributes) if claims_options: self.claims_options = claims_options
@functools.cached_property def well_known_url(self) -> str: """ see https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfig """ return get_well_known_url(self.issuer, external=True) @functools.cached_property def server_metadata(self) -> OpenIDProviderMetadata: """ see https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationValidation # noqa: E501 """ response = requests.get(self.well_known_url) response.raise_for_status() metadata = OpenIDProviderMetadata(response.json()) metadata.validate_issuer() metadata.validate_jwks_uri() metadata.validate_id_token_signing_alg_values_supported() if metadata['issuer'] != self.issuer: raise ValueError('server metadata `issuer` must match base url') return metadata @functools.cached_property def jwk_set(self) -> typing.Union[collections.abc.Mapping, collections.abc.Sequence]: """ see https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata """ response = requests.get(self.server_metadata['jwks_uri']) response.raise_for_status() return response.json()
[docs] def fetch_public_key( self, headers: collections.abc.Mapping, payload: collections.abc.Mapping) -> collections.abc.Mapping: """ see https://tools.ietf.org/html/draft-ietf-jose-json-web-key-41#section-5 """ # pylint: disable=unused-argument key_set = JsonWebKey.import_key_set(self.jwk_set) public_key = key_set.find_by_kid(headers['kid']) return public_key