
How to Build a Modular Text Embedding Pipeline with LangChain in Python

Introduction

Text embedding pipelines are essential for Natural Language Processing (NLP) tasks such as document retrieval, semantic search, and recommendation systems. LangChain, a robust library for chaining together modular components, simplifies the process of building them. This post explains how to build a modular text embedding pipeline with LangChain in Python, why the modular design pays off, and where it is useful in practice.

What is a Modular Text Embedding Pipeline?

A modular text embedding pipeline involves breaking the text processing workflow into manageable steps:

Data Ingestion: Load data from sources like text files, PDFs, or web pages.
Data Transformation: Preprocess and clean the data (e.g., splitting text into smaller chunks).
Embedding Generation: Convert text into numerical vectors (embeddings) using models like OpenAI’s text-embedding-ada-002.
Storage and Retrieval: Store embeddings and enable querying based on similarity metrics like cosine similarity.

Advantages of a Modular Pipeline
Reusability: Individual modules can be reused or swapped for different use cases or data sources.
Scalability: Easier to scale each component separately, such as integrating batch loading or database storage.
Flexibility: Ability to handle diverse data formats like text, PDFs, or JSON.
Maintainability: Well-defined components simplify debugging and upgrades.
Efficiency: Modular design ensures the pipeline is optimized for specific tasks without redundant processing.

Pipeline Components with LangChain
Here's how LangChain components are used for each stage:

Data Ingestion:

LangChain supports loaders like TextLoader, WebBaseLoader, and PyPDFLoader.
Example: Loading text from a file.

from langchain.document_loaders import TextLoader

loader = TextLoader("example.txt")
documents = loader.load()
raw_text = [doc.page_content for doc in documents]
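
The same loading pattern works for other sources. A minimal sketch, assuming a hypothetical local example.pdf and a placeholder URL (PyPDFLoader needs the pypdf package, and WebBaseLoader needs beautifulsoup4):

from langchain.document_loaders import PyPDFLoader, WebBaseLoader

# Load a PDF page by page (example.pdf is a hypothetical local file)
pdf_documents = PyPDFLoader("example.pdf").load()

# Fetch and parse a web page (the URL is a placeholder)
web_documents = WebBaseLoader("https://example.com").load()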

Data Transformation:

LangChain provides text splitters like CharacterTextSplitter and RecursiveCharacterTextSplitter to break text into smaller, meaningful chunks.
Example: Splitting text into chunks.

from langchain.text_splitter import RecursiveCharacterTextSplitter

# chunk_size and chunk_overlap are illustrative; tune them for your corpus
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = splitter.split_text(raw_text[0])  # Split the first document into chunks
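
Before embedding anything, it is worth eyeballing what the splitter produced; the chunk_size and chunk_overlap values above are only illustrative starting points:

# Inspect the first few chunks and their sizes
for i, chunk in enumerate(split_documents[:3]):
    print(f"Chunk {i}: {len(chunk)} chars -> {chunk[:60]!r}")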

Embedding Generation:

LangChain integrates with OpenAI embeddings models like text-embedding-ada-002.
Example: Generating embeddings for split text.

from langchain.embeddings import OpenAIEmbeddings

# Requires the OPENAI_API_KEY environment variable to be set
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
document_embeddings = embeddings.embed_documents(split_documents)
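
Each chunk maps to one fixed-length vector; for text-embedding-ada-002 that is 1,536 dimensions:

# Sanity check: one vector per chunk, 1,536 dimensions each for ada-002
print(len(document_embeddings), "vectors of dimension", len(document_embeddings[0]))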

Storage and Retrieval:

Store embeddings in memory or a database and query them using a similarity metric. Cosine similarity, the dot product of two vectors divided by the product of their magnitudes, scores near 1 for semantically similar texts and near 0 for unrelated ones.
Example: Using cosine similarity for querying.

from numpy import dot
from numpy.linalg import norm

def cosine_similarity(v1, v2):
    return dot(v1, v2) / (norm(v1) * norm(v2))

query = "example query"
query_embedding = embeddings.embed_query(query)  # embed_query is the single-string counterpart of embed_documents
results = []  # Initialize an empty list to store the documents and their similarity scores

# Iterate through the split documents and their corresponding embeddings
for doc, emb in zip(split_documents, document_embeddings):
    # Compute cosine similarity between the query embedding and the document embedding
    similarity = cosine_similarity(query_embedding, emb)
    # Append the document and its similarity score as a tuple to the results list
    results.append((doc, similarity))

# Sort the results list by similarity score in descending order
results = sorted(results, key=lambda x: x[1], reverse=True)

# Print the query and the top match
print("Query:", query)
print("Top match:", results[0][0])
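
For anything beyond a toy corpus, a vector store handles the storing and searching for you. A minimal sketch using LangChain's FAISS integration (requires the faiss-cpu package):

from langchain.vectorstores import FAISS

# Build an in-memory index from the chunks, embedding them with the same model
vector_store = FAISS.from_texts(split_documents, embeddings)

# similarity_search embeds the query and returns the top-k closest Documents
for doc in vector_store.similarity_search("example query", k=2):
    print(doc.page_content)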

A Complete End-to-End Example

from langchain.document_loaders import TextLoader, WebBaseLoader, PyPDFLoader
from langchain.text_splitter import (
    CharacterTextSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain.embeddings import OpenAIEmbeddings
from typing import List, Tuple

# Step 1: Data Loading
def data_ingestion(source: str, source_type: str) -> List[str]:
    """
    Load data from various sources (text, web, PDF, etc.)
    """
    if source_type == "text":
        loader = TextLoader(source)
    elif source_type == "web":
        loader = WebBaseLoader(source)
    elif source_type == "pdf":
        loader = PyPDFLoader(source)
    else:
        raise ValueError("Unsupported source type. Use 'text', 'web', or 'pdf'.")

    # Load and return raw documents
    documents = loader.load()
    return [doc.page_content for doc in documents]  # Extract raw text

# Step 2: Data Transformation
def data_transformation(
    documents: List[str], transformer_type: str = "recursive"
) -> List[str]:
    """
    Split the documents into chunks using the specified splitter.
    """
    if transformer_type == "character":
        splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    elif transformer_type == "recursive":
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    else:
        raise ValueError("Unsupported transformer type. Use 'character' or 'recursive'.")

    # Split each document and flatten the result
    split_docs = [splitter.split_text(doc) for doc in documents]
    return [chunk for doc in split_docs for chunk in doc]  # Flatten list of lists

# Step 3: Embedding Generation
def generate_embeddings(documents: List[str]) -> List[List[float]]:
    """
    Generate embeddings for the list of documents using OpenAIEmbeddings.
    """
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    return embeddings.embed_documents(documents)

# Step 4: Store and Query/Retrieve
class EmbeddingStore:
    """
    A simple storage and retrieval class for embeddings and their corresponding documents.
    """
    def __init__(self):
        self.store: List[Tuple[str, List[float]]] = []

    def add(self, document: str, embedding: List[float]):
        """
        Add a document and its embedding to the store.
        """
        self.store.append((document, embedding))

    def query(self, query_embedding: List[float], top_k: int = 1) -> List[str]:
        """
        Retrieve the top-k most similar documents based on embedding similarity.
        """
        from numpy import dot
        from numpy.linalg import norm

        def cosine_similarity(v1, v2):
            """
            Compute the cosine similarity between two vectors.
            """
            return dot(v1, v2) / (norm(v1) * norm(v2))

        # Calculate similarity for all stored embeddings
        similarities = [
            (doc, cosine_similarity(query_embedding, emb))
            for doc, emb in self.store
        ]
        # Sort by similarity in descending order
        similarities = sorted(similarities, key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in similarities[:top_k]]

# Main Flow
if __name__ == "__main__":
    # Example: Load data from a text file
    source = "example.txt"  # Replace with your file path or URL
    source_type = "text"  # Options: "text", "web", "pdf"
    raw_documents = data_ingestion(source, source_type)

    # Transform data using a specific splitter
    transformer_type = "unstructured"  # Options: "text_header", "json", "unstructured", "html_header"
    processed_documents = data_transformation(raw_documents, transformer_type)

    # Generate embeddings
    document_embeddings = generate_embeddings(processed_documents)

    # Store embeddings
    store = EmbeddingStore()
    for doc, emb in zip(processed_documents, document_embeddings):
        store.add(doc, emb)

    # Example Query
    query_text = "The first letter of the Greek alphabet"
    query_embedding = generate_embeddings([query_text])[0]  # Generate embedding for query
    results = store.query(query_embedding, top_k=2)  # Retrieve top 2 results

    # Output Results
    print("Query:", query_text)
    print("Top Matches:")
    for result in results:
        print("-", result)
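
Because each stage is a plain function with typed inputs and outputs, any one of them can be swapped out (a different loader, splitter, embedding model, or storage backend) without touching the rest.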

SUMMARY

The whole flow reduces to five calls, which could also sit behind a FastAPI service (a sketch follows below):

1) raw_documents = data_ingestion(source, source_type), where source is a path or URL such as example.txt and source_type is "text", "web", or "pdf".
2) processed_documents = data_transformation(raw_documents, transformer_type), where transformer_type is "character" or "recursive"; each document is split with splitter.split_text(doc) and the resulting lists are flattened into one list of chunks.
3) document_embeddings = generate_embeddings(processed_documents) turns every chunk into a vector.
4) store = EmbeddingStore(), then store.add(doc, emb) for each pair in zip(processed_documents, document_embeddings).
5) query_embedding = generate_embeddings([query_text])[0], then store.query(query_embedding, top_k=2) returns the closest chunks.
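
A minimal sketch of that FastAPI wrapper, assuming the functions and class above live in a hypothetical module named pipeline (the module name and endpoint shapes are illustrative, not a prescribed API):

from fastapi import FastAPI
from pydantic import BaseModel

# Hypothetical module exposing the pipeline pieces defined above
from pipeline import (
    data_ingestion,
    data_transformation,
    generate_embeddings,
    EmbeddingStore,
)

app = FastAPI()
store = EmbeddingStore()  # In-memory store shared across requests

class QueryRequest(BaseModel):
    text: str
    top_k: int = 2

@app.post("/index")
def index_source(source: str, source_type: str = "text"):
    # Steps 1-4: ingest, split, embed, and store
    raw_documents = data_ingestion(source, source_type)
    chunks = data_transformation(raw_documents)
    for doc, emb in zip(chunks, generate_embeddings(chunks)):
        store.add(doc, emb)
    return {"chunks_indexed": len(chunks)}

@app.post("/query")
def query_store(request: QueryRequest):
    # Step 5: embed the query and return the closest chunks
    query_embedding = generate_embeddings([request.text])[0]
    return {"matches": store.query(query_embedding, top_k=request.top_k)}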
