
    i!                        S SK Jr  S SKrS SKrS SKJr  S SKJr  S SK	J
r
  S SKJrJr  SSKJrJr  SS	KJr  SS
KJrJr  SSKJr   " S S5      rg)    )annotationsN)	lru_cache)
SSLContext)Any)	HTTPErrorURLError   )PyJWKPyJWKSet)decode_complete)PyJWKClientConnectionErrorPyJWKClientError)JWKSetCachec                      \ rS rSr       S               SS jjrSS jrSSS jjrSSS jjrSS jrSS jr	\
SS	 j5       rS
rg)PyJWKClient   Nc	                   Uc  0 nXl         SU l        X`l        Xpl        Xl        U(       a&  US::  a  [        SU S35      e[        U5      U l        OSU l        U(       a   [        US9" U R                  5      n	Xl        gg)u  A client for retrieving signing keys from a JWKS endpoint.

``PyJWKClient`` uses a two-tier caching system to avoid unnecessary
network requests:

**Tier 1 — JWK Set cache** (enabled by default):
Caches the entire JSON Web Key Set response from the endpoint.
Controlled by:

- ``cache_jwk_set``: Set to ``True`` (the default) to enable this
  cache. When enabled, the JWK Set is fetched from the network only
  when the cache is empty or expired.
- ``lifespan``: Time in seconds before the cached JWK Set expires.
  Defaults to ``300`` (5 minutes). Must be greater than 0.

**Tier 2 — Signing key cache** (disabled by default):
Caches individual signing keys (looked up by ``kid``) using an LRU
cache with **no time-based expiration**. Keys are evicted only when
the cache reaches its maximum size. Controlled by:

- ``cache_keys``: Set to ``True`` to enable this cache.
  Defaults to ``False``.
- ``max_cached_keys``: Maximum number of signing keys to keep in
  the LRU cache. Defaults to ``16``.

:param uri: The URL of the JWKS endpoint.
:type uri: str
:param cache_keys: Enable the per-key LRU cache (Tier 2).
:type cache_keys: bool
:param max_cached_keys: Max entries in the signing key LRU cache.
:type max_cached_keys: int
:param cache_jwk_set: Enable the JWK Set response cache (Tier 1).
:type cache_jwk_set: bool
:param lifespan: TTL in seconds for the JWK Set cache.
:type lifespan: float
:param headers: Optional HTTP headers to include in requests.
:type headers: dict or None
:param timeout: HTTP request timeout in seconds.
:type timeout: float
:param ssl_context: Optional SSL context for the request.
:type ssl_context: ssl.SSLContext or None
Nr   z/Lifespan must be greater than 0, the input is "")maxsize)	urijwk_set_cacheheaderstimeoutssl_contextr   r   r   get_signing_key)
selfr   
cache_keysmax_cached_keyscache_jwk_setlifespanr   r   r   r   s
             Z/home/maestro/MAESTRO/maestro-backend/venv/lib/python3.13/site-packages/jwt/jwks_client.py__init__PyJWKClient.__init__   s    j ?G15& 1}&EhZqQ  "-X!6D!%D'@AUAUVO#2 	     c                |   Sn [         R                  R                  U R                  U R                  S9n[         R                  R                  X R                  U R                  S9 n[        R                  " U5      nSSS5        UU R                  b  U R                  R                  U5        $ $ ! , (       d  f       N9= f! [        [        4 a:  n[        U[        5      (       a  UR!                  5         [#        SU S35      UeSnAff = f! U R                  b  U R                  R                  U5        f f = f)a5  Fetch the JWK Set from the JWKS endpoint.

Makes an HTTP request to the configured ``uri`` and returns the
parsed JSON response. If the JWK Set cache is enabled, the
response is stored in the cache.

:returns: The parsed JWK Set as a dictionary.
:raises PyJWKClientConnectionError: If the HTTP request fails.
N)urlr   )r   contextz'Fail to fetch data from the url, err: "r   )urllibrequestRequestr   r   urlopenr   r   jsonloadr   putr   TimeoutError
isinstancer   closer   )r   jwk_setrresponsees        r!   
fetch_dataPyJWKClient.fetch_data_   s    	0&&488T\\&JA''<<1A1A ( ))H- !!-""&&w/ .  ,' 	!Y''	,9!A>	 !!-""&&w/ .sB   A$C (B2?C D 2
C <C D5DDD +D;c                    SnU R                   b!  U(       d  U R                   R                  5       nUc  U R                  5       n[        U[        5      (       d  [        S5      e[        R                  " U5      $ )a  Return the JWK Set, using the cache when available.

:param refresh: Force a fresh fetch from the endpoint, bypassing
    the cache.
:type refresh: bool
:returns: The JWK Set.
:rtype: PyJWKSet
:raises PyJWKClientError: If the endpoint does not return a JSON
    object.
Nz.The JWKS endpoint did not return a JSON object)r   getr6   r0   dictr   r   	from_dict)r   refreshdatas      r!   get_jwk_setPyJWKClient.get_jwk_set|   sf     )'%%))+D<??$D$%%"#STT!!$''r$   c                    U R                  U5      nUR                   Vs/ s H*  nUR                  S;   d  M  UR                  (       d  M(  UPM,     nnU(       d  [	        S5      eU$ s  snf )a_  Return all signing keys from the JWK Set.

Filters the JWK Set to keys whose ``use`` is ``"sig"`` (or
unspecified) and that have a ``kid``.

:param refresh: Force a fresh fetch from the endpoint, bypassing
    the cache.
:type refresh: bool
:returns: A list of signing keys.
:rtype: list[PyJWK]
:raises PyJWKClientError: If no signing keys are found.
)sigNz2The JWKS endpoint did not contain any signing keys)r>   keyspublic_key_usekey_idr   )r   r<   r2   jwk_set_keysigning_keyss        r!   get_signing_keysPyJWKClient.get_signing_keys   sr     ""7+  '||
+))]: ?J?Q?Q + 	 
 "#WXX
s   A'A'A'c                    U R                  5       nU R                  X!5      nU(       d6  U R                  SS9nU R                  X!5      nU(       d  [        SU S35      eU$ )aY  Return the signing key matching the given ``kid``.

If no match is found in the current JWK Set, the set is
refreshed from the endpoint and the lookup is retried once.

:param kid: The key ID to look up.
:type kid: str
:returns: The matching signing key.
:rtype: PyJWK
:raises PyJWKClientError: If no matching key is found after
    refreshing.
T)r<   z,Unable to find a signing key that matches: "r   )rG   	match_kidr   )r   kidrF   signing_keys       r!   r   PyJWKClient.get_signing_key   sh     ,,.nn\7000>L..;K&B3%qI  r$   c                d    [        USS0S9nUS   nU R                  UR                  S5      5      $ )a  Return the signing key for a JWT by reading its ``kid`` header.

Extracts the ``kid`` from the token's unverified header and
delegates to :meth:`get_signing_key`.

:param token: The encoded JWT.
:type token: str or bytes
:returns: The matching signing key.
:rtype: PyJWK
verify_signatureF)optionsheaderrK   )decode_tokenr   r9   )r   token
unverifiedrQ   s       r!   get_signing_key_from_jwt$PyJWKClient.get_signing_key_from_jwt   s:     "%2De1LM
H%##FJJu$566r$   c                F    SnU  H  nUR                   U:X  d  M  Un  U$    U$ )zFind a key in *signing_keys* that matches *kid*.

:param signing_keys: The list of keys to search.
:type signing_keys: list[PyJWK]
:param kid: The key ID to match.
:type kid: str
:returns: The matching key, or ``None`` if not found.
:rtype: PyJWK or None
N)rD   )rF   rK   rL   keys       r!   rJ   PyJWKClient.match_kid   s5     CzzS !  
 r$   )r   r   r   r   r   r   )Fr   Ti,  N   N)r   strr   boolr   intr   r\   r    floatr   zdict[str, Any] | Noner   r^   r   zSSLContext | None)returnr   )F)r<   r\   r_   r   )r<   r\   r_   list[PyJWK])rK   r[   r_   r
   )rS   zstr | bytesr_   r
   )rF   r`   rK   r[   r_   zPyJWK | None)__name__
__module____qualname____firstlineno__r"   r6   r>   rG   r   rU   staticmethodrJ   __static_attributes__ r$   r!   r   r      s     !!")-)-L3L3 L3 	L3
 L3 L3 'L3 L3 'L3\0:(.287  r$   r   )
__future__r   r,   urllib.requestr(   	functoolsr   sslr   typingr   urllib.errorr   r   api_jwkr
   r   api_jwtr   rR   
exceptionsr   r   r   r   r   rg   r$   r!   <module>rq      s2    "      , $ 4 D &Y Yr$   