InfluxDB Python Client
The official Python client library for InfluxDB 2.0+, providing comprehensive API access for writing, querying (Flux), and managing InfluxDB resources. It is actively maintained with frequent minor releases, typically on a monthly to bi-monthly cadence.
Warnings
- breaking This client is specifically designed for InfluxDB 2.0+ (Flux API) and is NOT compatible with InfluxDB 1.x HTTP API endpoints. Trying to connect to an InfluxDB 1.x instance will result in authentication or API endpoint errors.
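- gotcha The default write API works in batching mode: points are buffered and flushed in the background rather than written immediately. For immediate writes, create the write API with `client.write_api(write_options=SYNCHRONOUS)`; `SYNCHRONOUS` is already a ready-made `WriteOptions` instance, so it should not be passed as the `write_type` argument of another `WriteOptions`. Not doing so can lead to data not appearing immediately in queries or being lost if the application crashes before the buffer is flushed.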
- gotcha Older versions (pre-1.42.0) had issues with serializing Pandas DataFrames containing `NaN` (Not a Number) values, leading to errors or incorrect data storage.
- gotcha The client requires Python 3.7 or newer. Using older Python versions will result in `SyntaxError` or `ImportError` due to modern language features and type hinting.
- deprecated Several internal `urllib` calls and `datetime` timezone helper functions were replaced or refactored in recent versions to avoid deprecated Python functions. While this mainly affects internal workings, it's good practice to update.
Install
- pip install influxdb-client
- pip install "influxdb-client[extra]" (adds the optional pandas/NumPy dependencies needed for DataFrame support)
Imports
- InfluxDBClient
from influxdb_client import InfluxDBClient
- Point
from influxdb_client import Point
- WriteOptions
from influxdb_client import WriteOptions
- SYNCHRONOUS
from influxdb_client.client.write_api import SYNCHRONOUS
Quickstart
import os
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
# Configuration from environment variables for security and flexibility.
# The 'YOUR_...' values are placeholders only; they are detected below so the
# user gets a hint instead of an opaque authentication error.
INFLUXDB_URL = os.environ.get('INFLUXDB_URL', 'http://localhost:8086')
INFLUXDB_TOKEN = os.environ.get('INFLUXDB_TOKEN', 'YOUR_INFLUXDB_TOKEN')
INFLUXDB_ORG = os.environ.get('INFLUXDB_ORG', 'YOUR_INFLUXDB_ORG')
INFLUXDB_BUCKET = os.environ.get('INFLUXDB_BUCKET', 'YOUR_INFLUXDB_BUCKET')

# Warn (but keep going) when any setting still carries its placeholder value.
if (
    'YOUR_INFLUXDB_TOKEN' in INFLUXDB_TOKEN
    or 'YOUR_INFLUXDB_ORG' in INFLUXDB_ORG
    or 'YOUR_INFLUXDB_BUCKET' in INFLUXDB_BUCKET
):
    print("WARNING: Please set INFLUXDB_URL, INFLUXDB_TOKEN, INFLUXDB_ORG, and INFLUXDB_BUCKET environment variables or update the quickstart code.")

print(f"Connecting to InfluxDB at {INFLUXDB_URL} for org '{INFLUXDB_ORG}'")

# The client is used as a context manager so it is released when the block ends.
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
    # 1. Write data point
    # SYNCHRONOUS is itself a preconfigured WriteOptions instance, so it is
    # passed directly as write_options rather than as the write_type of a new
    # WriteOptions; batch settings do not apply to synchronous writes.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    point = (
        Point("my_measurement")
        .tag("location", "us-west")
        .field("temperature", 25.0)
        .field("humidity", 60.5)
    )
    try:
        write_api.write(bucket=INFLUXDB_BUCKET, record=point)
        print(f"Successfully wrote point: {point.to_line_protocol()}")
    except Exception as e:
        # Quickstart boundary: report the failure and continue to the query step.
        print(f"Error writing point: {e}")

    # 2. Query data using Flux
    query_api = client.query_api()
    query = f'from(bucket: "{INFLUXDB_BUCKET}") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "my_measurement")'
    print(f"Executing Flux query:\n{query}")
    try:
        tables = query_api.query(query, org=INFLUXDB_ORG)
        for table in tables:
            for record in table.records:
                # Single-quoted f-string so the literal double quotes around
                # the value do not terminate the string.
                print(f' Queried: {record.get_measurement()}, {record.get_field()}="{record.get_value()}" at {record.get_time()}')
    except Exception as e:
        print(f"Error querying data: {e}")

print("Client closed.")