1️⃣ Handling Large Log Files
- Log files from production systems, cloud environments, or CI/CD pipelines can be huge (GBs of data).
- LangChain's text splitting can break these large files into smaller, more manageable chunks.
This allows logs to be processed in parallel or fed into analysis pipelines more efficiently.
2️⃣ Focused Filtering for Relevant Data
Often, only certain parts of logs are needed (e.g., error logs, warning logs, or logs related to specific services).
LangChain's RecursiveCharacterTextSplitter splits on a list of separators (by default blank lines, newlines, then spaces), so with separators that match your log layout it can keep the lines of one service, request ID, or error block together in a chunk.
Combined with filters (like keywords, timestamps, or log levels), you can extract only the relevant portions of logs for faster debugging.
Example:
Split logs into 500-line chunks.
Filter each chunk for:
- ERROR
- Timeout
- Connection refused
This gives DevOps/SREs just the problematic parts.
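The example above can be sketched with the standard library alone. This is a minimal illustration, not the article's LangChain code: the helper names `iter_line_chunks` and `filter_chunk` are made up for this sketch, and the demo uses a chunk size of 2 only to keep the sample short.

```python
from itertools import islice
from typing import Iterable, Iterator, List

# Keywords taken from the example above
KEYWORDS = ("ERROR", "Timeout", "Connection refused")

def iter_line_chunks(lines: Iterable[str], chunk_lines: int = 500) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `chunk_lines` lines (hypothetical helper)."""
    iterator = iter(lines)
    while True:
        chunk = list(islice(iterator, chunk_lines))
        if not chunk:
            return
        yield chunk

def filter_chunk(chunk: List[str], keywords=KEYWORDS) -> List[str]:
    """Keep only the lines that contain at least one keyword (case-sensitive)."""
    return [line for line in chunk if any(k in line for k in keywords)]

sample = [
    "[INFO] Service A started",
    "[ERROR] Database connection failed",
    "[INFO] Retrying",
    "[WARN] Connection refused by upstream",
    "[INFO] Service B started",
]

# chunk_lines=2 keeps the demo small; the text above uses 500
problem_lines = [filter_chunk(c) for c in iter_line_chunks(sample, chunk_lines=2)]
print(problem_lines)
```

Because `iter_line_chunks` accepts any iterable of lines, an open file object can be passed in directly, so a multi-GB log never has to be loaded in full.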
3️⃣ Pre-processing for Observability & Search
Split logs can be indexed into observability tools (like Elasticsearch, Loki, etc.).
Filters can help tag logs per service, component, or severity.
Searching across these filtered, smaller chunks is faster and more targeted.
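As a rough sketch of the tagging step, the snippet below turns one log line into a dictionary that an indexing client could ingest. It assumes lines shaped like `[timestamp] [LEVEL] message`; the function name `to_document` and the `service` and `severity` fields are illustrative choices, not part of any tool's schema.

```python
import re
from typing import Optional

# Assumed line layout: [timestamp] [LEVEL] message
LOG_LINE = re.compile(
    r"^\[(?P<timestamp>[^\]]+)\]\s+\[(?P<level>[A-Z]+)\]\s+(?P<message>.*)$"
)

def to_document(line: str, service: str = "unknown") -> Optional[dict]:
    """Parse one log line into a tagged dict, or return None if it does not match."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    doc = match.groupdict()
    doc["service"] = service  # tag supplied by the caller, e.g. from the file name
    doc["severity"] = "high" if doc["level"] in {"ERROR", "CRITICAL"} else "normal"
    return doc

doc = to_document(
    "[2025-03-06 10:00:03] [ERROR] Database connection failed",
    service="service-a",
)
print(doc)
```

Lines that do not fit the assumed layout (stack traces, wrapped messages) come back as `None`, so the caller can decide whether to drop them or attach them to the previous document.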
4️⃣ Easy Pipeline Integration
The split & filter approach can be automated in log ingestion pipelines.
Instead of ingesting full logs, split and filtered logs are pushed into analysis tools.
5️⃣ Example LangChain Filter Process for Logs
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Example log data (usually much larger in real-world scenarios)
log_data = """
[2025-03-06 10:00:01] [INFO] Service A started
[2025-03-06 10:00:03] [ERROR] Database connection failed
[2025-03-06 10:00:05] [INFO] Service B started
[2025-03-06 10:00:08] [WARNING] High memory usage detected
[2025-03-06 10:00:10] [ERROR] Request timeout on endpoint /api/v1/data
"""

# Initialize text splitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=0
)

# Split log data
chunks = splitter.split_text(log_data)

# Filter function for relevant logs (only errors and warnings)
def filter_relevant_logs(chunk):
    relevant_lines = []
    for line in chunk.split('\n'):
        if "ERROR" in line or "WARNING" in line:
            relevant_lines.append(line)
    return '\n'.join(relevant_lines)

# Apply filter to each chunk
filtered_chunks = [filter_relevant_logs(chunk) for chunk in chunks]
filtered_chunks = [chunk for chunk in filtered_chunks if chunk.strip()]  # Remove empty chunks

# Output filtered logs
for i, chunk in enumerate(filtered_chunks, 1):
    print(f"Filtered Chunk {i}:\n{chunk}\n")
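Output
With chunk_size=100, no two adjacent lines of this sample fit into a single chunk, so each line becomes its own chunk and three of them pass the filter:
Filtered Chunk 1:
[2025-03-06 10:00:03] [ERROR] Database connection failed
Filtered Chunk 2:
[2025-03-06 10:00:08] [WARNING] High memory usage detected
Filtered Chunk 3:
[2025-03-06 10:00:10] [ERROR] Request timeout on endpoint /api/v1/data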
For files too large to load at once, the same idea works line by line without LangChain:
def filter_log_line(line):
    keywords = ["ERROR", "Timeout", "Connection refused", "503", "Deployment failed", "Unauthorized"]
    return any(keyword in line for keyword in keywords)

# Stream the input so only one line is held in memory at a time
with open("full_logs.txt", "r") as source, open("filtered_logs.txt", "w") as target:
    target.writelines(line for line in source if filter_log_line(line))
Log Processing Backend code
from flask import Blueprint, request, jsonify
from extensions import db
from models import Command  # Assuming Command model already exists
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Blueprint that the routes below attach to
customlangchain_bp = Blueprint('customlangchain_bp', __name__)

# Predefined filter criteria - you can customize
FILTER_CRITERIA = {
    "log_levels": ["ERROR", "WARNING", "CRITICAL"],
    "keywords": ["Timeout", "Connection refused", "Crash", "OOMKilled"],
    "http_status_codes": ["500", "502", "503", "504"],
    "exception_types": ["KeyError", "NullPointerException"],
    "resource_alerts": ["High CPU", "Out of Memory", "Disk Full"],
    "database_errors": ["Deadlock", "Connection Pool Exhausted"],
    "security_events": ["Unauthorized", "XSS Detected", "Access Denied"],
    "deployment_events": ["Deployment started", "Rollback initiated"]
}

# Function to check if a log chunk matches any criteria.
# This is plain, case-sensitive substring matching, so "500" also matches inside longer numbers.
def log_matches_criteria(chunk):
    return any(
        value in chunk
        for values in FILTER_CRITERIA.values()
        for value in values
    )

@customlangchain_bp.route("/process_logs", methods=["POST"])
def process_logs():
    try:
        # silent=True returns None instead of raising on a missing or invalid JSON body
        data = request.get_json(silent=True) or {}
        raw_logs = data.get("logs", "")
        category = data.get("category", "logs")  # Default category if none provided
        if not raw_logs:
            return jsonify({"error": "Log data is required"}), 400

        # Use LangChain to split logs into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        log_chunks = text_splitter.split_text(raw_logs)

        # Filter chunks based on criteria
        filtered_chunks = [chunk for chunk in log_chunks if log_matches_criteria(chunk)]
        if not filtered_chunks:
            return jsonify({"message": "No relevant logs found matching criteria"}), 200

        # Save filtered logs to database
        saved_chunks = []
        for chunk in filtered_chunks:
            new_command = Command(text=chunk, category=category)
            db.session.add(new_command)
            saved_chunks.append(new_command)
        db.session.commit()

        return jsonify({
            "message": f"Processed and saved {len(saved_chunks)} relevant log chunks",
            "saved_logs": [{"id": cmd.id, "text": cmd.text, "category": cmd.category} for cmd in saved_chunks]
        }), 201
    except Exception as e:
        db.session.rollback()  # leave the session usable after a failed commit
        return jsonify({"error": str(e)}), 500
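Substring checks like the ones in the backend code are simple, but a status code such as "500" also matches inside "15000", and "Timeout" misses a lowercase "timeout". One possible refinement, sketched here with a trimmed-down copy of the criteria, compiles each category into a case-insensitive regular expression that requires the term to stand alone. The names `compile_criteria` and `matched_categories` are hypothetical, and this reduces false positives rather than eliminating them.

```python
import re

# Trimmed-down copy of the criteria, for illustration only
CRITERIA = {
    "log_levels": ["ERROR", "WARNING", "CRITICAL"],
    "http_status_codes": ["500", "502", "503", "504"],
    "keywords": ["Timeout", "Connection refused"],
}

def compile_criteria(criteria):
    """Build one regex per category; terms must not touch other word characters."""
    compiled = {}
    for category, values in criteria.items():
        alternatives = "|".join(re.escape(v) for v in values)
        compiled[category] = re.compile(
            rf"(?<!\w)(?:{alternatives})(?!\w)", re.IGNORECASE
        )
    return compiled

PATTERNS = compile_criteria(CRITERIA)

def matched_categories(chunk):
    """Return the names of all categories that occur in the chunk."""
    return [name for name, pattern in PATTERNS.items() if pattern.search(chunk)]

print(matched_categories("[ERROR] Request timeout on endpoint /api/v1/data"))
print(matched_categories("processed 15000 rows in 5003 ms"))
```

Returning the matching category names, rather than a bare boolean, also gives the caller something to store alongside each saved chunk.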
PROMPT for processing Logs
Admin Section: You can dynamically add Error Categories and Error Values into a database.
User Section: The user can:
Select Error Categories.
Based on the selected category, display a multi-select dropdown for error values.
Paste logs into a textarea and filter logs based on selected categories and error values.
Step-by-Step Solution
1. Backend (Flask)
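We will define four routes in Flask:
add_error_filter: To add error categories and their respective error values.
get_error_categories: To list the categories for the first dropdown.
get_errors_by_category: To list the error values of one category.
process_logs_with_filter: To filter logs based on selected criteria.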
1.1. Models - ErrorFilter
from extensions import db

class ErrorFilter(db.Model):
    __tablename__ = 'error_filter'
    id = db.Column(db.Integer, primary_key=True)
    category = db.Column(db.String(100), nullable=False)  # e.g., log_levels, keywords
    error_value = db.Column(db.String(255), nullable=False)  # e.g., ERROR, Timeout
Add Error Filter (Admin to add categories and errors):
@customlangchain_bp.route("/add_error_filter", methods=["POST"])
def add_error_filter():
    try:
        # silent=True returns None instead of raising on a missing or invalid JSON body
        data = request.get_json(silent=True) or {}
        category = data.get('category', '').strip()
        error_value = data.get('error_value', '').strip()
        if not category or not error_value:
            return jsonify({"error": "Both category and error value are required"}), 400
        new_filter = ErrorFilter(category=category, error_value=error_value)
        db.session.add(new_filter)
        db.session.commit()
        return jsonify({"message": "Error filter added successfully"}), 201
    except Exception as e:
        db.session.rollback()  # leave the session usable after a failed commit
        return jsonify({"error": str(e)}), 500
Fetch Error Categories (To show the categories in dropdown):
@customlangchain_bp.route("/get_error_categories", methods=["GET"])
def get_error_categories():
    try:
        # Each row is a one-element tuple holding a distinct category name
        categories = db.session.query(ErrorFilter.category.distinct()).all()
        category_list = [c[0] for c in categories]
        return jsonify(category_list), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
Fetch Error Values by Category (To show errors based on category):
@customlangchain_bp.route("/get_errors_by_category/<category>", methods=["GET"])
def get_errors_by_category(category):
    try:
        errors = ErrorFilter.query.filter_by(category=category).all()
        error_list = [e.error_value for e in errors]
        return jsonify(error_list), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
Process Logs with Selected Filters (For log filtering):
@customlangchain_bp.route("/process_logs_with_filter", methods=["POST"])
def process_logs_with_filter():
    try:
        # silent=True returns None instead of raising on a missing or invalid JSON body
        data = request.get_json(silent=True) or {}
        selected_category = data.get('category')
        selected_errors = data.get('error_values', [])  # This is a list of errors
        logs = data.get('logs', '')

        # Validate the incoming data
        if not logs or not selected_category or not selected_errors:
            return jsonify({"error": "Logs, category, and at least one error are required"}), 400

        # Split logs into chunks
        from langchain.text_splitter import RecursiveCharacterTextSplitter
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        chunks = splitter.split_text(logs)

        # Filter logs that contain any of the selected errors
        filtered_chunks = []
        for chunk in chunks:
            if any(error in chunk for error in selected_errors):  # Check if any selected error is in the chunk
                filtered_chunks.append(chunk)
        if not filtered_chunks:
            return jsonify({"message": "No logs matched the selected error filters"}), 200

        # Optionally save the filtered chunks to the Command table
        for chunk in filtered_chunks:
            new_command = Command(text=chunk, category=selected_category)
            db.session.add(new_command)
        db.session.commit()

        return jsonify({
            "message": f"{len(filtered_chunks)} chunks matched the filter and saved.",
            "filtered_chunks": filtered_chunks
        }), 200
    except Exception as e:
        db.session.rollback()  # leave the session usable after a failed commit
        return jsonify({"error": str(e)}), 500
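One side effect of chunk_overlap=50 is that a log line near a chunk boundary can appear in two adjacent chunks, so the same error may be saved twice. If line-level results are preferred, a small post-processing step could pull out only the matching lines and drop repeats. The sketch below is an assumption about how that might look; `matching_lines` is a made-up helper, and the sample chunks are hand-written rather than real splitter output.

```python
def matching_lines(chunks, selected_errors):
    """Return each matching line once, in first-seen order."""
    seen = set()
    result = []
    for chunk in chunks:
        for line in chunk.splitlines():
            line = line.strip()
            if not line or line in seen:
                continue
            if any(err in line for err in selected_errors):
                seen.add(line)
                result.append(line)
    return result

# Hand-written chunks that share one line, as overlapping chunks would
chunks = [
    "[10:00:01] INFO start\n[10:00:03] ERROR db failed",
    "[10:00:03] ERROR db failed\n[10:00:05] Timeout on /api",
]
lines = matching_lines(chunks, ["ERROR", "Timeout"])
print(lines)
```

Note that two genuinely separate log entries with identical text and timestamp would also collapse into one, which may or may not be acceptable for a given use case.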
2. Frontend (React)
We'll build two menus: one for admins to add filters and one for users to process logs. The second one contains:
Error Category Dropdown (First dropdown for selecting categories).
Error Values Dropdown (Dependent dropdown for error values based on the selected category).
Textarea for Logs.
2.1. Frontend (Add Error Filter Menu)
For Admin to add Error Categories and Error Values.
import React, { useState } from 'react';

const AddErrorFilter = () => {
  const [category, setCategory] = useState('');
  const [errorValue, setErrorValue] = useState('');

  const handleSubmit = async () => {
    const response = await fetch('http://127.0.0.1:5000/customlangchain/add_error_filter', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ category, error_value: errorValue })
    });
    const result = await response.json();
    if (result.error) alert(result.error);
    else alert(result.message);
  };

  return (
    <div>
      <h3>Add Error Filter</h3>
      <input
        type="text"
        placeholder="Category"
        value={category}
        onChange={e => setCategory(e.target.value)}
      />
      <input
        type="text"
        placeholder="Error Value"
        value={errorValue}
        onChange={e => setErrorValue(e.target.value)}
      />
      <button onClick={handleSubmit}>Add Filter</button>
    </div>
  );
};

export default AddErrorFilter;
2.2. Frontend (Process Logs Menu)
For users to filter logs based on selected categories and errors.
import React, { useState, useEffect } from 'react';

const ProcessLogs = () => {
  const [categories, setCategories] = useState([]);
  const [selectedCategory, setSelectedCategory] = useState('');
  const [errors, setErrors] = useState([]);
  const [selectedErrors, setSelectedErrors] = useState([]); // Multi-select errors
  const [logs, setLogs] = useState('');

  // Load the categories once when the component mounts
  useEffect(() => {
    fetch('http://127.0.0.1:5000/customlangchain/get_error_categories')
      .then(res => res.json())
      .then(setCategories);
  }, []);

  const fetchErrors = (category) => {
    setSelectedCategory(category);
    setSelectedErrors([]);
    setErrors([]);
    if (category) {
      // Encode the category so spaces or slashes do not break the URL
      fetch(`http://127.0.0.1:5000/customlangchain/get_errors_by_category/${encodeURIComponent(category)}`)
        .then(res => res.json())
        .then(setErrors);
    }
  };

  const handleProcess = async () => {
    const response = await fetch('http://127.0.0.1:5000/customlangchain/process_logs_with_filter', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        category: selectedCategory,
        error_values: selectedErrors, // Pass multiple selected errors
        logs
      })
    });
    const result = await response.json();
    if (result.error) alert(result.error);
    else {
      alert(result.message);
      console.log(result.filtered_chunks);
    }
  };

  return (
    <div>
      <h3>Process Logs with Filters</h3>
      {/* Dropdown 1: Category */}
      <select value={selectedCategory} onChange={(e) => fetchErrors(e.target.value)}>
        <option value="">Select Category</option>
        {categories.map(cat => <option key={cat} value={cat}>{cat}</option>)}
      </select>
      {/* Dropdown 2: Errors (Multiple Selection) */}
      {selectedCategory && (
        <select multiple value={selectedErrors} onChange={(e) => setSelectedErrors(Array.from(e.target.selectedOptions, option => option.value))}>
          {errors.map(err => <option key={err} value={err}>{err}</option>)}
        </select>
      )}
      {/* Textarea for Logs */}
      {selectedCategory && selectedErrors.length > 0 && (
        <textarea
          rows="10"
          placeholder="Paste logs here..."
          value={logs}
          onChange={(e) => setLogs(e.target.value)}
        />
      )}
      {/* Submit Button */}
      {selectedCategory && selectedErrors.length > 0 && (
        <button onClick={handleProcess}>Process Logs</button>
      )}
    </div>
  );
};

export default ProcessLogs;
PROMPT2 FOR solution of log processing
- Accept logs, category, and error selections from the frontend.
- Split logs using LangChain's text splitter.
- Filter logs based on selected errors.
- Apply the LangChain prompt template to generate solutions for the filtered logs.
- Save the processed logs to the database.
Backend Code (Flask + LangChain)
Here’s the updated backend code:
from flask import Blueprint, request, jsonify
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI  # newer releases: from langchain_openai import OpenAI
from langchain_core.runnables import RunnableParallel  # RunnableParallel is not part of langchain.chains
from langchain.text_splitter import RecursiveCharacterTextSplitter
from models import db, Command  # Assuming `Command` is your database model

customlangchain_bp = Blueprint('customlangchain_bp', __name__)

# Endpoint to process logs and apply filtering
@customlangchain_bp.route("/process_logs_with_filter", methods=["POST"])
def process_logs_with_filter():
    try:
        # Extract incoming JSON data (silent=True avoids an exception on a missing body)
        data = request.get_json(silent=True) or {}
        logs = data.get('logs', '')
        selected_category = data.get('category', '')
        selected_errors = data.get('selected_errors', [])  # Array of selected errors (multi-select)
        if not logs or not selected_errors or not selected_category:
            return jsonify({"error": "Logs, selected errors, and category are required"}), 400

        # Split logs into chunks using LangChain's text splitter
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        chunks = splitter.split_text(logs)

        # Filter the chunks based on the selected errors
        filtered_chunks = [chunk for chunk in chunks if any(error in chunk for error in selected_errors)]
        if not filtered_chunks:
            return jsonify({"message": "No logs matched the selected errors"}), 200

        # LangChain Prompt Template to apply solutions based on filtered logs
        prompts = {
            "log_error_suggestions": PromptTemplate(
                input_variables=["error"],
                template="Analyze the log error: {error}. Provide a suggested solution."
            )
        }

        # Build one chain per prompt; the key is read from the OPENAI_API_KEY
        # environment variable rather than hardcoded in source
        llm = OpenAI()
        chains = {key: prompts[key] | llm for key in ["log_error_suggestions"]}

        # Run all chains in parallel on the same input.
        # Note: every matching chunk is sent together in a single prompt.
        parallel_chain = RunnableParallel(**chains)
        results = parallel_chain.invoke({"error": "\n".join(filtered_chunks)})

        # Process results (one entry per chain)
        processed_results = []
        for key, value in results.items():
            content = value.content if hasattr(value, "content") else str(value)
            processed_results.append({"error": filtered_chunks, "suggestion": content})

        # Optionally save the results into the database (Command table)
        for result in processed_results:
            new_command = Command(text=result["suggestion"], category=selected_category)
            db.session.add(new_command)
        db.session.commit()

        return jsonify({
            "message": f"{len(filtered_chunks)} log chunks analyzed and {len(processed_results)} suggestion(s) saved.",
            "processed_logs": processed_results
        }), 200
    except Exception as e:
        db.session.rollback()  # leave the session usable after a failed commit
        return jsonify({"error": str(e)}), 500
Explanation of Backend Code:
Logs and Filter: Logs are received from the frontend, and the selected errors (based on a multi-select checkbox) are used to filter relevant logs.
Splitting Logs: Logs are split into chunks using LangChain’s RecursiveCharacterTextSplitter to break large logs into manageable pieces.
Filtering Errors: After splitting, logs are filtered based on the selected errors (e.g., Timeout, Connection refused).
Prompt Template: A LangChain prompt template is defined to provide suggestions based on the filtered logs. The prompt analyzes the error and provides a suggestion.
LangChain Execution: The filtered logs are processed using LangChain, and the results are returned and saved into the database.
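The endpoint produces one suggestion for the whole set of filtered chunks. If one suggestion per chunk is wanted instead, the pattern could look like the sketch below. To keep it runnable without an API key it uses only the standard library and a stand-in `fake_llm` function; both `suggest_for_chunks` and `fake_llm` are hypothetical names, and a real model call would take the stub's place.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

# Same wording as the prompt template in the backend code
PROMPT = "Analyze the log error: {error}. Provide a suggested solution."

def suggest_for_chunks(
    chunks: List[str],
    llm: Callable[[str], str],
    max_workers: int = 4,
) -> List[Dict[str, str]]:
    """Build one prompt per chunk and call `llm` on each, concurrently."""
    prompts = [PROMPT.format(error=chunk) for chunk in chunks]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs
        suggestions = list(pool.map(llm, prompts))
    return [
        {"error": chunk, "suggestion": suggestion}
        for chunk, suggestion in zip(chunks, suggestions)
    ]

def fake_llm(prompt: str) -> str:
    """Stand-in for a real model call; just echoes the prompt."""
    return "stub:" + prompt

results = suggest_for_chunks(
    ["[ERROR] Database connection failed", "[ERROR] Request timeout"],
    fake_llm,
)
for item in results:
    print(item["error"], "->", item["suggestion"])
```

Pairing each chunk with its own suggestion also makes the saved rows easier to trace back to the log lines that triggered them, at the cost of one model call per chunk.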
Frontend Code (React)
In React, we need to display the logs, allow the user to select the errors they want to process, and submit the logs to the backend for processing.
React Component (Log Processing)
import React, { useState } from 'react';

const ProcessLogs = () => {
  const [logs, setLogs] = useState(''); // Logs from user input
  const [selectedCategory, setSelectedCategory] = useState(''); // Selected category
  const [selectedErrors, setSelectedErrors] = useState([]); // Selected errors (multi-select)
  const [splittedLogs, setSplittedLogs] = useState([]); // Log lines shown as checkboxes
  const [loading, setLoading] = useState(false);
  const [processedLogs, setProcessedLogs] = useState([]); // Logs after processing

  const handleProcessLogs = async () => {
    setLoading(true);
    try {
      const response = await fetch('http://127.0.0.1:5000/customlangchain/process_logs_with_filter', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          logs,
          category: selectedCategory,
          selected_errors: selectedErrors
        })
      });
      const data = await response.json();
      if (data.error) {
        alert(data.error);
      } else {
        // The "no match" response has no processed_logs, so fall back to an empty list
        setProcessedLogs(data.processed_logs || []);
        alert(data.message); // Show success message
      }
    } catch (err) {
      alert(`Request failed: ${err.message}`);
    } finally {
      setLoading(false); // Always re-enable the button
    }
  };

  const handleSplitLogs = () => {
    // Split logs into lines on the client (for testing purposes), skipping blank lines
    setSelectedErrors([]);
    setSplittedLogs(
      logs.split('\n')
        .filter(line => line.trim())
        .map((log, index) => ({ id: index, log }))
    );
  };

  const handleCheckboxChange = (e) => {
    const { value, checked } = e.target;
    if (checked) {
      setSelectedErrors([...selectedErrors, value]);
    } else {
      setSelectedErrors(selectedErrors.filter(error => error !== value));
    }
  };

  return (
    <div>
      <h3>Process Logs with Filters</h3>
      {/* Log Input */}
      <textarea
        rows="10"
        value={logs}
        onChange={(e) => setLogs(e.target.value)}
        placeholder="Paste your logs here..."
      />
      {/* Select Category */}
      <select onChange={(e) => setSelectedCategory(e.target.value)} value={selectedCategory}>
        <option value="">Select Category</option>
        <option value="error">Error</option>
        <option value="warning">Warning</option>
        {/* Add other categories as needed */}
      </select>
      {/* Split Logs Button */}
      <button onClick={handleSplitLogs}>Split Logs</button>
      {/* Render Split Logs with Checkboxes */}
      <div>
        {splittedLogs.length > 0 && (
          <div>
            <h4>Select Errors to Process:</h4>
            {splittedLogs.map((chunk) => (
              <div key={chunk.id}>
                <input
                  type="checkbox"
                  value={chunk.log}
                  checked={selectedErrors.includes(chunk.log)}
                  onChange={handleCheckboxChange}
                />
                <label>{chunk.log}</label>
              </div>
            ))}
          </div>
        )}
      </div>
      {/* Process Logs Button */}
      <button onClick={handleProcessLogs} disabled={loading}>
        {loading ? "Processing..." : "Process Logs"}
      </button>
      {/* Display Processed Logs */}
      {processedLogs.length > 0 && (
        <div>
          <h4>Processed Logs:</h4>
          {processedLogs.map((log, index) => (
            <div key={index}>
              <p>{log.suggestion}</p>
            </div>
          ))}
        </div>
      )}
    </div>
  );
};

export default ProcessLogs;