coffea
coffea is a Python toolkit for columnar data analysis in High-Energy Physics (HEP), providing basic tools and wrappers for efficient manipulation of HEP event data. It integrates with big-data technologies such as Dask, Parsl, and TaskVine so that analyses can scale from local machines to computing clusters without code changes. The library is actively developed and is currently at version 2026.4.0. Calendar-versioned releases appear frequently, often monthly or every two months, and the 0.7.x branch continues to receive backports.
Warnings
- breaking: Python 3.9 support was dropped in `coffea` version 2025.12.0. Users on older Python versions need to upgrade their environment to Python 3.10 or newer to use recent `coffea` releases.
- breaking: Migrating from `coffea 0.7.x` to the calendar-versioned releases (`202X.Y.Z`) involves major API changes. The transition follows the `awkward-array` v1 to v2 migration and makes `dask-awkward` and `dask-histogram` mandatory for delayed computation. Old code may need significant updates to adapt to the new pattern, particularly around explicit `.compute()` calls.
- gotcha: `ProcessorABC` instances are expected to be fully serializable for distributed execution. Avoid tracking mutable state within a `ProcessorABC` instance, because it is treated as a stateless bundle of methods. Non-picklable attributes, or shared state that is not handled properly during serialization and deserialization, can cause failures.
- gotcha: Premature calls to `.compute()` on `dask-awkward` arrays or `dask-histogram` objects can severely degrade performance in `coffea` analyses. An explicit `.compute()` call forces immediate evaluation, which breaks up the Dask task graph that lazy, distributed processing relies on.
- gotcha: The `coffea` project transitioned from semantic versioning (e.g., `0.7.x`) to calendar versioning (e.g., `202X.Y.Z`), and the `0.7.x` backports branch is still active. The two version series are not API-compatible, so take care to install and develop against the intended one.
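The serialization warning above can be checked before submitting work to a cluster. The following is a minimal sketch, assuming the standard-library `pickle` module is a reasonable stand-in for whatever serializer the executor uses. `StatelessProcessor`, `StatefulProcessor`, and `survives_pickle` are hypothetical names for illustration and are not part of `coffea`.

```python
import pickle
import threading


class StatelessProcessor:
    """Hypothetical stand-in for a ProcessorABC subclass holding only configuration."""

    def __init__(self, pt_min=15.0):
        self.pt_min = pt_min  # plain, picklable configuration value

    def process(self, values):
        # Return a fresh result instead of mutating attributes on self.
        return {"n_selected": sum(1 for v in values if v > self.pt_min)}


class StatefulProcessor:
    """Anti-pattern: carries a lock, which cannot be pickled."""

    def __init__(self):
        self.lock = threading.Lock()


def survives_pickle(obj) -> bool:
    """Return True if obj can be serialized and restored with pickle."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(survives_pickle(StatelessProcessor()))  # True
print(survives_pickle(StatefulProcessor()))   # False
```

Running a check like this on a processor instance in a local test is cheap and tends to surface serialization problems earlier than a failed distributed job would.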
Install
- pip install coffea
- pip install coffea[dask]
- pip install coffea[parsl]
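After installing, it can help to confirm that the interpreter and the installed `coffea` release match the series you intend to develop against. The sketch below uses only the standard library. The helper names (`version_series`, `python_is_supported`, `describe_environment`) are hypothetical, and the rule "a four-digit leading component means calendar versioning" is an assumption based on the `202X.Y.Z` pattern described in the warnings.

```python
import sys
from importlib import metadata

MIN_PYTHON = (3, 10)  # recent coffea releases require Python 3.10 or newer


def version_series(version: str) -> str:
    """Classify a version string as 'calendar', 'legacy-0.7', or 'other'."""
    major = version.split(".")[0]
    if major.isdigit() and len(major) == 4:
        return "calendar"  # assumed pattern: 202X.Y.Z
    if version == "0.7" or version.startswith("0.7."):
        return "legacy-0.7"
    return "other"


def python_is_supported(version_info=sys.version_info) -> bool:
    """Check the interpreter against the minimum Python version."""
    return tuple(version_info[:2]) >= MIN_PYTHON


def describe_environment() -> str:
    """Summarize the Python and coffea versions found in this environment."""
    try:
        installed = metadata.version("coffea")
    except metadata.PackageNotFoundError:
        return "coffea is not installed in this environment"
    return (
        f"coffea {installed} ({version_series(installed)} series), "
        f"Python supported: {python_is_supported()}"
    )


print(describe_environment())
```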
Imports
- processor
from coffea import processor
- ak
import awkward as ak
- hist
import hist
Quickstart
import awkward as ak
import hist
from coffea import processor
from coffea.nanoevents import BaseSchema


# Define a simple processor
class MyProcessor(processor.ProcessorABC):
    def process(self, events):
        # For demonstration, assume 'events' has a 'Muon' collection.
        # In a real scenario, events are loaded from a file (e.g. by the Runner below).
        if "Muon" in events.fields:
            all_muons = events.Muon
        else:
            # Fall back to dummy muons (two events) so the selection below can
            # still be followed; they are not tied to the events in the file.
            all_muons = ak.zip({
                "pt": ak.Array([[10, 20], [30]]),
                "eta": ak.Array([[0.5, 1.2], [-0.8]]),
                "charge": ak.Array([[1, -1], [1]]),
            })
        muons = all_muons[all_muons.pt > 15]
        # Select opposite-sign dimuons
        dimuons = ak.combinations(muons, 2, fields=["lead", "trail"])
        dimuons = dimuons[dimuons.lead.charge != dimuons.trail.charge]
        # Dummy mass calculation for illustration (a pt sum, not a true invariant mass).
        # In a real analysis, vector-like operations would be used.
        # ak.flatten also covers the case where no pairs survive the selection.
        mass = ak.flatten(dimuons.lead.pt + dimuons.trail.pt)
        # Create a histogram and fill it
        h_mass = hist.Hist.new.Reg(50, 0, 100, name="mass", label="Dimuon Mass [GeV]").Double()
        h_mass.fill(mass=mass)
        return {"mymass_histogram": h_mass, "nevents": len(events)}

    def postprocess(self, accumulator):
        return accumulator


# Example usage with a local executor
# "dummy_file.root" is a placeholder; in a real scenario, point this at actual ROOT files
fileset = {"dataset_A": ["dummy_file.root"]}
# Instantiate the processor
my_processor_instance = MyProcessor()
# Run the processor with a local executor
run = processor.Runner(
    executor=processor.IterativeExecutor(status=False),
    schema=BaseSchema,
    xrootdtimeout=0,  # dummy, for local execution
)
# Keyword arguments avoid depending on the positional order of the call signature
output = run(fileset, treename="Events", processor_instance=my_processor_instance)
print(output["mymass_histogram"])
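The quickstart fills its histogram with a pt sum as a placeholder for the dimuon mass. As a hedged sketch of what a real calculation could look like, the function below applies the standard two-body formula with the muon rest masses neglected. It assumes each muon also carries an azimuthal angle `phi`, which the dummy collection above does not include, and `massless_pair_mass` is a hypothetical helper rather than a `coffea` function.

```python
import math


def massless_pair_mass(pt1, eta1, phi1, pt2, eta2, phi2):
    """Invariant mass of two particles, neglecting their rest masses.

    m^2 = 2 * pt1 * pt2 * (cosh(eta1 - eta2) - cos(phi1 - phi2))
    """
    m_squared = 2.0 * pt1 * pt2 * (
        math.cosh(eta1 - eta2) - math.cos(phi1 - phi2)
    )
    # Guard against a tiny negative value caused by floating-point rounding.
    return math.sqrt(max(m_squared, 0.0))


# Two back-to-back 45 GeV muons at eta = 0
print(massless_pair_mass(45.0, 0.0, 0.0, 45.0, 0.0, math.pi))  # 90.0
```

This version works on scalars for clarity. The same expression written with NumPy functions should carry over to per-pair arrays such as `dimuons.lead` and `dimuons.trail`, though that variant is not shown or tested here.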