Introduction
Text embedding pipelines are essential for Natural Language Processing (NLP) tasks such as document retrieval, semantic search, and recommendation systems. LangChain, a robust library for chaining together modular components, simplifies the process of building embedding pipelines. This post walks through building a modular text embedding pipeline with LangChain in Python, the advantages of that design, and practical use cases.
What Is a Modular Text Embedding Pipeline?
A modular text embedding pipeline breaks the text processing workflow into manageable steps (a minimal code sketch follows this list):
Data Ingestion: Load data from sources like text files, PDFs, or web pages.
Data Transformation: Preprocess and clean the data (e.g., splitting long text into smaller chunks).
Embedding Generation: Convert text into numerical vectors (embeddings) using models like OpenAI’s text-embedding-ada-002.
Storage and Retrieval: Store embeddings and enable querying based on similarity metrics like cosine similarity.
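Concretely, the four stages can be thought of as plain functions wired together. Here is a minimal sketch in plain Python; the stand-in implementations below are illustrative placeholders, not LangChain code:

from typing import List

def ingest(source: str) -> List[str]:
    # Stand-in loader: treat the source string itself as one document
    return [source]

def transform(documents: List[str]) -> List[str]:
    # Stand-in splitter: break each document into sentence-like chunks
    return [s.strip() for doc in documents for s in doc.split(".") if s.strip()]

def embed(chunks: List[str]) -> List[List[float]]:
    # Stand-in embedder: a toy two-dimensional "vector" per chunk
    return [[float(len(chunk)), float(chunk.count(" "))] for chunk in chunks]

# Compose the stages; each function can be replaced independently
chunks = transform(ingest("LangChain simplifies pipelines. Modularity helps."))
print(list(zip(chunks, embed(chunks))))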
Advantages of a Modular Pipeline
Reusability: Individual modules can be reused or swapped for different use cases or data sources (see the sketch after this list).
Scalability: Easier to scale each component separately, such as integrating batch loading or database storage.
Flexibility: Ability to handle diverse data formats like text, PDFs, or JSON.
Maintainability: Well-defined components simplify debugging and upgrades.
Efficiency: Modular design ensures the pipeline is optimized for specific tasks without redundant processing.
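To make the reusability point concrete, here is a tiny self-contained sketch (plain Python, with hypothetical helper names) where the splitter is a parameter and can be swapped without touching the rest of the pipeline:

from typing import Callable, List

def pipeline(text: str, splitter: Callable[[str], List[str]]) -> List[str]:
    # Downstream code depends only on the splitter's signature,
    # so any compatible implementation can be plugged in
    return [chunk.lower() for chunk in splitter(text)]

def by_sentence(text: str) -> List[str]:
    return [s.strip() for s in text.split(".") if s.strip()]

def by_word(text: str) -> List[str]:
    return text.split()

print(pipeline("Modular pipelines. Swappable parts.", by_sentence))
print(pipeline("Modular pipelines. Swappable parts.", by_word))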
Pipeline Components with LangChain
Here's how LangChain components are used for each stage:
Data Ingestion:
LangChain supports loaders like TextLoader, WebBaseLoader, and PyPDFLoader.
Example: Loading text from a file.
from langchain.document_loaders import TextLoader
loader = TextLoader("example.txt")
documents = loader.load()
raw_text = [doc.page_content for doc in documents]
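The same pattern works for other sources. For example, here is a sketch of loading a web page with WebBaseLoader (this assumes the beautifulsoup4 package is installed, which the loader uses for parsing):

from langchain.document_loaders import WebBaseLoader

# Load and parse a web page into Document objects
web_loader = WebBaseLoader("https://example.com")
web_documents = web_loader.load()
web_text = [doc.page_content for doc in web_documents]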
Data Transformation:
LangChain provides text splitters like RecursiveCharacterTextSplitter and CharacterTextSplitter to break text into smaller, meaningful chunks.
Example: Splitting text into chunks.
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = splitter.split_text(raw_text[0])  # Split the first document into chunks
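The snippet above splits only the first document; in practice you would split every loaded document and flatten the chunks into one list, as the complete example later in this post does:

# Split every document and flatten the per-document chunk lists
all_chunks = [chunk for doc in raw_text for chunk in splitter.split_text(doc)]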
Embedding Generation:
LangChain integrates with OpenAI embedding models like text-embedding-ada-002.
Example: Generating embeddings for split text.
from langchain.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
document_embeddings = embeddings.embed_documents(split_documents)
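embed_documents returns one vector per input chunk; with text-embedding-ada-002, each vector has 1,536 dimensions. A quick sanity check, continuing the example above:

# Expect one 1536-dimensional vector per chunk
print(len(document_embeddings), "vectors of dimension", len(document_embeddings[0]))
# For a single query string, embed_query returns one vector directly
query_vector = embeddings.embed_query("example query")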
Storage and Retrieval:
Store embeddings in memory or a database and query using similarity metrics.
Example: Using cosine similarity for querying.
from numpy import dot
from numpy.linalg import norm

def cosine_similarity(v1, v2):
    # Cosine similarity: dot product of the vectors over the product of their norms
    return dot(v1, v2) / (norm(v1) * norm(v2))

query = "example query"
query_embedding = embeddings.embed_query(query)
results = []  # Holds (document, similarity score) tuples
# Compare the query embedding against every document embedding
for doc, emb in zip(split_documents, document_embeddings):
    similarity = cosine_similarity(query_embedding, emb)
    results.append((doc, similarity))
# Sort the results by similarity score in descending order
results = sorted(results, key=lambda x: x[1], reverse=True)
# Print the query and the top match
print("Query:", query)
print("Top match:", results[0][0])
A Complete End-to-End Example
from langchain.document_loaders import TextLoader, WebBaseLoader, PyPDFLoader
from langchain.text_splitter import (
    CharacterTextSplitter,
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    HTMLHeaderTextSplitter,
)
from langchain.embeddings import OpenAIEmbeddings
from typing import List, Tuple
# Step 1: Data Ingestion
def data_ingestion(source: str, source_type: str) -> List[str]:
    """
    Load data from various sources (text file, web page, PDF).
    """
    if source_type == "text":
        loader = TextLoader(source)
    elif source_type == "web":
        loader = WebBaseLoader(source)
    elif source_type == "pdf":
        loader = PyPDFLoader(source)
    else:
        raise ValueError("Unsupported source type. Use 'text', 'web', or 'pdf'.")
    # Load the documents and extract their raw text
    documents = loader.load()
    return [doc.page_content for doc in documents]
# Step 2: Data Transformation
def data_transformation(
    documents: List[str], transformer_type: str = "recursive"
) -> List[str]:
    """
    Split the documents into chunks using the specified splitter.
    """
    if transformer_type == "character":
        splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    elif transformer_type == "recursive":
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    elif transformer_type == "markdown_header":
        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[("#", "Header 1"), ("##", "Header 2")]
        )
    elif transformer_type == "html_header":
        splitter = HTMLHeaderTextSplitter(
            headers_to_split_on=[("h1", "Header 1"), ("h2", "Header 2")]
        )
    else:
        raise ValueError("Unsupported transformer type.")
    # Split each document and flatten the per-document chunk lists.
    # Header-based splitters return Document objects, so extract their text.
    chunks: List[str] = []
    for doc in documents:
        for chunk in splitter.split_text(doc):
            chunks.append(getattr(chunk, "page_content", chunk))
    return chunks
# Step 3: Embedding Generation
def generate_embeddings(documents: List[str]) -> List[List[float]]:
    """
    Generate embeddings for the list of documents using OpenAIEmbeddings.
    """
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    return embeddings.embed_documents(documents)
# Step 4: Storage and Retrieval
class EmbeddingStore:
    """
    A simple storage and retrieval class for embeddings and their corresponding documents.
    """

    def __init__(self):
        self.store: List[Tuple[str, List[float]]] = []

    def add(self, document: str, embedding: List[float]):
        """
        Add a document and its embedding to the store.
        """
        self.store.append((document, embedding))

    def query(self, query_embedding: List[float], top_k: int = 1) -> List[str]:
        """
        Retrieve the top-k most similar documents based on embedding similarity.
        """
        from numpy import dot
        from numpy.linalg import norm

        def cosine_similarity(v1, v2):
            # Cosine similarity between two vectors
            return dot(v1, v2) / (norm(v1) * norm(v2))

        # Score every stored document against the query embedding
        similarities = [
            (doc, cosine_similarity(query_embedding, emb))
            for doc, emb in self.store
        ]
        # Sort by similarity in descending order and return the top-k documents
        similarities = sorted(similarities, key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in similarities[:top_k]]
# Main Flow
if __name__ == "__main__":
    # Example: Load data from a text file
    source = "example.txt"  # Replace with your file path or URL
    source_type = "text"  # Options: "text", "web", "pdf"
    raw_documents = data_ingestion(source, source_type)

    # Transform data using a specific splitter
    transformer_type = "recursive"  # Options: "character", "recursive", "markdown_header", "html_header"
    processed_documents = data_transformation(raw_documents, transformer_type)

    # Generate embeddings
    document_embeddings = generate_embeddings(processed_documents)

    # Store embeddings
    store = EmbeddingStore()
    for doc, emb in zip(processed_documents, document_embeddings):
        store.add(doc, emb)

    # Example query
    query_text = "The first letter of the Greek alphabet"
    query_embedding = generate_embeddings([query_text])[0]  # Generate embedding for the query
    results = store.query(query_embedding, top_k=2)  # Retrieve the top 2 results

    # Output results
    print("Query:", query_text)
    print("Top Matches:")
    for result in results:
        print("-", result)
Summary
The pipeline boils down to five steps, which can also be wrapped in an API framework such as FastAPI (a sketch follows below):
1) raw_documents = data_ingestion(source, source_type), where source is a file path or URL such as "example.txt" and source_type is "text", "web", or "pdf".
2) processed_documents = data_transformation(raw_documents, transformer_type), where transformer_type defaults to "recursive" (options: "character", "recursive", "markdown_header", "html_header"). The chosen splitter is applied to each document and the resulting chunks are flattened into a single list.
3) document_embeddings = generate_embeddings(processed_documents) converts the chunks into vectors.
4) store = EmbeddingStore() holds each (document, embedding) pair, added via store.add(doc, emb).
5) query_embedding = generate_embeddings([query_text])[0] embeds the query, and store.query(query_embedding, top_k=2) retrieves the two most similar documents.
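As the summary notes, these steps map naturally onto a web API. Here is a minimal sketch of a FastAPI wrapper around the query step, assuming the store has already been built as in the main flow above (the endpoint path and request model are illustrative):

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    text: str
    top_k: int = 2

@app.post("/query")
def query_endpoint(request: QueryRequest):
    # Embed the incoming query and search the pre-built store
    query_embedding = generate_embeddings([request.text])[0]
    matches = store.query(query_embedding, top_k=request.top_k)
    return {"query": request.text, "matches": matches}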