GraphRag
/
graphrag-ollama
/lib
/python3.12
/site-packages
/azure
/identity
/_credentials
/chained.py
| # ------------------------------------ | |
| # Copyright (c) Microsoft Corporation. | |
| # Licensed under the MIT License. | |
| # ------------------------------------ | |
| import logging | |
| from typing import Any, Optional, cast | |
| from azure.core.exceptions import ClientAuthenticationError | |
| from azure.core.credentials import ( | |
| AccessToken, | |
| AccessTokenInfo, | |
| TokenRequestOptions, | |
| SupportsTokenInfo, | |
| TokenCredential, | |
| TokenProvider, | |
| ) | |
| from .. import CredentialUnavailableError | |
| from .._internal import within_credential_chain | |
| _LOGGER = logging.getLogger(__name__) | |
| def _get_error_message(history): | |
| attempts = [] | |
| for credential, error in history: | |
| if error: | |
| attempts.append("{}: {}".format(credential.__class__.__name__, error)) | |
| else: | |
| attempts.append(credential.__class__.__name__) | |
| return """ | |
| Attempted credentials:\n\t{}""".format( | |
| "\n\t".join(attempts) | |
| ) | |
| class ChainedTokenCredential: | |
| """A sequence of credentials that is itself a credential. | |
| Its :func:`get_token` method calls ``get_token`` on each credential in the sequence, in order, returning the first | |
| valid token received. For more information, see | |
| https://aka.ms/azsdk/python/identity/credential-chains#chainedtokencredential-overview. | |
| :param credentials: credential instances to form the chain | |
| :type credentials: ~azure.core.credentials.TokenCredential | |
| .. admonition:: Example: | |
| .. literalinclude:: ../samples/credential_creation_code_snippets.py | |
| :start-after: [START create_chained_token_credential] | |
| :end-before: [END create_chained_token_credential] | |
| :language: python | |
| :dedent: 4 | |
| :caption: Create a ChainedTokenCredential. | |
| """ | |
| def __init__(self, *credentials: TokenProvider) -> None: | |
| if not credentials: | |
| raise ValueError("at least one credential is required") | |
| self._successful_credential: Optional[TokenProvider] = None | |
| self.credentials = credentials | |
| def __enter__(self) -> "ChainedTokenCredential": | |
| for credential in self.credentials: | |
| credential.__enter__() # type: ignore | |
| return self | |
| def __exit__(self, *args: Any) -> None: | |
| for credential in self.credentials: | |
| credential.__exit__(*args) # type: ignore | |
| def close(self) -> None: | |
| """Close the transport session of each credential in the chain.""" | |
| self.__exit__() | |
| def get_token( | |
| self, | |
| *scopes: str, | |
| claims: Optional[str] = None, | |
| tenant_id: Optional[str] = None, | |
| enable_cae: bool = False, | |
| **kwargs: Any, | |
| ) -> AccessToken: | |
| """Request a token from each chained credential, in order, returning the first token received. | |
| If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError` | |
| with an error message from each credential. | |
| This method is called automatically by Azure SDK clients. | |
| :param str scopes: desired scopes for the access token. This method requires at least one scope. | |
| For more information about scopes, see | |
| https://learn.microsoft.com/entra/identity-platform/scopes-oidc. | |
| :keyword str claims: additional claims required in the token, such as those returned in a resource provider's | |
| claims challenge following an authorization failure. | |
| :keyword str tenant_id: optional tenant to include in the token request. | |
| :keyword bool enable_cae: indicates whether to enable Continuous Access Evaluation (CAE) for the requested | |
| token. Defaults to False. | |
| :return: An access token with the desired scopes. | |
| :rtype: ~azure.core.credentials.AccessToken | |
| :raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token | |
| """ | |
| within_credential_chain.set(True) | |
| history = [] | |
| for credential in self.credentials: | |
| try: | |
| # Prioritize "get_token". Fall back to "get_token_info" if not available. | |
| if hasattr(credential, "get_token"): | |
| token = cast(TokenCredential, credential).get_token( | |
| *scopes, claims=claims, tenant_id=tenant_id, enable_cae=enable_cae, **kwargs | |
| ) | |
| else: | |
| options: TokenRequestOptions = {} | |
| if claims: | |
| options["claims"] = claims | |
| if tenant_id: | |
| options["tenant_id"] = tenant_id | |
| options["enable_cae"] = enable_cae | |
| token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options) | |
| token = AccessToken(token_info.token, token_info.expires_on) | |
| _LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__) | |
| self._successful_credential = credential | |
| within_credential_chain.set(False) | |
| return token | |
| except CredentialUnavailableError as ex: | |
| # credential didn't attempt authentication because it lacks required data or state -> continue | |
| history.append((credential, ex.message)) | |
| except Exception as ex: # pylint: disable=broad-except | |
| # credential failed to authenticate, or something unexpectedly raised -> break | |
| history.append((credential, str(ex))) | |
| _LOGGER.debug( | |
| '%s.get_token failed: %s raised unexpected error "%s"', | |
| self.__class__.__name__, | |
| credential.__class__.__name__, | |
| ex, | |
| exc_info=True, | |
| ) | |
| break | |
| within_credential_chain.set(False) | |
| attempts = _get_error_message(history) | |
| message = ( | |
| self.__class__.__name__ | |
| + " failed to retrieve a token from the included credentials." | |
| + attempts | |
| + "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at " | |
| "https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot." | |
| ) | |
| _LOGGER.warning(message) | |
| raise ClientAuthenticationError(message=message) | |
| def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo: | |
| """Request a token from each chained credential, in order, returning the first token received. | |
| If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError` | |
| with an error message from each credential. | |
| This is an alternative to `get_token` to enable certain scenarios that require additional properties | |
| on the token. This method is called automatically by Azure SDK clients. | |
| :param str scopes: desired scopes for the access token. This method requires at least one scope. | |
| For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc. | |
| :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional. | |
| :paramtype options: ~azure.core.credentials.TokenRequestOptions | |
| :rtype: AccessTokenInfo | |
| :return: An AccessTokenInfo instance containing information about the token. | |
| :raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token. | |
| """ | |
| within_credential_chain.set(True) | |
| history = [] | |
| options = options or {} | |
| for credential in self.credentials: | |
| try: | |
| # Prioritize "get_token_info". Fall back to "get_token" if not available. | |
| if hasattr(credential, "get_token_info"): | |
| token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options) | |
| else: | |
| if options.get("pop"): | |
| raise CredentialUnavailableError( | |
| "Proof of possession arguments are not supported for this credential." | |
| ) | |
| token = cast(TokenCredential, credential).get_token(*scopes, **options) | |
| token_info = AccessTokenInfo(token=token.token, expires_on=token.expires_on) | |
| _LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__) | |
| self._successful_credential = credential | |
| within_credential_chain.set(False) | |
| return token_info | |
| except CredentialUnavailableError as ex: | |
| # credential didn't attempt authentication because it lacks required data or state -> continue | |
| history.append((credential, ex.message)) | |
| except Exception as ex: # pylint: disable=broad-except | |
| # credential failed to authenticate, or something unexpectedly raised -> break | |
| history.append((credential, str(ex))) | |
| _LOGGER.debug( | |
| '%s.get_token_info failed: %s raised unexpected error "%s"', | |
| self.__class__.__name__, | |
| credential.__class__.__name__, | |
| ex, | |
| exc_info=True, | |
| ) | |
| break | |
| within_credential_chain.set(False) | |
| attempts = _get_error_message(history) | |
| message = ( | |
| self.__class__.__name__ | |
| + " failed to retrieve a token from the included credentials." | |
| + attempts | |
| + "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at " | |
| "https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot." | |
| ) | |
| _LOGGER.warning(message) | |
| raise ClientAuthenticationError(message=message) | |