Flask-Queue-Worker is useful for executing long-running tasks asynchronously so that the main Flask app stays responsive. Below are some real-world scenarios where you can use it, along with example use cases.
Background AI Content Generation
Scenario: A user submits a request to generate AI-based content (e.g., blog, summary, or chatbot response).
Example Use Case
- A user requests an AI-generated blog post.
- The task is queued and processed in the background.
- The user gets a job_id and can check progress later.
Example API Flow
User requests content generation:
curl -X POST http://127.0.0.1:5000/generate -H "Content-Type: application/json" -d '{"prompt": "Write an article about space exploration"}'
Response:
{ "job_id": "xyz123" }
User checks the task status:
curl -X GET http://127.0.0.1:5000/task-status/xyz123
Response:
{ "status": "COMPLETED", "response": "Space exploration has advanced significantly over the past decades..." }
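The submit-then-poll pattern above can be sketched as a small client-side helper. This is a minimal sketch: the `wait_for_completion` helper and the `fake_fetch` stand-in are not part of Flask-Queue-Worker; in a real client, `fetch_status` would wrap an HTTP GET against `/task-status/<job_id>`.

```python
import time

def wait_for_completion(fetch_status, interval=1.0, max_attempts=30):
    """Poll fetch_status() until the task leaves the QUEUED/PROCESSING states."""
    for _ in range(max_attempts):
        task = fetch_status()
        if task.get("status") not in ("QUEUED", "PROCESSING"):
            return task
        time.sleep(interval)
    return {"status": "TIMEOUT"}

# In a real client, fetch_status would do an HTTP GET, e.g. with requests:
#   fetch_status = lambda: requests.get(f"{base}/task-status/{job_id}").json()
# Here a fake fetcher simulates a task that completes on the third poll:
calls = {"n": 0}
def fake_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        return {"status": "PROCESSING"}
    return {"status": "COMPLETED", "response": "..."}

result = wait_for_completion(fake_fetch, interval=0.01)
```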
Asynchronous Image Processing
Scenario: Users upload an image that needs background processing (e.g., resizing, filtering, or AI-based enhancements).
Example Use Case
- A user uploads an image.
- The image is processed (resized, enhanced, etc.) in the background.
- The user receives a link to download the processed image.
Example API Flow
User uploads an image for enhancement:
curl -X POST http://127.0.0.1:5000/enhance-image -F "image=@input.jpg"
Response:
{ "job_id": "abc123" }
User checks the task status:
curl -X GET http://127.0.0.1:5000/task-status/abc123
Response (Completed):
{ "status": "COMPLETED", "image_url": "http://127.0.0.1:5000/downloads/output.jpg" }
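The processing step in this flow could look like the sketch below. It is a hedged example: Pillow (`pip install Pillow`) and the `_enhanced` naming helper are assumptions for illustration, not part of Flask-Queue-Worker.

```python
from pathlib import Path

def enhanced_path(image_path: str) -> str:
    # Hypothetical naming scheme: "input.jpg" -> "input_enhanced.jpg"
    p = Path(image_path)
    return str(p.with_name(f"{p.stem}_enhanced{p.suffix}"))

def enhance_image_file(image_path: str, max_size=(800, 800)) -> str:
    """The function a background worker would run for this scenario."""
    # Assumes Pillow is installed; imported lazily so the path helper
    # above works even without it.
    from PIL import Image
    out_path = enhanced_path(image_path)
    with Image.open(image_path) as img:
        img.thumbnail(max_size)  # resize in place, preserving aspect ratio
        img.save(out_path)
    return out_path
```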
Video Transcoding in the Background
Scenario: Users upload videos, and the system processes (converts, compresses, or enhances) them asynchronously.
Example Use Case
- User uploads a large video file.
- The system queues it for compression and format conversion.
- User gets a link to the processed video when ready.
Example API Flow
User uploads a video for processing:
curl -X POST http://127.0.0.1:5000/convert-video -F "video=@input.mp4"
Response:
{ "job_id": "video456" }
User checks task status:
curl -X GET http://127.0.0.1:5000/task-status/video456
Response (Completed):
{ "status": "COMPLETED", "video_url": "http://127.0.0.1:5000/downloads/output.mp4" }
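A worker for this scenario would typically shell out to a transcoder. Below is a minimal sketch assuming ffmpeg is available on the PATH; the `transcode_cmd` helper and the CRF value are illustrative choices, not prescribed by Flask-Queue-Worker.

```python
import shutil
import subprocess

def transcode_cmd(src: str, dst: str, crf: int = 28) -> list:
    # Standard ffmpeg flags: -c:v libx264 selects the H.264 encoder,
    # -crf controls quality/size (higher = smaller file).
    return ["ffmpeg", "-y", "-i", src, "-c:v", "libx264", "-crf", str(crf), dst]

def convert_video_file(src: str, dst: str) -> None:
    """The function a background worker would run for this scenario."""
    if shutil.which("ffmpeg") is None:
        raise RuntimeError("ffmpeg not found on PATH")
    subprocess.run(transcode_cmd(src, dst), check=True)
```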
Asynchronous Email Sending
Scenario: Users perform an action (e.g., registration, password reset), and an email needs to be sent without delaying the response.
Example Use Case
- User signs up or requests a password reset.
- Email sending is queued as a background job.
- The user receives a confirmation response immediately.
Example API Flow
User registers for an account:
curl -X POST http://127.0.0.1:5000/register -H "Content-Type: application/json" -d '{"email": "user@example.com"}'
Response:
{ "message": "Registration successful. Check your email!" }
The background worker then sends the email asynchronously.
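The email job itself can be sketched with Python's standard library. The sender address, subject line, and SMTP host below are placeholder assumptions for illustration.

```python
import smtplib
from email.message import EmailMessage

def build_confirmation_email(to_addr: str) -> EmailMessage:
    # Hypothetical sender and wording
    msg = EmailMessage()
    msg["Subject"] = "Registration successful"
    msg["From"] = "noreply@example.com"
    msg["To"] = to_addr
    msg.set_content("Thanks for registering! Your account is ready.")
    return msg

def send_confirmation(to_addr: str, host: str = "localhost", port: int = 25) -> None:
    """The function a background worker would run; assumes an SMTP server at host:port."""
    with smtplib.SMTP(host, port) as smtp:
        smtp.send_message(build_confirmation_email(to_addr))
```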
Web Scraping & Data Extraction
Scenario: A user requests data from a website, and scraping runs in the background to avoid blocking the app.
Example Use Case
- User requests product prices from an e-commerce site.
- The scraping task is queued and runs asynchronously.
- The user retrieves the results when ready.
Example API Flow
User requests a product price comparison:
curl -X POST http://127.0.0.1:5000/scrape-prices -H "Content-Type: application/json" -d '{"url": "https://www.example.com/product"}'
Response:
{ "job_id": "scrape789" }
User checks the scraping progress:
curl -X GET http://127.0.0.1:5000/task-status/scrape789
Response (Completed):
{ "status": "COMPLETED", "prices": [{"store": "Amazon", "price": "$50"}, {"store": "eBay", "price": "$55"}] }
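The extraction step can be sketched without any network calls. This is deliberately rough: a real scraper would fetch the page (e.g. with requests) and walk the DOM (e.g. with BeautifulSoup) rather than applying a regex to raw markup; the regex and sample HTML here are illustrative only.

```python
import re

def extract_prices(html: str) -> list:
    # Very rough: pulls "$50" / "$55.99"-style tokens out of raw HTML
    return re.findall(r"\$\d+(?:\.\d{2})?", html)

sample = '<span class="price">$50</span> <span class="price">$55.99</span>'
prices = extract_prices(sample)
```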
Generating Reports in the Background
Scenario: Users request reports, which are generated asynchronously to avoid blocking the main application.
Example Use Case
- A user requests a financial or analytics report.
- The report generation runs in the background.
- The user receives a download link when it's ready.
Example API Flow
User requests report generation:
curl -X POST http://127.0.0.1:5000/generate-report -H "Content-Type: application/json" -d '{"report_type": "sales"}'
Response:
{ "job_id": "report001" }
User checks report status:
curl -X GET http://127.0.0.1:5000/task-status/report001
Response (Completed):
{ "status": "COMPLETED", "report_url": "http://127.0.0.1:5000/downloads/sales_report.pdf" }
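The report-building step might look like the sketch below, which assembles a CSV in memory using only the standard library. The column names and sample rows are hypothetical; a worker would write the result under a downloads directory and store the URL in the task store.

```python
import csv
import io

def sales_report_csv(rows: list) -> str:
    """Build a CSV report in memory from a list of row dicts."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=["month", "revenue"])
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()

report = sales_report_csv([
    {"month": "Jan", "revenue": 1200},
    {"month": "Feb", "revenue": 1350},
])
```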
Background Machine Learning Model Training
Scenario: Users upload training data, and a machine learning model is trained asynchronously.
Example Use Case
- A user submits a dataset.
- The model is trained in the background.
- The user can later download the trained model.
Example API Flow
User submits a dataset for training:
curl -X POST http://127.0.0.1:5000/train-model -F "dataset=@data.csv"
Response:
{ "job_id": "ml_training123" }
User checks the model training progress:
curl -X GET http://127.0.0.1:5000/task-status/ml_training123
Response (Completed):
{ "status": "COMPLETED", "model_url": "http://127.0.0.1:5000/downloads/trained_model.pkl" }
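The training-and-persisting step can be sketched with a toy model so it stays standard-library only. The mean-predictor "model" is a deliberate stand-in for real training; only the `_trained.pkl` naming mirrors this post's examples.

```python
import pickle
import statistics

def model_path(dataset_path: str) -> str:
    # Mirrors the "<dataset>_trained.pkl" naming used elsewhere in this post
    return f"{dataset_path}_trained.pkl"

def train_mean_model(targets: list) -> dict:
    # Toy stand-in for real training: "predicts" the mean of the targets
    return {"kind": "mean", "prediction": statistics.mean(targets)}

def save_model(model: dict, dataset_path: str) -> str:
    """Persist the trained model where a download endpoint could serve it."""
    out = model_path(dataset_path)
    with open(out, "wb") as f:
        pickle.dump(model, f)
    return out
```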
Example of a Single Queue
Install Required Dependencies
pip install flask flask-queue langchain openai
1. Flask Back-End (app.py)
This script handles background content generation using Flask-Queue-Worker.
import os
import uuid
from flask import Flask, request, jsonify, send_from_directory
from flask_queue import Queue
from multiprocessing import Manager
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

# Initialize Flask app
app = Flask(__name__, static_folder="static")

# Initialize in-memory queue (no Redis, just Python's built-in multiprocessing)
task_manager = Manager()
queue = Queue(app, backend=task_manager.dict())

# Set the OpenAI API key (replace with your actual key)
os.environ["OPENAI_API_KEY"] = "your_openai_api_key"

# Initialize the LangChain model
chat_model = ChatOpenAI(model_name="gpt-4", temperature=0.7)


def generate_content(job_id, prompt):
    """
    Generates content using LangChain and stores the result.
    Runs in the background via Flask-Queue-Worker.
    """
    queue.task_store[job_id] = {"status": "PROCESSING"}
    try:
        response = chat_model([HumanMessage(content=prompt)])
        queue.task_store[job_id] = {"status": "COMPLETED", "response": response.content}
    except Exception as e:
        queue.task_store[job_id] = {"status": "FAILED", "error": str(e)}


@app.route('/generate', methods=['POST'])
def generate():
    """API endpoint to trigger content generation in the background."""
    data = request.get_json()
    prompt = data.get("prompt", "")
    if not prompt:
        return jsonify({"error": "Prompt is required"}), 400
    job_id = str(uuid.uuid4())
    queue.task_store[job_id] = {"status": "QUEUED"}
    # Enqueue the task (the worker will pick it up)
    queue.enqueue(generate_content, job_id, prompt)
    return jsonify({"job_id": job_id}), 202


@app.route('/task-status/<job_id>', methods=['GET'])
def get_task_status(job_id):
    """API endpoint to check the status of a background task."""
    task = queue.task_store.get(job_id)
    if not task:
        return jsonify({"status": "NOT_FOUND"}), 404
    return jsonify(task)


@app.route('/')
def serve_index():
    """Serve the HTML front-end."""
    return send_from_directory("static", "index.html")


if __name__ == '__main__':
    app.run(debug=True)
2. Front-End (static/index.html)
This HTML + JavaScript UI triggers the Flask API and checks job status.
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask Queue Background Task</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 600px;
            margin: 50px auto;
            text-align: center;
        }
        input, button {
            padding: 10px;
            font-size: 16px;
        }
        #status {
            margin-top: 20px;
            font-weight: bold;
        }
    </style>
</head>
<body>
    <h2>Generate AI Content (Background Task)</h2>
    <input type="text" id="prompt" placeholder="Enter prompt here" />
    <button onclick="generateContent()">Generate</button>
    <p id="status"></p>
    <p id="result"></p>

    <script>
        let jobId = "";

        function generateContent() {
            const prompt = document.getElementById("prompt").value;
            if (!prompt) {
                alert("Please enter a prompt");
                return;
            }
            document.getElementById("status").innerText = "Submitting request...";
            document.getElementById("result").innerText = "";

            fetch("/generate", {
                method: "POST",
                headers: { "Content-Type": "application/json" },
                body: JSON.stringify({ prompt })
            })
            .then(response => response.json())
            .then(data => {
                jobId = data.job_id;
                document.getElementById("status").innerText = "Processing... (Checking every 3s)";
                checkStatus();
            })
            .catch(error => {
                document.getElementById("status").innerText = "Error: " + error;
            });
        }

        function checkStatus() {
            if (!jobId) return;
            fetch(`/task-status/${jobId}`)
                .then(response => response.json())
                .then(data => {
                    // Keep polling while the task is queued or still running
                    if (data.status === "QUEUED" || data.status === "PROCESSING") {
                        setTimeout(checkStatus, 3000);
                    } else if (data.status === "COMPLETED") {
                        document.getElementById("status").innerText = "Completed!";
                        document.getElementById("result").innerText = data.response;
                    } else {
                        document.getElementById("status").innerText = "Task Failed!";
                    }
                })
                .catch(error => {
                    document.getElementById("status").innerText = "Error: " + error;
                });
        }
    </script>
</body>
</html>
3. Worker Process (worker.py)
This script processes jobs in the queue.
from flask_queue import Worker
from app import queue

if __name__ == '__main__':
    worker = Worker(queue)
    worker.work()
4. Running the Application
Step 1: Start Flask Server
python app.py
Flask runs on http://127.0.0.1:5000/.
The front-end UI is accessible at http://127.0.0.1:5000/.
Step 2: Start the Worker
In a new terminal:
python worker.py
This processes jobs from the queue asynchronously.
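Flask-Queue-Worker's internals aren't shown in this post, but the consume loop a worker runs can be illustrated with a stdlib-only sketch using queue.Queue and a thread (the real worker is a separate process sharing the Manager-backed task store):

```python
import queue
import threading

def worker_loop(task_q, results):
    """Pull (job_id, fn, args) tuples until a None sentinel arrives."""
    while True:
        item = task_q.get()
        if item is None:
            break
        job_id, fn, args = item
        results[job_id] = fn(*args)  # run the job and record its result
        task_q.task_done()

tasks = queue.Queue()
results = {}
t = threading.Thread(target=worker_loop, args=(tasks, results))
t.start()
tasks.put(("job1", lambda x: x * 2, (21,)))
tasks.put(None)  # sentinel: tells the worker to stop
t.join()
# results["job1"] is now 42
```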
5. How It Works (Step-by-Step)
- User enters a prompt and clicks "Generate".
- The front-end calls /generate, which returns a job_id.
- Flask queues the task.
- Worker processes the task in the background.
- The front-end polls /task-status/<job_id> every 3 seconds.
- Once completed, the AI-generated response is displayed.
6. Example Usage
User triggers the API:
curl -X POST http://127.0.0.1:5000/generate -H "Content-Type: application/json" -d '{"prompt": "Write about AI"}'
Response:
{ "job_id": "abc123" }
Check Task Status
curl -X GET http://127.0.0.1:5000/task-status/abc123
Response (Processing):
{ "status": "PROCESSING" }
Response (Completed):
{ "status": "COMPLETED", "response": "AI has revolutionized technology..." }
Example of Multiple Queues
Install Required Dependencies
Ensure you have Python installed, then install the required packages:
pip install flask flask-queue langchain openai pandas
- Create the Flask App with Background Queues
Create a file app.py and include endpoints for all seven scenarios.
import os
import uuid
import time
from flask import Flask, request, jsonify
from flask_queue import Queue
from multiprocessing import Manager
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

# Initialize Flask app
app = Flask(__name__)

# In-memory queue (no Redis, uses Python's multiprocessing)
task_manager = Manager()
queue = Queue(app, backend=task_manager.dict())

# OpenAI API key (replace with your key)
os.environ["OPENAI_API_KEY"] = "your_openai_api_key"

# Initialize the LangChain model
chat_model = ChatOpenAI(model_name="gpt-4", temperature=0.7)


def update_task_status(job_id, status, result=None):
    """Update the status of a queued task."""
    queue.task_store[job_id] = {"status": status, "result": result}


def generate_content(job_id, prompt):
    """Generate AI content using LangChain."""
    update_task_status(job_id, "PROCESSING")
    try:
        response = chat_model([HumanMessage(content=prompt)])
        update_task_status(job_id, "COMPLETED", response.content)
    except Exception as e:
        update_task_status(job_id, "FAILED", str(e))


def enhance_image(job_id, image_path):
    """Simulate image processing."""
    update_task_status(job_id, "PROCESSING")
    time.sleep(5)  # Simulate processing time
    processed_image = f"{image_path}_enhanced.jpg"
    update_task_status(job_id, "COMPLETED", processed_image)


def convert_video(job_id, video_path):
    """Simulate video transcoding."""
    update_task_status(job_id, "PROCESSING")
    time.sleep(10)  # Simulate conversion
    converted_video = f"{video_path}_converted.mp4"
    update_task_status(job_id, "COMPLETED", converted_video)


def send_email(job_id, email):
    """Simulate email sending."""
    update_task_status(job_id, "PROCESSING")
    time.sleep(3)  # Simulate sending delay
    update_task_status(job_id, "COMPLETED", f"Email sent to {email}")


def scrape_prices(job_id, url):
    """Simulate web scraping."""
    update_task_status(job_id, "PROCESSING")
    time.sleep(5)  # Simulate scraping
    prices = [{"store": "Amazon", "price": "$50"}, {"store": "eBay", "price": "$55"}]
    update_task_status(job_id, "COMPLETED", prices)


def generate_report(job_id, report_type):
    """Simulate report generation."""
    update_task_status(job_id, "PROCESSING")
    time.sleep(7)  # Simulate report creation
    report_url = f"http://127.0.0.1:5000/downloads/{report_type}_report.pdf"
    update_task_status(job_id, "COMPLETED", report_url)


def train_model(job_id, dataset_path):
    """Simulate ML model training."""
    update_task_status(job_id, "PROCESSING")
    time.sleep(15)  # Simulate training
    trained_model_path = f"{dataset_path}_trained.pkl"
    update_task_status(job_id, "COMPLETED", trained_model_path)


@app.route('/generate', methods=['POST'])
def generate():
    """Trigger AI content generation."""
    data = request.get_json()
    job_id = str(uuid.uuid4())
    queue.enqueue(generate_content, job_id, data["prompt"])
    return jsonify({"job_id": job_id}), 202


@app.route('/enhance-image', methods=['POST'])
def enhance():
    """Trigger image enhancement (fixed demo filename for simplicity)."""
    job_id = str(uuid.uuid4())
    queue.enqueue(enhance_image, job_id, "input.jpg")
    return jsonify({"job_id": job_id}), 202


@app.route('/convert-video', methods=['POST'])
def convert():
    """Trigger video processing (fixed demo filename for simplicity)."""
    job_id = str(uuid.uuid4())
    queue.enqueue(convert_video, job_id, "input.mp4")
    return jsonify({"job_id": job_id}), 202


@app.route('/send-email', methods=['POST'])
def email():
    """Trigger email sending."""
    data = request.get_json()
    job_id = str(uuid.uuid4())
    queue.enqueue(send_email, job_id, data["email"])
    return jsonify({"job_id": job_id}), 202


@app.route('/scrape-prices', methods=['POST'])
def scrape():
    """Trigger web scraping."""
    data = request.get_json()
    job_id = str(uuid.uuid4())
    queue.enqueue(scrape_prices, job_id, data["url"])
    return jsonify({"job_id": job_id}), 202


@app.route('/generate-report', methods=['POST'])
def report():
    """Trigger report generation."""
    data = request.get_json()
    job_id = str(uuid.uuid4())
    queue.enqueue(generate_report, job_id, data["report_type"])
    return jsonify({"job_id": job_id}), 202


@app.route('/train-model', methods=['POST'])
def train():
    """Trigger ML model training (fixed demo filename for simplicity)."""
    job_id = str(uuid.uuid4())
    queue.enqueue(train_model, job_id, "dataset.csv")
    return jsonify({"job_id": job_id}), 202


@app.route('/task-status/<job_id>', methods=['GET'])
def get_task_status(job_id):
    """Check the status of a background task."""
    task = queue.task_store.get(job_id)
    if not task:
        return jsonify({"status": "NOT_FOUND"}), 404
    return jsonify(task)


if __name__ == '__main__':
    app.run(debug=True)
- Create a Worker Process
Create a file worker.py to run queued tasks.
from flask_queue import Worker
from app import queue

if __name__ == '__main__':
    worker = Worker(queue)
    worker.work()
- Run Flask & the Worker
Start the Flask server:
python app.py
Start the Worker
Open another terminal and run:
python worker.py
- Test the API
Trigger AI content generation:
curl -X POST http://127.0.0.1:5000/generate -H "Content-Type: application/json" -d '{"prompt": "Write an article about AI"}'
Check Task Status
curl -X GET http://127.0.0.1:5000/task-status/your_job_id
Example Response:
{
"status": "COMPLETED",
"result": "Artificial Intelligence has transformed the world by..."
}
- Summary
  - 7 background tasks implemented
  - No Redis needed (fully Python-based queue)
  - Flask-Queue-Worker handles task execution
  - Flask stays responsive while tasks run in the background
Now your Flask API supports multiple background-processing scenarios!