"""
validate `JSON Web Tokens`_ using `OpenID Connect Discovery`_
.. _`JSON Web Tokens`: https://tools.ietf.org/html/rfc7519
.. _`OpenID Connect Discovery`: https://openid.net/specs/openid-connect-discovery-1_0.html
"""
import collections.abc
import functools
import typing
from authlib.jose.jwk import JsonWebKey
from authlib.oauth2.rfc7523 import JWTBearerTokenValidator
from authlib.oidc.discovery.models import OpenIDProviderMetadata
from authlib.oidc.discovery.well_known import get_well_known_url
import requests
[docs]class OIDCBearerTokenValidator(JWTBearerTokenValidator):
"""
Use OIDC Discovery conventions to validate JWTs
"""
[docs] def __init__(
self,
issuer: str,
realm: typing.Union[str, collections.abc.Iterable[str]] = None,
claims_options: collections.abc.Mapping = None,
**extra_attributes: typing.Any) -> None:
self.issuer = issuer
super().__init__(self.fetch_public_key, issuer, realm, **extra_attributes)
if claims_options:
self.claims_options = claims_options
@functools.cached_property
def well_known_url(self) -> str:
"""
see https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfig
"""
return get_well_known_url(self.issuer, external=True)
@functools.cached_property
def server_metadata(self) -> OpenIDProviderMetadata:
"""
see https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationValidation # noqa: E501
"""
response = requests.get(self.well_known_url)
response.raise_for_status()
metadata = OpenIDProviderMetadata(response.json())
metadata.validate_issuer()
metadata.validate_jwks_uri()
metadata.validate_id_token_signing_alg_values_supported()
if metadata['issuer'] != self.issuer:
raise ValueError('server metadata `issuer` must match base url')
return metadata
@functools.cached_property
def jwk_set(self) -> typing.Union[collections.abc.Mapping, collections.abc.Sequence]:
"""
see https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata
"""
response = requests.get(self.server_metadata['jwks_uri'])
response.raise_for_status()
return response.json()
[docs] def fetch_public_key(
self,
headers: collections.abc.Mapping,
payload: collections.abc.Mapping) -> collections.abc.Mapping:
"""
see https://tools.ietf.org/html/draft-ietf-jose-json-web-key-41#section-5
"""
# pylint: disable=unused-argument
key_set = JsonWebKey.import_key_set(self.jwk_set)
public_key = key_set.find_by_kid(headers['kid'])
return public_key