Debug School

rakesh kumar

Efficient Log Splitting, Filtering, and Processing for DevOps & SREs

1️⃣ Handling Large Log Files

  1. Log files from production systems, cloud environments, or CI/CD pipelines can be huge (often gigabytes of data).
  2. LangChain's text splitters can break these large files into smaller, more manageable chunks.
  3. The chunks can then be processed in parallel or fed into analysis pipelines more efficiently.
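
As a rough illustration of the chunk-and-parallelize idea, here is a minimal sketch that uses only the Python standard library rather than LangChain. The helper names `iter_line_chunks` and `count_errors`, the sample lines, and the 2-line chunk size are assumptions made for the demo.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def iter_line_chunks(lines, chunk_size):
    """Yield lists of up to chunk_size lines from any iterable of lines."""
    iterator = iter(lines)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def count_errors(chunk):
    """Count lines in one chunk that carry the [ERROR] level marker."""
    return sum(1 for line in chunk if "[ERROR]" in line)


sample_lines = [
    "[2025-03-06 10:00:01] [INFO] Service A started",
    "[2025-03-06 10:00:03] [ERROR] Database connection failed",
    "[2025-03-06 10:00:05] [INFO] Service B started",
    "[2025-03-06 10:00:08] [WARNING] High memory usage detected",
    "[2025-03-06 10:00:10] [ERROR] Request timeout on endpoint /api/v1/data",
]

# `sample_lines` could just as well be an open file object.
with ThreadPoolExecutor(max_workers=2) as pool:
    error_counts = list(pool.map(count_errors, iter_line_chunks(sample_lines, 2)))

print(error_counts)
```

Note that `pool.map` submits every chunk up front, so for multi-gigabyte files the chunks would need to be submitted in bounded batches to keep memory use flat. The thread pool here simply stands in for whatever worker model the pipeline uses.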

2️⃣ Focused Filtering for Relevant Data

  1. Often only certain parts of the logs are needed (e.g., error logs, warning logs, or logs related to specific services).
  2. LangChain's RecursiveCharacterTextSplitter breaks logs into smaller chunks by separator and size; with suitable separators, chunk boundaries can roughly follow a service, request ID, or error block.
  3. Combined with filters (such as keywords, timestamps, or log levels), you can extract only the relevant portions of the logs for faster debugging.

Example:

Split logs into 500-line chunks.
Filter each chunk for:

  • ERROR
  • Timeout
  • Connection refused

This gives DevOps/SREs just the problematic parts.
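
The example above could be sketched in plain Python roughly as follows. The function names and the tiny sample are hypothetical, and case-sensitive substring matching is an assumption about how the filter should behave.

```python
KEYWORDS = ("ERROR", "Timeout", "Connection refused")


def split_into_line_chunks(lines, chunk_size=500):
    """Group a list of log lines into consecutive chunks of chunk_size lines."""
    return [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]


def filter_chunk(chunk, keywords=KEYWORDS):
    """Keep only the lines of a chunk that contain at least one keyword."""
    return [line for line in chunk if any(k in line for k in keywords)]


log_lines = [
    "[INFO] Service A started",
    "[ERROR] Database connection failed",
    "[WARN] Timeout while calling payments",
    "[INFO] Health check ok",
    "[WARN] Connection refused by cache:6379",
]

# chunk_size=2 is used only so the short sample produces several chunks.
problem_lines = []
for chunk in split_into_line_chunks(log_lines, chunk_size=2):
    matches = filter_chunk(chunk)
    if matches:  # drop chunks with nothing interesting in them
        problem_lines.append(matches)

print(problem_lines)
```

Chunks without any matching line are dropped entirely, so downstream tools only see the lines worth a second look.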

3️⃣ Pre-processing for Observability & Search
Split logs can be indexed into observability tools (such as Elasticsearch or Loki).
Filters can help tag logs per service, component, or severity.
Searching across these filtered, smaller chunks is faster and more targeted.
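
Tagging by severity usually starts with turning each raw line into a structured record. The sketch below assumes the `[timestamp] [LEVEL] message` layout of the sample log used in this post; the field names are hypothetical and not a schema required by Elasticsearch or Loki.

```python
import re

# Matches lines shaped like: [2025-03-06 10:00:03] [ERROR] Database connection failed
LINE_PATTERN = re.compile(
    r"^\[(?P<timestamp>[^\]]+)\] \[(?P<level>[A-Z]+)\] (?P<message>.*)$"
)


def parse_line(line):
    """Turn one log line into a dict of fields, or None if it does not match."""
    match = LINE_PATTERN.match(line.strip())
    return match.groupdict() if match else None


def tag_records(lines, levels=("WARNING", "ERROR", "CRITICAL")):
    """Parse lines and keep only records whose level is in levels."""
    records = (parse_line(line) for line in lines)
    return [record for record in records if record and record["level"] in levels]


sample = [
    "[2025-03-06 10:00:01] [INFO] Service A started",
    "[2025-03-06 10:00:03] [ERROR] Database connection failed",
    "[2025-03-06 10:00:08] [WARNING] High memory usage detected",
    "not a structured line",
]

records = tag_records(sample)
for record in records:
    print(record["level"], record["timestamp"], record["message"])
```

Records in this shape can be serialized to JSON and shipped to an indexer, where the `level` field becomes a cheap filter.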

4️⃣ Easy Pipeline Integration
The split-and-filter approach can be automated in log ingestion pipelines.
Instead of ingesting full logs, only the split and filtered logs are pushed into analysis tools.

5️⃣ Example LangChain Filter Process for Logs
python

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Example log data (usually much larger in real-world scenarios)
log_data = """
[2025-03-06 10:00:01] [INFO] Service A started
[2025-03-06 10:00:03] [ERROR] Database connection failed
[2025-03-06 10:00:05] [INFO] Service B started
[2025-03-06 10:00:08] [WARNING] High memory usage detected
[2025-03-06 10:00:10] [ERROR] Request timeout on endpoint /api/v1/data
"""

# Initialize text splitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=0
)

# Split log data
chunks = splitter.split_text(log_data)

# Filter function for relevant logs (only errors and warnings)
def filter_relevant_logs(chunk):
    relevant_lines = []
    for line in chunk.split('\n'):
        if "ERROR" in line or "WARNING" in line:
            relevant_lines.append(line)
    return '\n'.join(relevant_lines)

# Apply filter to each chunk
filtered_chunks = [filter_relevant_logs(chunk) for chunk in chunks]
filtered_chunks = [chunk for chunk in filtered_chunks if chunk.strip()]  # Remove empty chunks

# Output filtered logs
for i, chunk in enumerate(filtered_chunks, 1):
    print(f"Filtered Chunk {i}:\n{chunk}\n")

Output

Filtered Chunk 1:
[2025-03-06 10:00:03] [ERROR] Database connection failed
Filtered Chunk 2:
[2025-03-06 10:00:08] [WARNING] High memory usage detected
[2025-03-06 10:00:10] [ERROR] Request timeout on endpoint /api/v1/data
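
A size-based splitter can cut a multi-line entry, such as a stack trace, in the middle. One possible alternative, sketched here with the standard library only, is to group lines into entries first. It assumes every new entry starts with a bracketed date like the sample log above; `split_entries` is a hypothetical helper.

```python
import re

# A new entry is assumed to start with a bracketed date such as "[2025-03-06 ".
ENTRY_START = re.compile(r"^\[\d{4}-\d{2}-\d{2} ")


def split_entries(text):
    """Group lines into entries; continuation lines stay with the entry above."""
    entries = []
    for line in text.splitlines():
        if ENTRY_START.match(line) or not entries:
            entries.append([line])
        else:
            entries[-1].append(line)
    return ["\n".join(entry) for entry in entries]


log_text = """[2025-03-06 10:00:03] [ERROR] Database connection failed
Traceback (most recent call last):
  File "db.py", line 12, in connect
[2025-03-06 10:00:05] [INFO] Service B started"""

entries = split_entries(log_text)
error_entries = [entry for entry in entries if "[ERROR]" in entry]
print(error_entries[0])
```

Filtering on whole entries keeps the traceback attached to the error line that produced it.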

def filter_log_line(line):
    keywords = ["ERROR", "Timeout", "Connection refused", "503", "Deployment failed", "Unauthorized"]
    return any(keyword in line for keyword in keywords)

with open("full_logs.txt", "r") as file:
    filtered_logs = [line for line in file if filter_log_line(line)]

with open("filtered_logs.txt", "w") as file:
    file.writelines(filtered_logs)
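
The substring checks above are case-sensitive, so a keyword like "Timeout" does not match a line that says "Request timeout". If that matters for your logs, a case-insensitive variant might look like this sketch; `filter_log_line_ci` is a hypothetical name, and the keyword list is copied from the snippet above.

```python
import re

KEYWORDS = ["ERROR", "Timeout", "Connection refused", "503", "Deployment failed", "Unauthorized"]

# re.escape guards against keywords that contain regex metacharacters.
KEYWORD_PATTERN = re.compile(
    "|".join(re.escape(keyword) for keyword in KEYWORDS),
    re.IGNORECASE,
)


def filter_log_line_ci(line):
    """Return True if the line contains any keyword, ignoring letter case."""
    return KEYWORD_PATTERN.search(line) is not None


line = "[2025-03-06 10:00:10] [WARNING] Request timeout on endpoint /api/v1/data"
print(any(keyword in line for keyword in KEYWORDS))  # case-sensitive check
print(filter_log_line_ci(line))                      # case-insensitive check
```

The trade-off is more false positives: ignoring case means "error" also matches harmless text such as `errors=0`.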

Log Processing Backend code

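from flask import Blueprint, request, jsonify
from extensions import db
from models import Command  # Assuming Command model already exists
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Blueprint that the routes below are registered on
customlangchain_bp = Blueprint('customlangchain_bp', __name__)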

# Predefined filter criteria - you can customize
FILTER_CRITERIA = {
    "log_levels": ["ERROR", "WARNING", "CRITICAL"],
    "keywords": ["Timeout", "Connection refused", "Crash", "OOMKilled"],
    "http_status_codes": ["500", "502", "503", "504"],
    "exception_types": ["KeyError", "NullPointerException"],
    "resource_alerts": ["High CPU", "Out of Memory", "Disk Full"],
    "database_errors": ["Deadlock", "Connection Pool Exhausted"],
    "security_events": ["Unauthorized", "XSS Detected", "Access Denied"],
    "deployment_events": ["Deployment started", "Rollback initiated"]
}


# Function to check if a log chunk matches any criteria
def log_matches_criteria(chunk):
    # True as soon as any term from any category appears in the chunk
    return any(
        term in chunk
        for terms in FILTER_CRITERIA.values()
        for term in terms
    )


@customlangchain_bp.route("/process_logs", methods=["POST"])
def process_logs():
    try:
        data = request.get_json()
        raw_logs = data.get("logs", "")
        category = data.get("category", "logs")  # Default category if none provided

        if not raw_logs:
            return jsonify({"error": "Log data is required"}), 400

        # Use Langchain to split logs into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        log_chunks = text_splitter.split_text(raw_logs)

        # Filter chunks based on criteria
        filtered_chunks = [chunk for chunk in log_chunks if log_matches_criteria(chunk)]

        if not filtered_chunks:
            return jsonify({"message": "No relevant logs found matching criteria"}), 200

        # Save filtered logs to database
        saved_chunks = []
        for chunk in filtered_chunks:
            new_command = Command(text=chunk, category=category)
            db.session.add(new_command)
            saved_chunks.append(new_command)

        db.session.commit()

        return jsonify({
            "message": f"Processed and saved {len(saved_chunks)} relevant log chunks",
            "saved_logs": [{"id": cmd.id, "text": cmd.text, "category": cmd.category} for cmd in saved_chunks]
        }), 201

    except Exception as e:
        return jsonify({"error": str(e)}), 500
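
Plain substring checks can also match inside longer tokens: "500" is found in a duration like "15003 ms". A possible refinement, shown as a sketch with a trimmed-down and hypothetical copy of the criteria, is to match whole words and report which categories matched so that each chunk can be tagged.

```python
import re

# Trimmed-down, hypothetical copy of the criteria used above.
CRITERIA = {
    "log_levels": ["ERROR", "WARNING", "CRITICAL"],
    "http_status_codes": ["500", "502", "503", "504"],
    "keywords": ["Timeout", "Connection refused"],
}


def matched_categories(chunk, criteria=CRITERIA):
    """Map each category to the terms found in chunk as whole words."""
    matches = {}
    for category, terms in criteria.items():
        found = [
            term for term in terms
            if re.search(rf"\b{re.escape(term)}\b", chunk)
        ]
        if found:
            matches[category] = found
    return matches


print(matched_categories("[INFO] request took 15003 ms"))
print(matched_categories("[ERROR] upstream returned 503 after Timeout"))
```

An empty result means the chunk can be skipped; a non-empty one doubles as a set of tags to store next to the chunk.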

Prompt for Processing Logs

Admin Section: You can dynamically add Error Categories and Error Values into a database.
User Section: The user can:

  • Select Error Categories.
  • Based on the selected category, see a multi-select dropdown of error values.
  • Paste logs into a textarea and filter them based on the selected categories and error values.

Step-by-Step Solution

1. Backend (Flask)

We will define the following routes in Flask:

  • add_error_filter: adds error categories and their respective error values.
  • get_error_categories and get_errors_by_category: populate the two dropdowns.
  • process_logs_with_filter: filters logs based on the selected criteria.

1.1. Models - ErrorFilter

from extensions import db

class ErrorFilter(db.Model):
    __tablename__ = 'error_filter'
    id = db.Column(db.Integer, primary_key=True)
    category = db.Column(db.String(100), nullable=False)  # e.g., log_levels, keywords
    error_value = db.Column(db.String(255), nullable=False)  # e.g., ERROR, Timeout
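
Rows stored with this model can be regrouped into the same shape as the hard-coded FILTER_CRITERIA dictionary shown earlier. The sketch below works on plain `(category, error_value)` tuples so that it runs without a database; `build_filter_criteria` and the sample rows are hypothetical.

```python
from collections import defaultdict


def build_filter_criteria(rows):
    """Group (category, error_value) pairs into {category: [values...]}."""
    criteria = defaultdict(list)
    for category, error_value in rows:
        if error_value not in criteria[category]:  # skip duplicate rows
            criteria[category].append(error_value)
    return dict(criteria)


# Hypothetical rows, shaped like ErrorFilter.category / ErrorFilter.error_value.
rows = [
    ("log_levels", "ERROR"),
    ("log_levels", "WARNING"),
    ("keywords", "Timeout"),
    ("log_levels", "ERROR"),
]

criteria = build_filter_criteria(rows)
print(criteria)
```

With SQLAlchemy, the tuples would come from a query over the two columns; that part is left out here.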

Add Error Filter (Admin to add categories and errors):

@customlangchain_bp.route("/add_error_filter", methods=["POST"])
def add_error_filter():
    try:
        data = request.get_json()
        category = data.get('category', '').strip()
        error_value = data.get('error_value', '').strip()

        if not category or not error_value:
            return jsonify({"error": "Both category and error value are required"}), 400

        new_filter = ErrorFilter(category=category, error_value=error_value)
        db.session.add(new_filter)
        db.session.commit()

        return jsonify({"message": "Error filter added successfully"}), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 500

Fetch Error Categories (To show the categories in dropdown):

@customlangchain_bp.route("/get_error_categories", methods=["GET"])
def get_error_categories():
    try:
        categories = db.session.query(ErrorFilter.category.distinct()).all()
        category_list = [c[0] for c in categories]
        return jsonify(category_list), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

Fetch Error Values by Category (To show errors based on category):

@customlangchain_bp.route("/get_errors_by_category/<category>", methods=["GET"])
def get_errors_by_category(category):
    try:
        errors = ErrorFilter.query.filter_by(category=category).all()
        error_list = [e.error_value for e in errors]
        return jsonify(error_list), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

Process Logs with Selected Filters (For log filtering):

@customlangchain_bp.route("/process_logs_with_filter", methods=["POST"])
def process_logs_with_filter():
    try:
        data = request.get_json()
        selected_category = data.get('category')
        selected_errors = data.get('error_values', [])  # This is a list of errors
        logs = data.get('logs', '')

        # Validate the incoming data
        if not logs or not selected_category or not selected_errors:
            return jsonify({"error": "Logs, category, and at least one error are required"}), 400

        # Split logs into chunks
        from langchain.text_splitter import RecursiveCharacterTextSplitter
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        chunks = splitter.split_text(logs)

        # Filter logs that contain any of the selected errors
        filtered_chunks = []
        for chunk in chunks:
            if any(error in chunk for error in selected_errors):  # Check if any selected error is in the chunk
                filtered_chunks.append(chunk)

        if not filtered_chunks:
            return jsonify({"message": "No logs matched the selected error filters"}), 200

        # Optionally save the filtered chunks to the Command table
        for chunk in filtered_chunks:
            new_command = Command(text=chunk, category=selected_category)
            db.session.add(new_command)

        db.session.commit()

        return jsonify({
            "message": f"{len(filtered_chunks)} chunks matched the filter and saved.",
            "filtered_chunks": filtered_chunks
        }), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500
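
Because the splitter is configured with chunk_overlap=50, text near a chunk boundary can appear in two neighbouring chunks, so the same error line may be saved twice. One way to handle this, sketched with a hypothetical helper, is to collect matching lines once each across all chunks.

```python
def unique_matching_lines(chunks, selected_errors):
    """Collect matching lines from all chunks once each, in first-seen order."""
    seen = set()
    result = []
    for chunk in chunks:
        for line in chunk.splitlines():
            if line in seen:
                continue
            if any(error in line for error in selected_errors):
                seen.add(line)
                result.append(line)
    return result


# Two overlapping chunks: the Timeout line appears in both.
chunks = [
    "10:00:01 INFO started\n10:00:03 ERROR Timeout calling db",
    "10:00:03 ERROR Timeout calling db\n10:00:05 WARN Connection refused",
]

lines = unique_matching_lines(chunks, ["Timeout", "Connection refused"])
print(lines)
```

This also collapses lines that genuinely repeat word for word, which is usually acceptable when every line carries a timestamp.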

2. Frontend (React)

We'll build two menus. The Process Logs menu contains:

  • Error Category Dropdown (first dropdown, for selecting a category).
  • Error Values Dropdown (dependent multi-select for error values in the selected category).
  • Textarea for Logs.

2.1. Frontend (Add Error Filter Menu)
For the admin to add Error Categories and Error Values.

import React, { useState } from 'react';

const AddErrorFilter = () => {
    const [category, setCategory] = useState('');
    const [errorValue, setErrorValue] = useState('');

    const handleSubmit = async () => {
        const response = await fetch('http://127.0.0.1:5000/customlangchain/add_error_filter', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ category, error_value: errorValue })
        });

        const result = await response.json();
        if (result.error) alert(result.error);
        else alert(result.message);
    };

    return (
        <div>
            <h3>Add Error Filter</h3>
            <input
                type="text"
                placeholder="Category"
                value={category}
                onChange={e => setCategory(e.target.value)}
            />
            <input
                type="text"
                placeholder="Error Value"
                value={errorValue}
                onChange={e => setErrorValue(e.target.value)}
            />
            <button onClick={handleSubmit}>Add Filter</button>
        </div>
    );
};

export default AddErrorFilter;

2.2. Frontend (Process Logs Menu)
For users to filter logs based on selected categories and errors.

import React, { useState, useEffect } from 'react';

const ProcessLogs = () => {
    const [categories, setCategories] = useState([]);
    const [selectedCategory, setSelectedCategory] = useState('');
    const [errors, setErrors] = useState([]);
    const [selectedErrors, setSelectedErrors] = useState([]);  // Multi-select errors
    const [logs, setLogs] = useState('');

    useEffect(() => {
        fetch('http://127.0.0.1:5000/customlangchain/get_error_categories')
            .then(res => res.json())
            .then(setCategories);
    }, []);

    const fetchErrors = (category) => {
        setSelectedCategory(category);
        setSelectedErrors([]);
        setErrors([]);

        if (category) {
            fetch(`http://127.0.0.1:5000/customlangchain/get_errors_by_category/${encodeURIComponent(category)}`)
                .then(res => res.json())
                .then(setErrors);
        }
    };

    const handleProcess = async () => {
        const response = await fetch('http://127.0.0.1:5000/customlangchain/process_logs_with_filter', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
                category: selectedCategory,
                error_values: selectedErrors,  // Pass multiple selected errors
                logs
            })
        });

        const result = await response.json();
        if (result.error) alert(result.error);
        else {
            alert(result.message);
            console.log(result.filtered_chunks);
        }
    };

    return (
        <div>
            <h3>Process Logs with Filters</h3>

            {/* Dropdown 1: Category */}
            <select value={selectedCategory} onChange={(e) => fetchErrors(e.target.value)}>
                <option value="">Select Category</option>
                {categories.map(cat => <option key={cat} value={cat}>{cat}</option>)}
            </select>

            {/* Dropdown 2: Errors (Multiple Selection) */}
            {selectedCategory && (
                <select multiple value={selectedErrors} onChange={(e) => setSelectedErrors(Array.from(e.target.selectedOptions, option => option.value))}>
                    {errors.map(err => <option key={err} value={err}>{err}</option>)}
                </select>
            )}

            {/* Textarea for Logs */}
            {selectedCategory && selectedErrors.length > 0 && (
                <textarea
                    rows="10"
                    placeholder="Paste logs here..."
                    value={logs}
                    onChange={(e) => setLogs(e.target.value)}
                />
            )}

            {/* Submit Button */}
            {selectedCategory && selectedErrors.length > 0 && (
                <button onClick={handleProcess}>Process Logs</button>
            )}
        </div>
    );
};

export default ProcessLogs;

Prompt 2: Solution for Log Processing

  • Accept logs, category, and error selections from the frontend.
  • Split logs using LangChain's text splitter.
  • Filter logs based on selected errors.
  • Apply the LangChain prompt template to generate solutions for the filtered logs.
  • Save the processed logs to the database.

Backend Code (Flask + LangChain)
The existing backend code is adjusted to carry out the steps listed above. Here's the updated backend code:

from flask import Blueprint, request, jsonify
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
from langchain_core.runnables import RunnableParallel  # not exported by langchain.chains
from langchain.text_splitter import RecursiveCharacterTextSplitter
from models import db, Command  # Assuming `Command` is your database model

customlangchain_bp = Blueprint('customlangchain_bp', __name__)

# Endpoint to process logs and apply filtering
@customlangchain_bp.route("/process_logs_with_filter", methods=["POST"])
def process_logs_with_filter():
    try:
        # Extract incoming JSON data
        data = request.get_json()
        logs = data.get('logs', '')
        selected_category = data.get('category', '')
        selected_errors = data.get('selected_errors', [])  # Array of selected errors (multi-select)

        if not logs or not selected_errors or not selected_category:
            return jsonify({"error": "Logs, selected errors, and category are required"}), 400

        # Split logs into chunks using LangChain's text splitter
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        chunks = splitter.split_text(logs)

        # Filter the chunks based on the selected errors
        filtered_chunks = [chunk for chunk in chunks if any(error in chunk for error in selected_errors)]

        if not filtered_chunks:
            return jsonify({"message": "No logs matched the selected errors"}), 200

        # LangChain Prompt Template to apply solutions based on filtered logs
        prompts = {
            "log_error_suggestions": PromptTemplate(
                input_variables=["error"],
                template="Analyze the log error: {error}. Provide a suggested solution."
            )
        }

        # Use LangChain to apply the solution for each filtered log chunk
        llm = OpenAI(api_key="your-openai-api-key")  # Replace with your OpenAI API key
        chains = {key: prompts[key] | llm for key in ["log_error_suggestions"]}

        # Run the chain once per filtered chunk; batch() executes the calls concurrently
        parallel_chain = RunnableParallel(**chains)
        results = parallel_chain.batch([{"error": chunk} for chunk in filtered_chunks])

        # Pair each chunk with its own suggestion
        processed_results = []
        for chunk, result in zip(filtered_chunks, results):
            value = result["log_error_suggestions"]
            content = value.content if hasattr(value, "content") else str(value)
            processed_results.append({"error": chunk, "suggestion": content})

        # Optionally save the results into the database (Command table)
        for result in processed_results:
            new_command = Command(text=result["suggestion"], category=selected_category)
            db.session.add(new_command)

        db.session.commit()

        return jsonify({
            "message": f"{len(processed_results)} logs processed and solutions applied.",
            "processed_logs": processed_results
        }), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500

Explanation of Backend Code:
Logs and Filter: Logs are received from the frontend, and the selected errors (based on a multi-select checkbox) are used to filter relevant logs.
Splitting Logs: Logs are split into chunks using LangChain’s RecursiveCharacterTextSplitter to break large logs into manageable pieces.
Filtering Errors: After splitting, logs are filtered based on the selected errors (e.g., Timeout, Connection refused).
Prompt Template: A LangChain prompt template is defined to provide suggestions based on the filtered logs. The prompt analyzes the error and provides a suggestion.
LangChain Execution: The filtered logs are processed using LangChain, and the results are returned and saved into the database.
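
To make the prompt step concrete without calling an LLM, the sketch below builds one prompt per distinct filtered chunk using plain string formatting. The template text is copied from the backend code above; `build_prompts` and the `max_chars` limit are hypothetical additions meant to avoid duplicate or oversized requests.

```python
PROMPT_TEMPLATE = "Analyze the log error: {error}. Provide a suggested solution."


def build_prompts(filtered_chunks, max_chars=2000):
    """Return one prompt per distinct chunk, truncating very long chunks."""
    prompts = []
    seen = set()
    for chunk in filtered_chunks:
        text = chunk.strip()[:max_chars]
        if not text or text in seen:  # skip empty and repeated chunks
            continue
        seen.add(text)
        prompts.append(PROMPT_TEMPLATE.format(error=text))
    return prompts


prompts = build_prompts([
    "10:00:03 ERROR Timeout calling db",
    "10:00:03 ERROR Timeout calling db",
    "   ",
])
print(prompts)
```

Keeping one prompt per chunk means each suggestion can be stored next to the log excerpt that triggered it.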

2. Frontend Code (React)

In React, we need to display the logs, let the user select the errors to process, and submit the logs to the backend for processing.

React Component (Log Processing)

import React, { useState } from 'react';

const ProcessLogs = () => {
    const [logs, setLogs] = useState(''); // Logs from user input
    const [selectedCategory, setSelectedCategory] = useState(''); // Selected category
    const [selectedErrors, setSelectedErrors] = useState([]); // Selected errors (multi-select)
    const [splittedLogs, setSplittedLogs] = useState([]); // Log lines split locally for selection
    const [loading, setLoading] = useState(false);
    const [processedLogs, setProcessedLogs] = useState([]); // Logs after processing

    const handleProcessLogs = async () => {
        setLoading(true);

        try {
            const response = await fetch('http://127.0.0.1:5000/customlangchain/process_logs_with_filter', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({
                    logs,
                    category: selectedCategory,
                    selected_errors: selectedErrors
                })
            });

            const data = await response.json();

            if (data.error) {
                alert(data.error);
            } else {
                // The "no match" response has no processed_logs, so fall back to an empty list
                setProcessedLogs(data.processed_logs || []);
                alert(data.message); // Show success message
            }
        } catch (err) {
            // Network failure or a non-JSON response
            alert(`Request failed: ${err.message}`);
        } finally {
            setLoading(false); // Re-enable the button even if the request failed
        }
    };

    const handleSplitLogs = () => {
        // Split logs into chunks (for testing purposes)
        setSplittedLogs(logs.split('\n').map((log, index) => ({ id: index, log })));
    };

    const handleCheckboxChange = (e) => {
        const { value, checked } = e.target;
        if (checked) {
            setSelectedErrors([...selectedErrors, value]);
        } else {
            setSelectedErrors(selectedErrors.filter(error => error !== value));
        }
    };

    return (
        <div>
            <h3>Process Logs with Filters</h3>

            {/* Log Input */}
            <textarea
                rows="10"
                value={logs}
                onChange={(e) => setLogs(e.target.value)}
                placeholder="Paste your logs here..."
            />

            {/* Select Category */}
            <select onChange={(e) => setSelectedCategory(e.target.value)} value={selectedCategory}>
                <option value="">Select Category</option>
                <option value="error">Error</option>
                <option value="warning">Warning</option>
                {/* Add other categories as needed */}
            </select>

            {/* Split Logs Button */}
            <button onClick={handleSplitLogs}>Split Logs</button>

            {/* Render Splitted Logs with Checkboxes */}
            <div>
                {splittedLogs.length > 0 && (
                    <div>
                        <h4>Select Errors to Process:</h4>
                        {splittedLogs.map((chunk) => (
                            <div key={chunk.id}>
                                <input
                                    type="checkbox"
                                    value={chunk.log}
                                    onChange={handleCheckboxChange}
                                />
                                <label>{chunk.log}</label>
                            </div>
                        ))}
                    </div>
                )}
            </div>

            {/* Process Logs Button */}
            <button onClick={handleProcessLogs} disabled={loading}>
                {loading ? "Processing..." : "Process Logs"}
            </button>

            {/* Display Processed Logs */}
            {processedLogs.length > 0 && (
                <div>
                    <h4>Processed Logs:</h4>
                    {processedLogs.map((log, index) => (
                        <div key={index}>
                            <p>{log.suggestion}</p>
                        </div>
                    ))}
                </div>
            )}
        </div>
    );
};

export default ProcessLogs;