Flask-Executor
Flask-Executor is an easy-to-use Flask wrapper for the concurrent.futures module that lets you initialize and configure executors via common Flask application patterns. It provides a lightweight in-process task queue for Flask applications, suitable for running background tasks without the overhead of a separate task-queue service and its worker processes.
Common errors
- RuntimeError: Working outside of application context.
  - Cause: Accessing Flask global proxies (e.g., `current_app`, `g`, `request`) inside a `ProcessPoolExecutor` task, or inside a `ThreadPoolExecutor` task after the original request context has ended, without proper context handling. `ProcessPoolExecutor` tasks do not receive copied contexts.
  - Fix: For `ProcessPoolExecutor`, pass all necessary data explicitly to the task function as arguments. For `ThreadPoolExecutor`, ensure Flask-Executor is initialized with the Flask app so it can copy the contexts, and remember that the copies are snapshots with a limited lifespan: changes made after submission are not visible to the task.
- Tasks submitted to `ProcessPoolExecutor` don't seem to access `app.config` values.
  - Cause: Tasks run in a `ProcessPoolExecutor` execute in entirely separate Python processes. The Flask application context, including `app.config`, is not automatically serialized and transferred to those processes.
  - Fix: Explicitly pass any required configuration values as arguments to your task functions. Alternatively, named executors can be configured via config keys prefixed with the executor's name (e.g., `CUSTOM_EXECUTOR_TYPE`).
- TypeError: cannot pickle '_thread.RLock' object
  - Cause: A `ProcessPoolExecutor` task involves objects that cannot be serialized (pickled) and passed between processes. This often happens with database connections, Flask application objects, or certain types of locks.
  - Fix: Refactor the task so unpicklable objects are never passed to a `ProcessPoolExecutor`; re-create necessary resources (such as database sessions) inside the task function itself. Alternatively, switch to a `ThreadPoolExecutor` (`app.config['EXECUTOR_TYPE'] = 'thread'`) if the task is I/O-bound and doesn't require separate processes: threads share memory, so no pickling is needed.
Warnings
- gotcha When using `ProcessPoolExecutor`, Flask application and request contexts cannot be automatically propagated to worker processes due to limitations in Python's default object serialization and lack of shared memory. This means tasks in a `ProcessPoolExecutor` cannot directly access `flask.current_app`, `flask.request`, or `flask.g`.
- gotcha Callables submitted to a `ThreadPoolExecutor` are wrapped with a *copy* of the current application and request contexts. Changes made to these copies within the task will not be reflected in the original view, and changes in the original contexts after the task is submitted will not be available to the task.
- gotcha Integrating `Flask-Executor` with `Flask-SQLAlchemy` (especially in `ThreadPoolExecutor`) can lead to `StatementError: Can't reconnect until invalid transaction is rolled back` if a database transaction fails. This occurs because SQLAlchemy sessions bound to the application context might not be properly cleaned up or recycled after failures.
Install
- pip install flask-executor
Imports
- Executor
from flask_executor import Executor
Quickstart
from flask import Flask, jsonify
from flask_executor import Executor
import time

app = Flask(__name__)

# Configure the executor (optional; defaults to a ThreadPoolExecutor)
# app.config['EXECUTOR_TYPE'] = 'thread'  # or 'process'
# app.config['EXECUTOR_MAX_WORKERS'] = 5  # or None

executor = Executor(app)

def long_running_task(duration):
    time.sleep(duration)
    return f"Task finished after {duration} seconds"

@app.route('/start-task/<int:duration>')
def start_task(duration):
    future = executor.submit(long_running_task, duration)
    # Use executor.submit_stored('my_task_key', long_running_task, duration)
    # instead if you want to retrieve the result later via executor.futures
    return jsonify({"message": f"Task submitted, will take {duration} seconds"}), 202

# Example of retrieving a stored future (requires submit_stored instead of submit)
# @app.route('/get-task-result')
# def get_task_result():
#     if not executor.futures.done('my_task_key'):
#         return jsonify({'status': executor.futures._state('my_task_key')}), 202
#     future = executor.futures.pop('my_task_key')
#     return jsonify({'status': 'done', 'result': future.result()})

if __name__ == '__main__':
    # In a real application, use a production-ready WSGI server instead
    app.run(debug=True)