GraphRag
/
graphrag-ollama
/lib
/python3.12
/site-packages
/azure
/identity
/_credentials
/certificate.py
| # ------------------------------------ | |
| # Copyright (c) Microsoft Corporation. | |
| # Licensed under the MIT License. | |
| # ------------------------------------ | |
| from binascii import hexlify | |
| from typing import cast, NamedTuple, Union, Dict, Any, Optional | |
| from cryptography import x509 | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey | |
| from cryptography.hazmat.backends import default_backend | |
| from .._internal import validate_tenant_id | |
| from .._internal.client_credential_base import ClientCredentialBase | |
class CertificateCredential(ClientCredentialBase):
    """Authenticates as a service principal using a certificate.

    The certificate must have an RSA private key, because this credential signs assertions using RS256. See
    `Microsoft Entra ID documentation
    <https://learn.microsoft.com/entra/identity-platform/certificate-credentials#register-your-certificate-with-microsoft-identity-platform>`__
    for more information on configuring certificate authentication.

    :param str tenant_id: ID of the service principal's tenant. Also called its "directory" ID.
    :param str client_id: The service principal's client ID
    :param str certificate_path: Optional path to a certificate file in PEM or PKCS12 format, including the private
        key. If not provided, **certificate_data** is required.

    :keyword str authority: Authority of a Microsoft Entra endpoint, for example "login.microsoftonline.com",
        the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
        defines authorities for other clouds.
    :keyword bytes certificate_data: The bytes of a certificate in PEM or PKCS12 format, including the private key
    :keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
        requires a different encoding, pass appropriately encoded bytes instead.
    :paramtype password: str or bytes
    :keyword bool send_certificate_chain: If True, the credential will send the public certificate chain in the x5c
        header of each token request's JWT. This is required for Subject Name/Issuer (SNI) authentication. Defaults to
        False.
    :keyword cache_persistence_options: Configuration for persistent token caching. If unspecified, the credential
        will cache tokens in memory.
    :paramtype cache_persistence_options: ~azure.identity.TokenCachePersistenceOptions
    :keyword bool disable_instance_discovery: Determines whether or not instance discovery is performed when attempting
        to authenticate. Setting this to true will completely disable both instance discovery and authority validation.
        This functionality is intended for use in scenarios where the metadata endpoint cannot be reached, such as in
        private clouds or Azure Stack. The process of instance discovery entails retrieving authority metadata from
        https://login.microsoft.com/ to validate the authority. By setting this to **True**, the validation of the
        authority is disabled. As a result, it is crucial to ensure that the configured authority host is valid and
        trustworthy.
    :keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id"
        for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to
        acquire tokens for any tenant the application can access.

    .. admonition:: Example:

        .. literalinclude:: ../samples/credential_creation_code_snippets.py
            :start-after: [START create_certificate_credential]
            :end-before: [END create_certificate_credential]
            :language: python
            :dedent: 4
            :caption: Create a CertificateCredential.
    """

    def __init__(self, tenant_id: str, client_id: str, certificate_path: Optional[str] = None, **kwargs: Any) -> None:
        # Reject a malformed tenant ID before touching the certificate.
        validate_tenant_id(tenant_id)

        # The certificate-related keywords (password, certificate_data, ...) are read from kwargs here;
        # the same kwargs are also forwarded unchanged to the base class below.
        credential = get_client_credential(certificate_path, **kwargs)

        super().__init__(
            client_id=client_id,
            client_credential=credential,
            tenant_id=tenant_id,
            **kwargs
        )
def extract_cert_chain(pem_bytes: bytes) -> bytes:
    """Extract a certificate chain from a PEM file's bytes, removing line breaks.

    The chain spans from the first ``-----BEGIN CERTIFICATE-----`` header through the last
    ``-----END CERTIFICATE-----`` footer; content before that header or after that footer is excluded.

    :param bytes pem_bytes: The PEM file's bytes
    :return: The certificate chain
    :rtype: bytes
    :raises ValueError: if pem_bytes has no certificate header, or no certificate footer after that header
    """
    header = b"-----BEGIN CERTIFICATE-----"
    footer = b"-----END CERTIFICATE-----"

    # if index raises ValueError, there's no PEM-encoded cert
    start = pem_bytes.index(header)
    # Search only from the header onward so a stray footer that precedes the first header cannot
    # produce an empty chain; a missing closing footer raises ValueError instead.
    end = pem_bytes.rindex(footer, start) + len(footer)

    # Slice exactly through the footer. Taking one byte more would leak whatever follows the footer
    # into the chain whenever that byte is not a line break.
    chain = pem_bytes[start:end]
    return b"".join(chain.splitlines())
class _Cert(NamedTuple):
    # Record produced by the certificate loaders in this module.
    # PEM text holding the private key and certificate(s).
    pem_bytes: bytes
    # Deserialized private key object; its concrete type depends on the key algorithm.
    private_key: "Any"
    # Raw (binary) SHA-1 fingerprint of the signing certificate.
    fingerprint: bytes
def load_pem_certificate(certificate_data: bytes, password: Optional[bytes] = None) -> _Cert:
    """Load a private key and certificate from PEM-encoded bytes.

    :param bytes certificate_data: PEM data containing both the private key and the certificate
    :param bytes password: Password protecting the private key, if any
    :return: The original PEM bytes, the deserialized private key, and the certificate's SHA-1 fingerprint
    :rtype: _Cert
    """
    backend = default_backend()

    # The key is loaded first, so data without a usable private key fails before the certificate is parsed.
    key = serialization.load_pem_private_key(certificate_data, password, backend=backend)
    certificate = x509.load_pem_x509_certificate(certificate_data, backend)

    # SHA-1 is used here only to compute the certificate's fingerprint (exposed by callers as the thumbprint).
    sha1_fingerprint = certificate.fingerprint(hashes.SHA1())  # nosec
    return _Cert(pem_bytes=certificate_data, private_key=key, fingerprint=sha1_fingerprint)
def load_pkcs12_certificate(certificate_data: bytes, password: Optional[bytes] = None) -> _Cert:
    """Load a private key and certificate(s) from PKCS12 bytes and re-encode them as unencrypted PEM.

    :param bytes certificate_data: PKCS12 data containing the private key and certificate
    :param bytes password: Password protecting the PKCS12 data, if any
    :return: PEM bytes (key followed by the certificates), the private key, and the certificate's SHA-1 fingerprint
    :rtype: _Cert
    :raises ValueError: if the data can't be deserialized, or lacks a private key or certificate
    """
    from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, pkcs12, PrivateFormat

    # mentioning PEM here because we raise this error when certificate_data is garbage
    deserialization_error = "Failed to deserialize certificate in PEM or PKCS12 format"

    try:
        key, signing_cert, extra_certs = pkcs12.load_key_and_certificates(
            certificate_data, password, backend=default_backend()
        )
    except ValueError as ex:
        raise ValueError(deserialization_error) from ex

    if not key:
        raise ValueError("The certificate must include its private key")
    if not signing_cert:
        raise ValueError(deserialization_error)

    # The key is serialized without whatever encryption it had. That doesn't cross a security boundary:
    # this representation stays in memory, where the key and its password (if any) already are.
    sections = [key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())]
    sections.extend(c.public_bytes(Encoding.PEM) for c in [signing_cert] + extra_certs)
    pem = b"".join(sections)

    sha1_fingerprint = signing_cert.fingerprint(hashes.SHA1())  # nosec
    return _Cert(pem, key, sha1_fingerprint)
def get_client_credential(
    certificate_path: Optional[str] = None,
    password: Optional[Union[bytes, str]] = None,
    certificate_data: Optional[bytes] = None,
    send_certificate_chain: bool = False,
    **_: Any
) -> Dict:
    """Load a certificate from a filesystem path or bytes, return it as a dict suitable for msal.ClientApplication.

    Exactly one of ``certificate_path`` and ``certificate_data`` must be given. Data containing
    ``-----BEGIN`` is treated as PEM; anything else is treated as PKCS12.

    :param str certificate_path: Path to a PEM or PKCS12 certificate file.
    :param password: The certificate's password, if any. A non-empty str is encoded as UTF-8.
    :type password: str or bytes
    :param bytes certificate_data: The PEM or PKCS12 certificate's bytes.
    :param bool send_certificate_chain: Whether to send the certificate chain. Defaults to False.
    :return: The certificate as a dict with "private_key" and "thumbprint" keys, plus "passphrase" and
        "public_certificate" when applicable
    :rtype: dict
    :raises ValueError: if both or neither certificate source is given, the private key is not RSA,
        or the certificate chain can't be extracted
    """
    if certificate_path and certificate_data:
        raise ValueError('Please specify either "certificate_path" or "certificate_data", not both')
    if certificate_path:
        with open(certificate_path, "rb") as cert_file:
            certificate_data = cert_file.read()
    elif not certificate_data:
        raise ValueError('CertificateCredential requires a value for either "certificate_path" or "certificate_data"')

    # if password is already bytes, no need to encode.
    if password and isinstance(password, str):
        password = password.encode("utf-8")
    passphrase = cast("Optional[bytes]", password)

    if b"-----BEGIN" in certificate_data:
        cert = load_pem_certificate(certificate_data, passphrase)
    else:
        cert = load_pkcs12_certificate(certificate_data, passphrase)
        passphrase = None  # load_pkcs12_certificate returns cert.pem_bytes decrypted

    if not isinstance(cert.private_key, RSAPrivateKey):
        raise ValueError("The certificate must have an RSA private key because RS256 is used for signing")

    thumbprint = hexlify(cert.fingerprint).decode("utf-8")
    credential = {"private_key": cert.pem_bytes, "thumbprint": thumbprint}
    if passphrase:
        credential["passphrase"] = passphrase

    if not send_certificate_chain:
        return credential

    try:
        # the JWT needs the whole chain but load_pem_x509_certificate deserializes only the signing cert
        credential["public_certificate"] = extract_cert_chain(cert.pem_bytes).decode("utf-8")
    except ValueError as ex:
        # we shouldn't land here--cryptography already loaded the cert and would have raised if it were malformed
        raise ValueError("Malformed certificate") from ex
    return credential