dvc-task
dvc-task is an extensible Python library for queuing, running, and managing background jobs (processes) from standalone applications. It is built on Celery but ships with a pre-configured filesystem transport, so no separate AMQP message broker is required. As of April 2026, the current version is 0.40.2; the project is actively developed, with releases at irregular intervals.
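To make the filesystem transport concrete, the sketch below builds the kind of Celery settings that replace an AMQP broker with plain directories. `broker_url = "filesystem://"`, the `data_folder_in` and `data_folder_out` transport options, and a `file://` result backend are standard Celery/Kombu settings. The helper name `filesystem_celery_config` and the directory layout under `wdir` are assumptions for illustration and are not necessarily what `FSApp` uses internally.

```python
import tempfile
from pathlib import Path


def filesystem_celery_config(wdir: str) -> dict:
    """Build Celery settings that keep the broker and results on local disk.

    Hypothetical helper: the sub-directory names are illustrative only.
    """
    root = Path(wdir).resolve()
    broker_dir = root / "broker"
    result_dir = root / "results"
    for folder in (broker_dir, result_dir):
        folder.mkdir(parents=True, exist_ok=True)
    return {
        "broker_url": "filesystem://",
        "broker_transport_options": {
            # Producer and worker share one folder, so messages are
            # exchanged as files without a separate broker process.
            "data_folder_in": str(broker_dir),
            "data_folder_out": str(broker_dir),
        },
        "result_backend": result_dir.as_uri(),
    }


config = filesystem_celery_config(tempfile.mkdtemp(prefix="dvc-task-demo-"))
print(config["broker_url"])
```

A dictionary like this could be passed to a plain Celery application through `app.conf.update(...)`; `FSApp` exists so that this wiring does not have to be written by hand.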
Warnings
- breaking Version 0.3.0 raised the minimum required versions of Celery and Kombu to 5.3.0. Environments pinned to older versions of either dependency must upgrade them.
- gotcha Versions before 0.40.1 could exhibit the 'Celery Monitor Shutdowns' issue, that is, unexpected behavior in worker monitoring. Version 0.40.1 includes a fix (dvc-10427); upgrade if you are affected.
- gotcha `dvc-task` is tested on Windows, but its upstream dependency Celery does not officially support Windows. Windows-specific problems may therefore originate in Celery itself, where official support is limited.
- gotcha The version number jumped from `0.4.0` to `0.40.1`, that is, from minor version 4 to minor version 40. The jump is not explicitly documented as breaking, but the release notes are brief, so review the changelog carefully before upgrading across this boundary.
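The version constraints listed above can be checked at startup. This is a minimal sketch that assumes plain `X.Y.Z` release numbers (any pre-release suffix is ignored); `version_tuple`, `meets_minimum`, and `check_dependencies` are hypothetical helper names, while `importlib.metadata.version` is the standard-library call for reading an installed distribution's version.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def version_tuple(text: str) -> tuple:
    """Turn '5.3.0' or '5.3.0rc1' into (5, 3, 0), dropping any suffix."""
    match = re.match(r"\d+(?:\.\d+)*", text)
    if match is None:
        raise ValueError(f"not a version string: {text!r}")
    return tuple(int(part) for part in match.group().split("."))


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare two release numbers component by component."""
    have, need = version_tuple(installed), version_tuple(minimum)
    width = max(len(have), len(need))
    # Pad with zeros so that "5.3" and "5.3.0" compare as equal.
    have += (0,) * (width - len(have))
    need += (0,) * (width - len(need))
    return have >= need


def check_dependencies(minimums: dict) -> dict:
    """Map each distribution to True, False, or None when not installed."""
    report = {}
    for name, minimum in minimums.items():
        try:
            report[name] = meets_minimum(version(name), minimum)
        except PackageNotFoundError:
            report[name] = None
    return report


# Minimums taken from the 0.3.0 warning.
print(check_dependencies({"celery": "5.3.0", "kombu": "5.3.0"}))
```

The same comparison shows why `0.40.1` sorts after `0.4.0`: the minor component is compared as the integer 40 against 4, not as a string.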
Install
pip install dvc-task
Imports
- FSApp
from dvc_task.app import FSApp
- ProcessManager
from dvc_task.proc import ProcessManager
Quickstart
import os

from dvc_task.app import FSApp
from dvc_task.proc import ProcessManager
from dvc_task.worker import TemporaryWorker

# Create a Celery app backed by the filesystem (no AMQP broker needed).
# mkdir=True creates the working directory; "dvc_task.proc.tasks" registers
# the built-in task that runs managed processes.
# For a real application, choose a persistent wdir.
app = FSApp(
    "dvc-task-quickstart",
    wdir="./dvc-task-data",
    mkdir=True,
    include=["dvc_task.proc.tasks"],
)


@app.task
def run_command_task(command_args, task_name):
    manager = ProcessManager(wdir=os.path.join(app.wdir, "processes"))
    # run_signature() returns a Celery signature for the process-running task
    # (the project README refers to this method as `run`); .delay() queues it.
    result = manager.run_signature(command_args, name=task_name).delay()
    return result.id


if __name__ == "__main__":
    print("Starting dvc-task example...")
    result = run_command_task.delay(
        command_args=["echo", "Hello, dvc-task!"], task_name="greet_task"
    )
    print(f"Task 'greet_task' queued with ID: {result.id}")

    print("Starting temporary worker...")
    # In a real deployment the worker runs in a separate process or daemon.
    # TemporaryWorker exits once the queue has stayed empty for `timeout` seconds.
    worker = TemporaryWorker(app, timeout=5)
    worker.start("quickstart-worker")  # Blocks the calling thread until the worker exits

    # Optional: remove stale Celery message files from the working directory
    app.clean()
    print("dvc-task example finished and cleaned up.")
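The quickstart relies on a worker that drains the queue and then exits. The sketch below imitates that drain-then-timeout loop with the standard library only, to show the control flow. It does not use Celery or dvc-task, and `run_temporary_worker` and its parameters are hypothetical names.

```python
import queue


def run_temporary_worker(jobs: queue.Queue, handler, idle_timeout: float = 0.1) -> int:
    """Process jobs until none arrives within idle_timeout seconds.

    Returns the number of jobs handled.
    """
    handled = 0
    while True:
        try:
            # Block for at most idle_timeout waiting for the next job.
            job = jobs.get(timeout=idle_timeout)
        except queue.Empty:
            # The queue stayed empty for the whole timeout: shut down.
            return handled
        handler(job)
        handled += 1


jobs = queue.Queue()
for command in (["echo", "one"], ["echo", "two"]):
    jobs.put(command)

seen = []
count = run_temporary_worker(jobs, seen.append)
print(count, seen)
```

Because the loop runs in the calling thread, the caller is blocked until the idle timeout expires, which is why a long-lived application would normally run its worker in a separate process.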