GraphRag
/
graphrag-ollama
/lib
/python3.12
/site-packages
/azure
/identity
/_credentials
/vscode.py
| # ------------------------------------ | |
| # Copyright (c) Microsoft Corporation. | |
| # Licensed under the MIT License. | |
| # ------------------------------------ | |
| import abc | |
| import os | |
| import sys | |
| from typing import cast, Any, Dict, Optional | |
| from azure.core.credentials import AccessToken, TokenRequestOptions, AccessTokenInfo | |
| from azure.core.exceptions import ClientAuthenticationError | |
| from .._exceptions import CredentialUnavailableError | |
| from .._constants import AzureAuthorityHosts, AZURE_VSCODE_CLIENT_ID, EnvironmentVariables | |
| from .._internal import normalize_authority, validate_tenant_id, within_dac | |
| from .._internal.aad_client import AadClient, AadClientBase | |
| from .._internal.get_token_mixin import GetTokenMixin | |
| from .._internal.decorators import log_get_token | |
| if sys.platform.startswith("win"): | |
| from .._internal.win_vscode_adapter import get_refresh_token, get_user_settings | |
| elif sys.platform.startswith("darwin"): | |
| from .._internal.macos_vscode_adapter import get_refresh_token, get_user_settings | |
| else: | |
| from .._internal.linux_vscode_adapter import get_refresh_token, get_user_settings | |
class _VSCodeCredentialBase(abc.ABC):
    """Shared setup for credentials that authenticate as the user signed in to Visual Studio Code.

    On construction this reads VS Code's user settings and, unless a client is supplied through
    the private ``_client`` keyword argument, builds one with :meth:`_get_client`. Configuration
    problems detected here (a custom cloud without an explicit authority, an ADFS tenant) are
    recorded in ``self._unavailable_reason`` instead of being raised, so subclasses can report
    them when a token is requested.
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__()

        user_settings = get_user_settings()
        # A missing, null or empty "azure.cloud" setting means the setting has no value,
        # which is documented to select Azure Public Cloud.
        self._cloud = user_settings.get("azure.cloud") or "AzureCloud"
        self._refresh_token = None
        self._unavailable_reason = ""

        self._client = kwargs.get("_client")
        if not self._client:
            self._initialize(user_settings, **kwargs)
        if not (self._client or self._unavailable_reason):
            # _initialize neither produced a client nor explained why
            self._unavailable_reason = "Initialization failed"

    @abc.abstractmethod
    def _get_client(self, **kwargs: Any) -> AadClientBase:
        """Create the client used to redeem the VS Code refresh token.

        Subclasses must override this method.

        :return: a client constructed from the given keyword arguments
        :rtype: AadClientBase
        """

    def _get_refresh_token(self) -> str:
        """Return the refresh token stored by VS Code, fetching it on first use.

        A token that was retrieved successfully is kept on the instance and reused.

        :return: the refresh token for the configured cloud
        :rtype: str
        :raises CredentialUnavailableError: no refresh token could be retrieved
        """
        if not self._refresh_token:
            self._refresh_token = get_refresh_token(self._cloud)
            if not self._refresh_token:
                message = (
                    "Failed to get Azure user details from Visual Studio Code. "
                    "Currently, the VisualStudioCodeCredential only works with the Azure "
                    "Account extension version 0.9.11 and earlier. A long-term fix is in "
                    "progress, see https://github.com/Azure/azure-sdk-for-python/issues/25713"
                )
                raise CredentialUnavailableError(message=message)
        return self._refresh_token

    def _initialize(self, vscode_user_settings: Dict, **kwargs: Any) -> None:
        """Build a client from kwargs merged with VS Code user settings.

        The first stable version of this credential defaulted to Public Cloud and the "organizations"
        tenant when it failed to read VS Code user settings. That behavior is preserved here.

        On success ``self._client`` is set. When the configuration cannot be used,
        ``self._unavailable_reason`` is set instead and no client is created.

        :param dict vscode_user_settings: VS Code user settings
        """

        # Precedence for authority:
        #  1) VisualStudioCodeCredential(authority=...)
        #  2) $AZURE_AUTHORITY_HOST
        #  3) authority matching VS Code's "azure.cloud" setting
        #  4) default: Public Cloud
        authority = kwargs.pop("authority", None) or os.environ.get(EnvironmentVariables.AZURE_AUTHORITY_HOST)
        if not authority:
            # the application didn't specify an authority, so we figure it out from VS Code settings
            if self._cloud == "AzureCloud":
                authority = AzureAuthorityHosts.AZURE_PUBLIC_CLOUD
            elif self._cloud == "AzureChinaCloud":
                authority = AzureAuthorityHosts.AZURE_CHINA
            elif self._cloud == "AzureUSGovernment":
                authority = AzureAuthorityHosts.AZURE_GOVERNMENT
            else:
                # If the value is anything else ("AzureCustomCloud" is the only other known value),
                # we need the user to provide the authority because VS Code has no setting for it and
                # we can't guess confidently.
                self._unavailable_reason = (
                    'VS Code is configured to use a custom cloud. Set keyword argument "authority"'
                    f' with the Microsoft Entra endpoint for cloud "{self._cloud}"'
                )
                return

        # Precedence for tenant ID:
        #  1) VisualStudioCodeCredential(tenant_id=...)
        #  2) "azure.tenant" in VS Code user settings
        #  3) default: organizations
        # An "azure.tenant" setting that is present but empty or null counts as having no value,
        # so it falls through to the default instead of reaching validation.
        tenant_id = (
            kwargs.pop("tenant_id", None) or vscode_user_settings.get("azure.tenant") or "organizations"
        )
        validate_tenant_id(tenant_id)
        if tenant_id.lower() == "adfs":
            self._unavailable_reason = "VisualStudioCodeCredential authentication unavailable. ADFS is not supported."
            return

        self._client = self._get_client(
            authority=normalize_authority(authority), client_id=AZURE_VSCODE_CLIENT_ID, tenant_id=tenant_id, **kwargs
        )
class VisualStudioCodeCredential(_VSCodeCredentialBase, GetTokenMixin):
    """Authenticates as the Azure user signed in to Visual Studio Code via the 'Azure Account' extension.

    It's a `known issue <https://github.com/Azure/azure-sdk-for-python/issues/23249>`_ that this credential doesn't
    work with `Azure Account extension <https://marketplace.visualstudio.com/items?itemName=ms-vscode.azure-account>`_
    versions newer than **0.9.11**. A long-term fix to this problem is in progress. In the meantime, consider
    authenticating with :class:`AzureCliCredential`.

    :keyword str authority: Authority of a Microsoft Entra endpoint, for example "login.microsoftonline.com".
        This argument is required for a custom cloud and usually unnecessary otherwise. Defaults to the authority
        matching the "Azure: Cloud" setting in VS Code's user settings or, when that setting has no value, the
        authority for Azure Public Cloud.
    :keyword str tenant_id: ID of the tenant the credential should authenticate in. Defaults to the "Azure: Tenant"
        setting in VS Code's user settings or, when that setting has no value, the "organizations" tenant, which
        supports only Microsoft Entra work or school accounts.
    :keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id"
        for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to
        acquire tokens for any tenant the application can access.
    """

    def __enter__(self) -> "VisualStudioCodeCredential":
        # There is no client when initialization recorded an unavailable reason.
        if self._client:
            self._client.__enter__()
        return self

    def __exit__(self, *args: Any) -> None:
        if self._client:
            self._client.__exit__(*args)

    def close(self) -> None:
        """Close the credential's transport session."""
        self.__exit__()

    def _raise_if_unavailable(self) -> None:
        """Raise when initialization recorded a reason this credential cannot be used.

        :raises ~azure.identity.CredentialUnavailableError: ``self._unavailable_reason`` is set
        """
        if self._unavailable_reason:
            error_message = (
                self._unavailable_reason + "\n"
                "Visit https://aka.ms/azsdk/python/identity/vscodecredential/troubleshoot"
                " to troubleshoot this issue."
            )
            raise CredentialUnavailableError(message=error_message)

    def get_token(
        self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any
    ) -> AccessToken:
        """Request an access token for `scopes` as the user currently signed in to Visual Studio Code.

        This method is called automatically by Azure SDK clients.

        :param str scopes: desired scopes for the access token. This method requires at least one scope.
            For more information about scopes, see
            https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword str claims: additional claims required in the token, such as those returned in a resource provider's
            claims challenge following an authorization failure.
        :keyword str tenant_id: optional tenant to include in the token request.

        :return: An access token with the desired scopes.
        :rtype: ~azure.core.credentials.AccessToken
        :raises ~azure.identity.CredentialUnavailableError: the credential cannot retrieve user details from Visual
          Studio Code
        """
        self._raise_if_unavailable()
        if within_dac.get():
            # When within_dac is set, an authentication failure is re-raised as
            # CredentialUnavailableError rather than propagated as ClientAuthenticationError.
            try:
                return super().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs)
            except ClientAuthenticationError as ex:
                raise CredentialUnavailableError(message=ex.message) from ex
        return super().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs)

    def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo:
        """Request an access token for `scopes` as the user currently signed in to Visual Studio Code.

        This is an alternative to `get_token` to enable certain scenarios that require additional properties
        on the token. This method is called automatically by Azure SDK clients.

        :param str scopes: desired scopes for the access token. This method requires at least one scope.
            For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional.
        :paramtype options: ~azure.core.credentials.TokenRequestOptions

        :rtype: AccessTokenInfo
        :return: An AccessTokenInfo instance containing information about the token.
        :raises ~azure.identity.CredentialUnavailableError: the credential cannot retrieve user details from Visual
          Studio Code.
        """
        self._raise_if_unavailable()
        if within_dac.get():
            # Same translation of authentication failures as in get_token.
            try:
                return super().get_token_info(*scopes, options=options)
            except ClientAuthenticationError as ex:
                raise CredentialUnavailableError(message=ex.message) from ex
        return super().get_token_info(*scopes, options=options)

    def _acquire_token_silently(self, *scopes: str, **kwargs: Any) -> Optional[AccessTokenInfo]:
        """Return the client's cached access token lookup for `scopes`."""
        # cast() only narrows the type for static checkers; it does not change the object.
        client = cast(AadClient, self._client)
        return client.get_cached_access_token(scopes, **kwargs)

    def _request_token(self, *scopes: str, **kwargs: Any) -> AccessTokenInfo:
        """Redeem the VS Code refresh token for a new access token for `scopes`."""
        refresh_token = self._get_refresh_token()
        client = cast(AadClient, self._client)
        return client.obtain_token_by_refresh_token(scopes, refresh_token, **kwargs)

    def _get_client(self, **kwargs: Any) -> AadClient:
        """Create the synchronous client used by this credential."""
        return AadClient(**kwargs)