Aliyun Python SDK for Key Management Service (KMS)
The `aliyun-python-sdk-kms` library provides Python bindings for Alibaba Cloud's Key Management Service (KMS), enabling users to create, manage, and use encryption keys for their data. It allows operations like encryption, decryption, and key management within the Aliyun ecosystem. The current version is 2.16.5, with updates typically tied to Alibaba Cloud API releases.
Warnings
- breaking API Version Changes: The import paths for KMS requests include an API version (e.g., `v20160120`). If Alibaba Cloud introduces a new major API version or deprecates an old one, these import paths will break, requiring an update to the code.
- gotcha Response Handling: The `client.do_action_with_exception` method returns a `bytes` object, not a Python dictionary. It needs to be decoded from UTF-8 and then parsed as JSON.
- gotcha Region and Endpoint Configuration: Incorrect `REGION_ID` during client initialization, or an explicit `request.set_endpoint()` pointing to a wrong URL, can lead to `ServiceUnavailable` or `EndpointNotFound` errors.
- gotcha Encryption Context Mismatch: If an `EncryptionContext` is provided during encryption (via `set_EncryptionContext`), the exact same context *must* be provided during decryption for the operation to succeed. A mismatch will result in decryption failure.
- gotcha KMS Key Identifier: A key can be referenced by its key ID, by its ARN (e.g., `arn:acs:kms:cn-hangzhou:123456789:key/your-key-id`), or by an alias (e.g., `alias/your-alias`). These forms are not interchangeable everywhere, so check which one each request expects before passing it.
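The response-handling warning above can be illustrated with a small helper. This is a minimal sketch, not part of the SDK: `parse_kms_response` is a hypothetical name, and the sample payload is a made-up stand-in for what `client.do_action_with_exception` would return.

```python
import json


def parse_kms_response(raw):
    """Decode a raw SDK response (bytes or str) and parse it as JSON."""
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8")
    return json.loads(raw)


# Made-up stand-in for the bytes returned by client.do_action_with_exception(...)
sample = b'{"KeyId": "example-key-id", "CiphertextBlob": "ZXhhbXBsZQ=="}'
parsed = parse_kms_response(sample)
print(parsed["CiphertextBlob"])
```

Accepting both `bytes` and `str` keeps the helper usable if the return type ever differs between environments.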
Install
pip install aliyun-python-sdk-kms
Imports
- AcsClient
from aliyunsdkcore.client import AcsClient
- EncryptRequest
from aliyunsdkkms.request.v20160120 import EncryptRequest
- DecryptRequest
from aliyunsdkkms.request.v20160120 import DecryptRequest
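Because the API version is baked into these import paths, one option is to keep the version string in a single place and fail with a readable message when the module is missing. The sketch below assumes that approach; `load_request_module` and `API_VERSION` are hypothetical names, and only the standard-library `importlib` is used.

```python
import importlib

API_VERSION = "v20160120"  # single place to update if the API version changes


def load_request_module(package, version, name):
    """Import <package>.request.<version>.<name>, raising a descriptive ImportError on failure."""
    module_path = f"{package}.request.{version}.{name}"
    try:
        return importlib.import_module(module_path)
    except ImportError as exc:
        raise ImportError(
            f"Could not import {module_path}; check that the package is "
            f"installed and that API version {version} is still shipped."
        ) from exc


try:
    EncryptRequest = load_request_module("aliyunsdkkms", API_VERSION, "EncryptRequest")
except ImportError as exc:
    # Reached when the SDK is not installed or the version path no longer exists.
    print(exc)
```

A plain `from ... import ...` statement remains simpler for everyday use; the indirection only pays off when several modules share the same version string.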
Quickstart
import os
import json

from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120 import EncryptRequest, DecryptRequest

# Configuration from environment variables
# It's highly recommended to set these environment variables.
ACCESS_KEY_ID = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', 'YOUR_ACCESS_KEY_ID')
ACCESS_KEY_SECRET = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'YOUR_ACCESS_KEY_SECRET')
REGION_ID = os.environ.get('ALIBABA_CLOUD_REGION_ID', 'cn-hangzhou')  # e.g., cn-hangzhou, us-west-1
KMS_KEY_ID = os.environ.get('ALIBABA_CLOUD_KMS_KEY_ID', 'alias/example_key')  # Replace with your KMS Key ID or alias

if ACCESS_KEY_ID == 'YOUR_ACCESS_KEY_ID' or ACCESS_KEY_SECRET == 'YOUR_ACCESS_KEY_SECRET':
    print("Warning: Please set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.")
    print("The example will attempt to run with mock credentials, leading to API authentication errors.")

if KMS_KEY_ID == 'alias/example_key':
    print("Warning: Please set ALIBABA_CLOUD_KMS_KEY_ID environment variable for actual KMS operations.")
    print("The example will use a placeholder key ID, which will likely cause a 'Key not found' error.")

try:
    # Initialize the KMS client
    client = AcsClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET, REGION_ID)
    print(f"KMS client initialized for region: {REGION_ID}")

    # 1. Encrypt a plaintext
    plaintext_to_encrypt = "This is a secret message to be encrypted by KMS."
    encrypt_request = EncryptRequest.EncryptRequest()
    encrypt_request.set_KeyId(KMS_KEY_ID)
    encrypt_request.set_Plaintext(plaintext_to_encrypt)
    # encrypt_request.set_EncryptionContext(json.dumps({'purpose': 'test'}))  # Optional context
    print(f"\nAttempting to encrypt: '{plaintext_to_encrypt}' with Key ID: '{KMS_KEY_ID}'")

    # The SDK returns bytes, so decode and parse the JSON body
    encrypt_response_bytes = client.do_action_with_exception(encrypt_request)
    encrypt_response_data = json.loads(encrypt_response_bytes.decode('utf-8'))
    ciphertext_blob = encrypt_response_data.get('CiphertextBlob')

    if ciphertext_blob:
        print(f"Encryption successful. CiphertextBlob (truncated): {ciphertext_blob[:50]}...")

        # 2. Decrypt the ciphertext
        decrypt_request = DecryptRequest.DecryptRequest()
        decrypt_request.set_CiphertextBlob(ciphertext_blob)
        # decrypt_request.set_EncryptionContext(json.dumps({'purpose': 'test'}))  # Must match encryption context if used
        print("\nAttempting to decrypt ciphertext...")

        decrypt_response_bytes = client.do_action_with_exception(decrypt_request)
        decrypt_response_data = json.loads(decrypt_response_bytes.decode('utf-8'))
        decrypted_plaintext = decrypt_response_data.get('Plaintext')
        print(f"Decryption successful. Decrypted Plaintext: '{decrypted_plaintext}'")
    else:
        print("Encryption response did not contain a CiphertextBlob; skipping decryption.")
except Exception as e:
    print(f"\nAn error occurred during KMS operation: {e}")
    if "InvalidAccessKeyId.NotFound" in str(e) or "InvalidAccessKeySecret" in str(e):
        print("Hint: Verify your ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables or credentials.")
    elif "Specified key is not found." in str(e) or "The KeyId specified does not exist." in str(e):
        print("Hint: Verify your ALIBABA_CLOUD_KMS_KEY_ID environment variable is a valid KMS Key ID or alias in the specified region.")
    elif "InvalidRegionId" in str(e):
        print("Hint: Verify your ALIBABA_CLOUD_REGION_ID environment variable.")
    elif "CiphertextBlobIsNullOrEmpty" in str(e):
        print("Hint: Encryption failed to produce a CiphertextBlob, so decryption cannot proceed. Check encryption parameters.")
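If the optional `EncryptionContext` is used, the same value has to reach both the encrypt and the decrypt request. One way to reduce the chance of a mismatch is to serialize the context once, deterministically, and reuse that string. The sketch below assumes this approach; `serialize_context` is a hypothetical helper, and how strictly KMS compares contexts is not covered here.

```python
import json


def serialize_context(context):
    """Return a deterministic JSON string for an encryption-context dict."""
    # Sorted keys and fixed separators give the same string regardless of dict order.
    return json.dumps(context, sort_keys=True, separators=(",", ":"))


context = {"purpose": "test", "owner": "billing"}
context_json = serialize_context(context)

# The same string would then be passed to both requests, for example:
#   encrypt_request.set_EncryptionContext(context_json)
#   decrypt_request.set_EncryptionContext(context_json)
print(context_json)
```

Storing `context_json` alongside the ciphertext is one way to make sure the decrypting side can supply the identical value later.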