AWS IoT Device SDK for Python V2

1.28.2 · active · verified Sun Apr 12

The AWS IoT Device SDK for Python V2, built on the AWS Common Runtime (CRT), enables Python applications to connect securely to AWS IoT Core over MQTT and to use services such as Device Shadow and Jobs. It is designed for high performance and reliability. The library has a frequent release cadence, often with monthly or bi-monthly updates that integrate fixes and improvements from the underlying AWS CRT.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to connect to AWS IoT Core using mutual TLS authentication, subscribe to an MQTT topic, publish a message to that topic, and receive it back. Configuration (endpoint, certificate and key paths, client ID, and topic) is read from environment variables, with placeholder defaults used for any that are unset; replace the placeholder paths with your actual certificate files before running.

import os
import sys
import threading
import time

from awscrt.exceptions import AwsCrtError
from awscrt.io import LogLevel, init_logging
from awscrt.mqtt import QoS
# The connection builder is a top-level module of the `awsiot` package.
from awsiot import mqtt_connection_builder

# awscrt reports failures through AwsCrtError (awscrt.exceptions); awscrt.io
# provides no CrtError class. Keep the CrtError name bound to the real
# exception type so handlers written against either name resolve.
CrtError = AwsCrtError

# Route CRT log output to stderr at Trace level.
init_logging(LogLevel.Trace, 'stderr')

# Connection settings. Each value comes from its environment variable and
# falls back to the placeholder shown when that variable is not set.
endpoint = os.getenv('AWS_IOT_ENDPOINT', 'your-iot-endpoint.amazonaws.com')
cert_filepath = os.getenv('AWS_IOT_CERT', 'path/to/device.pem.crt')  # device certificate
pri_key_filepath = os.getenv('AWS_IOT_PRIVATE_KEY', 'path/to/private.pem.key')  # device private key
root_ca_filepath = os.getenv('AWS_IOT_ROOT_CA', 'path/to/rootCA.pem')  # root CA, e.g. AmazonRootCA1.pem
client_id = os.getenv('AWS_IOT_CLIENT_ID', 'test_client')
topic = os.getenv('AWS_IOT_TOPIC', 'sdk/test/python')
message_payload = "Hello from Python SDK!"

# Bail out when any setting ended up empty (for example, a variable that is
# exported as an empty string overrides its placeholder default).
if any(
    not setting
    for setting in (endpoint, cert_filepath, pri_key_filepath, root_ca_filepath, client_id, topic)
):
    print("Please set AWS_IOT_ENDPOINT, AWS_IOT_CERT, AWS_IOT_PRIVATE_KEY, AWS_IOT_ROOT_CA, AWS_IOT_CLIENT_ID, AWS_IOT_TOPIC environment variables.")
    sys.exit(1)

# Invoked when an established connection is lost.
def on_connection_interrupted(connection, error, **kwargs):
    """Print a notice that the connection was interrupted.

    Args:
        connection: The connection the event refers to (not used here).
        error: The cause of the interruption; interpolated into the notice.
        **kwargs: Any additional keyword arguments are accepted and ignored.
    """
    notice = f"Connection interrupted: {error}. Will attempt to reconnect."
    print(notice)

# Callback when connection is resumed
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    """Print a notice that an interrupted connection has been re-established.

    Args:
        connection: The connection that resumed. Its ``client_id`` attribute
            identifies it in the notice (the connection object exposes
            ``client_id``; it has no ``id`` attribute).
        return_code: Connect return code reported for the reconnect.
        session_present: Whether an existing session was resumed.
        **kwargs: Any additional keyword arguments are accepted and ignored.
    """
    print(
        f"Connection resumed: {connection.client_id} "
        f"Return Code: {return_code} Session Present: {session_present}"
    )

# Set by on_message_received once a message arrives, so the main flow can
# block until the published message has been delivered back.
received_all_event = threading.Event()


# Callback for incoming messages
def on_message_received(topic, payload, dup=None, qos=None, retain=None, **kwargs):
    """Print an incoming message and signal that it has arrived.

    The subscription callback is invoked with keyword arguments named
    ``topic`` and ``payload`` (plus ``dup``, ``qos`` and ``retain``), so the
    parameters must carry exactly these names.

    Args:
        topic: Topic the message was published on.
        payload: Raw message body as bytes.
        dup: Duplicate-delivery flag (unused).
        qos: Quality of service the message was delivered with (unused).
        retain: Retained-message flag (unused).
        **kwargs: Any additional keyword arguments are accepted and ignored.
    """
    # Undecodable bytes are replaced rather than raising, so a non-UTF-8
    # payload cannot prevent the event below from being set.
    text = payload.decode(errors="replace")
    print(f"Received message on topic '{topic}': {text}")
    received_all_event.set()

# Main flow: connect, subscribe, publish one message, wait for it to come
# back on the subscription, then disconnect.
mqtt_connection = None
connected = False  # becomes True once connect() succeeds; gates the cleanup below
try:
    print(f"Connecting to {endpoint} with client ID '{client_id}'...")
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        cert_filepath=cert_filepath,
        pri_key_filepath=pri_key_filepath,
        ca_filepath=root_ca_filepath,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False,
        keep_alive_secs=30,
    )

    # Connect to AWS IoT. connect() returns a future; result() blocks until
    # the attempt finishes and raises if it failed.
    connect_future = mqtt_connection.connect()
    connect_future.result()
    connected = True
    print("Connected!")

    # Subscribe. subscribe() returns (future, packet_id); only the future is needed.
    print(f"Subscribing to topic '{topic}'...")
    subscribe_future, _ = mqtt_connection.subscribe(
        topic=topic,
        qos=QoS.AT_LEAST_ONCE,
        callback=on_message_received,
    )
    # The future resolves to a dict, so the granted QoS is read by key.
    subscribe_result = subscribe_future.result()
    print(f"Subscribed with QoS: {subscribe_result['qos']}")

    # Publish. publish() also returns (future, packet_id); wait on the future
    # so "Published." is only printed after the publish has completed.
    print(f"Publishing message to topic '{topic}': {message_payload}")
    publish_future, _ = mqtt_connection.publish(
        topic=topic,
        payload=message_payload,
        qos=QoS.AT_LEAST_ONCE,
    )
    publish_future.result()
    print("Published.")

    # Wait for the published message to be received (if subscribed to the same topic)
    print("Waiting for message to be received...")
    if not received_all_event.wait(timeout=10):  # Wait up to 10 seconds
        print("Did not receive message within timeout.")

except AwsCrtError as e:
    # AwsCrtError is the exception type raised by the AWS CRT bindings.
    print(f"AWS CRT Error: {e}")
    sys.exit(1)
except Exception as e:
    # Top-level boundary of the script: report anything else and exit non-zero.
    print(f"An unexpected error occurred: {e}")
    sys.exit(1)
finally:
    # Only disconnect a session that was actually established; attempting it
    # after a failed connect could raise from cleanup and mask the real error.
    if connected:
        print("Disconnecting...")
        disconnect_future = mqtt_connection.disconnect()
        disconnect_future.result()
        print("Disconnected.")

view raw JSON →