Python Hyperscan

0.8.2 · active · verified Thu Apr 16

Python bindings for Hyperscan. The library provides high-performance regular expression matching for large-scale pattern matching tasks, including multi-pattern and streaming modes. Version 0.8.2 is the current release; the project is actively developed, and several patch or minor releases often appear within a few months.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to compile multiple regular expressions into a Hyperscan database and then scan input data in both block and streaming modes. It includes a match event handler to process detected matches. Remember that patterns must be bytes.

import hyperscan

def on_match(
    id: int,
    from_: int,
    to: int,
    flags: int,
    context: list[tuple[int, int, int]] | None,
) -> int:
    """Print one match and record it in ``context`` when a list is supplied.

    Hyperscan calls this handler positionally for every match, so the
    parameter names mirror the library's callback signature (``id`` shadows
    the builtin inside this function only).

    Args:
        id: ID of the pattern that matched.
        from_: Start offset of the match. Hyperscan reports 0 here unless the
            pattern was compiled with ``HS_FLAG_SOM_LEFTMOST``.
        to: Offset just past the last byte of the match.
        flags: Match flags passed through by Hyperscan.
        context: Optional list that collects ``(id, from_, to)`` tuples so
            the caller can tell afterwards whether anything matched.

    Returns:
        0, which tells Hyperscan to continue scanning.
    """
    print(f"Match for pattern ID {id} at [{from_}:{to}] with flags {flags}")
    if context is not None:
        context.append((id, from_, to))
    return 0  # Continue scanning


# Define patterns as (expression, ID, flags) triples. Expressions must be bytes.
patterns_config = [
    (b'foobar', 101, 0),  # Simple literal match (case-sensitive)
    (b'baz', 102, hyperscan.HS_FLAG_CASELESS),  # Case-insensitive
    # Report start of match. HS_FLAG_SINGLEMATCH is deliberately not combined
    # with it: Hyperscan does not support that flag together with
    # HS_FLAG_SOM_LEFTMOST and rejects the pattern at compile time.
    (b'qux', 103, hyperscan.HS_FLAG_SOM_LEFTMOST),
]

# Transpose the triples into the parallel sequences that compile() expects.
expressions, ids, flags = zip(*patterns_config)

db = hyperscan.Database()
db.compile(
    expressions=expressions,
    ids=ids,
    elements=len(patterns_config),
    flags=flags,
)

# Create a scratch space for scanning with this database.
scratch = hyperscan.Scratch(db)

# Scan a data buffer in block mode.
# Only the caseless `baz` pattern is expected to match this buffer: `FoObAr`
# and `QuX` differ in case from their case-sensitive patterns.
data = b'This is a FoObAr string with baz and QuX inside.'
print(f"Scanning data: '{data.decode()}'")

# scan() does not return a "found" indicator, so matches are collected through
# the handler's context argument instead of the call's return value.
block_matches: list[tuple[int, int, int]] = []
db.scan(
    data=data,
    match_event_handler=on_match,
    context=block_matches,
    scratch=scratch,
)

if not block_matches:
    print("No matches found.")

# Example of streaming mode
print("\n--- Streaming Mode ---")
db_streaming = hyperscan.Database(mode=hyperscan.HS_MODE_STREAM)
db_streaming.compile(
    expressions=[b'stream_test'],
    ids=[201],
    elements=1,
    flags=[0],
)

# NOTE(review): the stream relies on the scratch space that compile() attaches
# to the database, so no separate scratch is passed here - confirm against the
# python-hyperscan API reference for the installed version.
stream_matches: list[tuple[int, int, int]] = []
with db_streaming.stream(match_event_handler=on_match, context=stream_matches) as stream:
    stream.scan(data=b'first part of stream_test data')
    stream.scan(data=b'cond part of stream_test data')
    # Matches may be reported during a later scan() call or when the stream
    # closes, so results are only inspected after leaving the `with` block.
print(f"Streaming scan complete: {len(stream_matches)} match(es) reported.")

view raw JSON →