Building a High-Performance RAG-Based AI with Qdrant, Groq, LangChain, and DAGWorks Hamilton

Vyom Modi
14 min read · May 24, 2024


Overview of the Article

In this article, we will delve into the process of building an AI assistant that allows users to interact with web content through natural language queries. The application utilises advanced technologies like Retrieval-Augmented Generation (RAG), Dagworks for workflow visualization, Qdrant for vector similarity search, and Groq for high-performance AI inference. I will provide a comprehensive, step-by-step guide that covers all aspects from installing the necessary tools to integrating them into a cohesive system. By the end of this article, you will have a clear understanding of how to build and deploy such an application, including practical code snippets and explanations.

Introduction to RAG (Retrieval-Augmented Generation)

Definition and Importance

Retrieval-Augmented Generation (RAG) is a potent approach that combines the strengths of text generation and information retrieval. A retrieval system in RAG first retrieves pertinent documents or data points based on a user’s query. Once this data has been retrieved, a generative model uses it to create a coherent and contextually appropriate response. This approach greatly improves the quality and relevance of the generated text, which makes it especially helpful for applications like information-extraction chatbots and question-answering systems.

Image Credit: https://towardsai.net/p/machine-learning/retrieval-augmented-generation-aka-rag-how-does-it-work
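Conceptually, every RAG pipeline reduces to two steps: retrieve relevant context, then generate an answer conditioned on it. Here is a minimal sketch of that loop in Python, using placeholder retriever and llm objects rather than the concrete stack we assemble later in this article:

def rag_answer(query: str, retriever, llm, top_k: int = 3) -> str:
    # 1. Retrieval: fetch the documents most relevant to the query
    documents = retriever.search(query, limit=top_k)
    context = "\n".join(doc.text for doc in documents)

    # 2. Generation: let the language model answer using only the retrieved context
    prompt = f"Answer the question using only this context:\n{context}\n\nQuestion: {query}"
    return llm.generate(prompt)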

Use Cases and Benefits

RAG is widely used in various applications where the accuracy and relevance of responses are crucial. Some notable use cases include:

  • Customer Support: Enhancing automated responses by retrieving relevant support documents.
  • Educational Tools: Providing detailed explanations and answers by accessing a vast database of academic content.
  • Content Creation: Assisting writers by generating text based on a repository of existing content.

The primary benefits of RAG include improved response accuracy, the ability to handle a wide range of queries, and enhanced user satisfaction due to more relevant and informed responses.

Introduction to Dagworks

Definition and Role in Data Workflows

Dagworks is a platform designed to streamline the creation and management of data workflows. It offers an easy-to-use interface along with powerful tools to build, visualize, and maintain data pipelines. By simplifying the orchestration of complex workflows, Dagworks lets data scientists and engineers concentrate on their analytical tasks rather than the underlying infrastructure.

Key Features

Dagworks offers several key features that make it a valuable tool for data workflow management:

  • Visual Workflow Builder: A drag-and-drop interface that allows users to create and edit workflows.
  • Automated Scheduling: Allows scheduling and automation of tasks inside workflows.
  • Scalability: Supports scaling workflows to handle large datasets and complex processing requirements.
  • Integration: Seamless integration with various data sources, processing engines, and storage solutions.

Hamilton

Hamilton is a declarative micro-framework for creating dataflows, particularly suitable for machine learning and data processing pipelines. It is part of the DAGWorks framework, along with Burr. Hamilton allows you to define transformations as Python functions and manage complex workflows with ease.
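To get a feel for the declarative style before we use it later in this article, here is a minimal, illustrative dataflow (the module and node names are made up for this example): each plain Python function becomes a node, and its parameter names declare which upstream nodes or inputs it depends on.

# illustrative_dataflow.py -- each function below becomes a node in the DAG
def raw_text(url: str) -> str:
    """'url' is supplied as an input when the dataflow is executed."""
    return f"Contents of {url}"

def word_count(raw_text: str) -> int:
    """Depends on raw_text simply because the parameter name matches that node."""
    return len(raw_text.split())

A driver built with driver.Builder().with_modules(illustrative_dataflow).build() can then execute ["word_count"] for a given url input; we build drivers in exactly this way for our real dataflows further down.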

Introduction to Qdrant

Definition and Functionality

Qdrant is an open-source vector similarity search engine as well as a database, designed to handle high-dimensional data efficiently. It enables quick and precise searching in extensive collections of vector embeddings, perfect for tasks that rely on similarity-based (semantic) retrieval like recommendation systems, image search, and document retrieval.

Use in Vector Similarity Search

In the context of our application, Qdrant will be used to store and query vector embeddings generated from web content. By leveraging Qdrant’s capabilities, we can quickly find the most relevant pieces of content in response to user queries, thereby enhancing the effectiveness of our AI system.

Introduction to Groq

Definition and Performance Advantages

Groq is a hardware and software platform specifically designed for accelerating AI workloads. The LPU Inference Engine by Groq provides significant speed and efficiency improvements over traditional processors. It is optimized for tasks such as inference, where rapid computation of AI models is crucial.

Applications in AI Inference

Applications that need high throughput and low latency, such as large-scale data analysis, autonomous systems, and real-time AI inference in chatbots, are especially well suited to Groq’s platform. In our application, Groq will be used to expedite the processing of user queries and the generation of responses, ensuring a smooth and responsive user experience.

Problem Statement

Goal of the AI Application

The main goal of this AI application is to allow users to engage with online content using natural language questions. We aim to create a system that utilises several advanced technologies to understand user queries, extract pertinent details from web content, and produce precise, contextually relevant answers. The app is designed to make finding information easier and quicker, offering a smooth and engaging experience.

Importance of Enabling Natural Language Queries on Web Content

Enabling natural language queries on web content has significant benefits:

  • User-Friendly Interaction: Lets users ask questions in their own words, making it simpler for non-technical users to obtain information.
  • Efficiency: Speeds up the process of locating pertinent data, saving time and effort compared to manually searching web pages.
  • Enhanced Accessibility: Increases the accessibility of web content to a wider range of users, including those with limited technical skills.

Step-by-Step Guide

Environment Setup

  1. Using an environment manager of your choice, create and activate a Python virtual environment.
  2. Install Qdrant

Before proceeding to install the Qdrant Python client, make sure you have Docker installed.

To install Docker, you can run the following commands, or simply visit https://docs.docker.com/get-docker/.

sudo apt-get update
sudo apt-get install -y docker.io
sudo systemctl start docker
sudo systemctl enable docker

Now we can proceed to the Qdrant installation. In the terminal where you previously activated the virtual environment, run the following command to install the Qdrant Python client.

pip install qdrant-client

Now, to run Qdrant locally, start the Docker container with this command. It will launch a local server on port 6333 on your machine.

docker run -p 6333:6333 qdrant/qdrant

Now you can access this server with any client, including Python.

from qdrant_client import QdrantClient

qdrant = QdrantClient("http://localhost:6333")  # Connect to the running Qdrant instance

To verify that Qdrant is running, open your browser and navigate to http://localhost:6333/. You should see a short response confirming the server is up.
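If you prefer to check from code rather than the browser, a quick request to the root endpoint should return a short JSON payload with the service name and version (a minimal sanity check; the exact fields may differ between Qdrant versions):

import requests

# Qdrant's root endpoint responds with basic service information when the server is up
print(requests.get("http://localhost:6333/").json())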

If you face any errors during installation, refer to the Qdrant documentation: https://github.com/qdrant/qdrant.

3. Install DAGWorks Hamilton

Now moving on to Hamilton. To install it, simply run pip install sf-hamilton in your terminal. It should install the necessary packages.

To verify your installation, run the following command. If it successfully displays the version, your installation is successful.

python3 -c "import hamilton; print(hamilton.__version__)"

4. Setup Groq

Go to the Groq Portal. Sign up or log in to your account. Navigate to the API Keys section and generate a new API key. Make sure to keep your API key safe.


Install Groq SDK:

pip install groq

Set Up Environment Variables:

Create a .env file in your project directory and add your API key:

GROQ_API_KEY=your_groq_api_key

Load Environment Variables in Your Code:

from dotenv import load_dotenv
import os

load_dotenv()
groq_api_key = os.getenv('GROQ_API_KEY')

if not groq_api_key:
    raise ValueError("GROQ_API_KEY not found in environment variables")

Data Preparation

Fetching Webpage Content

To fetch webpage content efficiently and avoid being blocked by websites, we use the BeautifulSoup library for parsing HTML and the fake_useragent library to randomize our HTTP request headers.

Here’s the code snippet for fetching webpage content:

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import pandas as pd
from urllib.parse import urlparse

# Initialize a DataFrame to cache the URLs and their scraped content
cache_df = pd.DataFrame(columns=['url', 'content'])

# Initialize the user agent object
ua = UserAgent()

def standardize_url(url: str) -> str:
    """
    Standardize the given URL to ensure consistency.
    """
    parsed = urlparse(url)
    scheme = parsed.scheme if parsed.scheme else 'http'
    netloc = parsed.netloc if parsed.netloc else parsed.path.split('/')[0]
    # If the scheme was missing, the host ends up in the path, so re-add the leading slash
    path = parsed.path if parsed.netloc else '/' + '/'.join(parsed.path.split('/')[1:])
    standardized_url = f"{scheme}://{netloc}{path}"
    return standardized_url

def fetch_webpage_content(url: str) -> str:
    """
    Fetch webpage content using BeautifulSoup and fake_useragent to avoid blocking.
    """
    try:
        global cache_df

        # Check if the URL is already scraped
        if url in cache_df['url'].values:
            return cache_df.loc[cache_df['url'] == url, 'content'].values[0]

        # Use fake_useragent to prevent blocking
        headers = {'User-Agent': ua.random}
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # Raises HTTPError for bad responses

        # Store the scraped content in cache
        cache_df = pd.concat([cache_df, pd.DataFrame({'url': [url], 'content': [response.text]})], ignore_index=True)

        return response.text
    except requests.RequestException as e:
        return f"Error fetching {url}: {str(e)}"

# Example test
test_url = "https://en.wikipedia.org/wiki/Main_Page"
standardized_url = standardize_url(test_url)
content = fetch_webpage_content(standardized_url)
print("Fetched Content:", content[:500]) # Print first 500 characters for brevity

Error Handling and Caching Mechanism

The code includes error handling to manage HTTP errors gracefully and uses a caching mechanism to avoid redundant network requests. The cache_df DataFrame stores previously fetched URLs and their content to speed up repeated requests.
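As a quick illustration of the caching behavior, calling the function twice with the same standardized URL hits the network only once; the second call is served straight from cache_df:

# First call performs the HTTP request and populates cache_df
first = fetch_webpage_content(standardized_url)

# Second call returns the cached content without another network round trip
second = fetch_webpage_content(standardized_url)

print(first == second)   # True -- identical content
print(len(cache_df))     # 1 -- a single cached entry for this URL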

Extracting Text from HTML

To convert HTML content into plain text, we use the html2text library. This library simplifies the process of stripping away HTML tags and converting the content into a readable format.

Here’s the code snippet for extracting text from HTML:

import html2text

# Initialize html2text converter
converter = html2text.HTML2Text()
converter.ignore_links = True
converter.ignore_images = True
converter.ignore_emphasis = True

def extract_text_from_html(html_content: str) -> str:
    """
    Extract text from HTML content using html2text.
    """
    try:
        text = converter.handle(html_content)
        return text
    except Exception as e:
        return f"Error processing HTML content: {str(e)}"

# Extract text
extracted_text = extract_text_from_html(content)
print("Extracted Text:", extracted_text[:500]) # Print first 500 characters for brevity

Text Processing and Embedding Generation

Text Chunking

To manage large text efficiently, we split it into smaller chunks. We use the RecursiveCharacterTextSplitter from LangChain. It tries a hierarchy of separators (paragraphs, then lines, then words) and groups the resulting pieces into chunks that do not exceed a specified character limit.

Here’s the code snippet for text chunking:

from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_text(extract_text_from_html: str, max_chunk_size: int = 512) -> list:
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=max_chunk_size,
        chunk_overlap=0,
        separators=["\n\n", "\n", " ", ""]
    )
    chunks = text_splitter.split_text(extract_text_from_html)
    return chunks

# Example of chunking text
chunks = chunk_text(extracted_text)
print("Number of Chunks:", len(chunks))

Generating Embeddings

We use Sentence-BERT, a pre-trained model specifically designed for generating sentence embeddings. This model converts each text chunk into a high-dimensional vector representation.

Here’s the code snippet for generating embeddings:

from sentence_transformers import SentenceTransformer
import numpy as np

# Load the Sentence-BERT model
model = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')

def generate_embeddings(text_chunks):
    embeddings = []
    for chunk in text_chunks:
        # Generate embedding for each chunk
        embedding = model.encode(chunk, convert_to_tensor=True)
        embeddings.append(embedding.cpu().numpy())
    return np.array(embeddings)

# Generate embeddings for the text chunks
embeddings = generate_embeddings(chunks)
print("Generated Embeddings:", embeddings.shape)

Storing and Querying Data with Qdrant

Creating Qdrant Collection

We need to create a collection in Qdrant to store our embeddings. Each collection is configured with specific parameters, such as the vector size and distance metric.

Here’s the code snippet for creating a Qdrant collection:

from qdrant_client import QdrantClient
from qdrant_client.http import models

# Initialize the Qdrant client
qdrant_client = QdrantClient(host='localhost', port=6333)

def create_qdrant_collection(collection_name: str, vector_dim: int):
    try:
        # Try creating the collection
        qdrant_client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(
                size=vector_dim,
                distance=models.Distance.COSINE  # Cosine distance suits normalized sentence embeddings
            )
        )
        print(f"Collection '{collection_name}' created.")
    except Exception as e:
        # If the collection already exists, we expect an error indicating that
        if 'already exists' in str(e).lower():
            print(f"Collection '{collection_name}' already exists.")
        else:
            # If the error is due to other reasons, we raise it
            raise

# Create a collection for the embeddings
create_qdrant_collection(collection_name='web_content', vector_dim=embeddings.shape[1])

We store the generated embeddings along with their metadata (e.g., the original URL and text chunk) in the Qdrant collection.

Here’s the code snippet for storing embeddings:

def store_embeddings_in_qdrant(embeddings, metadata, collection_name='web_content'):
    # Ensure the collection exists by trying to create it
    create_qdrant_collection(vector_dim=embeddings.shape[1], collection_name=collection_name)

    points = []
    for i, embedding in enumerate(embeddings):
        point = models.PointStruct(
            id=i,
            vector=embedding.tolist(),
            payload=metadata[i]
        )
        points.append(point)

    # Store points in Qdrant in batches
    batch_size = 100
    for start in range(0, len(points), batch_size):
        batch_points = points[start:start + batch_size]
        qdrant_client.upsert(collection_name=collection_name, points=batch_points)
    print(f"Stored {len(points)} embeddings in collection '{collection_name}'.")

# Prepare metadata and store embeddings
metadata = [{'url': standardized_url, 'chunk': chunk} for chunk in chunks]
store_embeddings_in_qdrant(embeddings, metadata)

Querying Qdrant

To retrieve relevant information based on a user’s query, we encode the query into an embedding and search for the closest matches in Qdrant.

def query_qdrant(query_embedding, collection_name='web_content', top_k=1):
    search_result = qdrant_client.search(
        collection_name=collection_name,
        query_vector=query_embedding,
        limit=top_k,
        with_payload=True
    )
    return [hit.payload for hit in search_result]

# Example query
query_text = "What is the main page of Wikipedia about?"
query_embedding = model.encode(query_text, convert_to_tensor=True).cpu().numpy()
result = query_qdrant(query_embedding)
print("Query Result:", result)

Integrating with Groq for Interactive Queries

Setting Up Groq API

To leverage Groq’s powerful inference capabilities, we need to set up the Groq API key (as shown in previous section) and then retrieve it from the environment variable:

from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Retrieve the Groq API key from environment variables
groq_api_key = os.getenv('GROQ_API_KEY')

# Ensure the API key is retrieved successfully
if not groq_api_key:
    raise ValueError("GROQ_API_KEY not found in environment variables")

Next, we initialize the ChatGroq model from the langchain-groq library, specifying the model name and enabling streaming for real-time response generation. We are utilising the LLaMA 3 model from Meta. To see the full list of available models, visit https://console.groq.com/docs/models.

from langchain_groq import ChatGroq

# Initialize the ChatGroq model with streaming enabled
chat_model = ChatGroq(model_name='llama3-8b-8192', api_key=groq_api_key, streaming=True)
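A quick sanity check that the key and model are wired up correctly (this assumes your GROQ_API_KEY is valid; invoke sends a single message and returns an AIMessage whose content holds the text):

# One-off test call; remove once you have confirmed the model responds
print(chat_model.invoke("Reply with a single short sentence.").content)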

Generating Responses

To generate responses based on user queries, we leverage the embeddings stored in Qdrant and the inference capabilities of Groq. Here’s the code snippet that orchestrates the entire process:

from langchain_core.prompts import PromptTemplate
from langchain.memory.buffer import ConversationBufferMemory

# Initialize the ConversationBufferMemory
memory = ConversationBufferMemory(return_messages=True)

# Define the ChatPromptTemplate for user interaction
template = """Answer the following question from the context

context = {context}

question = {question}

"""
prompt_template = PromptTemplate(input_variables=["context", "question"], template=template)

def generate_response(user_input: str) -> str:
    try:
        query_embedding = model.encode(user_input, convert_to_tensor=True).cpu().numpy()
        relevant_metadata = query_qdrant(query_embedding)
        context = " ".join([meta['chunk'] for meta in relevant_metadata])
        full_response = chat_model.predict(prompt_template.format(question=user_input, context=context))
        return full_response.strip()
    except Exception as e:
        print(f"An error occurred in generate_response: {str(e)}")
        return f"Error: {str(e)}"

The generate_response function takes a user's input query, encodes it using the Sentence-BERT model, and searches for the most relevant chunks in Qdrant (query_qdrant). It then concatenates these chunks to form the context for the query.

Next, the function formats the context and query using the PromptTemplate and passes it to the ChatGroq model for inference. The model generates a response based on the provided context and query.

# Example usage
user_query = "What is the main page of Wikipedia about?"
response = generate_response(user_query)
print(response)

Building a User Interface with Streamlit

To provide a user-friendly interface for our AI application, we leverage Streamlit, a popular Python library for building interactive web applications. Here’s the code snippet for setting up a basic Streamlit app:

import streamlit as st

def app():
    st.title("RAG-based AI Assistant")

    st.header("Step 1: Scrape and Process Content")
    url_input = st.text_input("Enter a URL to scrape:", key='url')

    if st.button('Process Content'):
        # 'execute' is a small helper that runs the named nodes of a Hamilton dataflow
        # (a sketch of it is shown in the Hamilton section below)
        data = execute(["process_and_store_text"], driver_func=get_hamilton_driver1)

        if "process_and_store_text" in data:
            st.success(data["process_and_store_text"])

        if "extract_text_from_html" in data:
            st.text_area("Extracted Text:", data["extract_text_from_html"])

    st.header("Step 2: Ask a Question")
    user_query = st.text_input("Ask a question about the content:", key='user_input')
    if st.button('Generate Response'):
        if user_query:
            response = generate_response(user_query)
            st.write(response)

if __name__ == "__main__":
    app()

The app consists of two main sections:

  1. Scrape and Process Content: In this section, users can enter a URL to scrape and process the content using the preprocessing functions we defined earlier.
  2. Ask a Question: Here, users can enter their query, and upon clicking the “Generate Response” button, the generate_response function is called to fetch relevant context from Qdrant and generate a response using the Groq model.
Screenshot: the Streamlit app interface.
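To try the interface locally, save the application code to a file (assumed here to be called app.py) and launch it with Streamlit; by default it opens in your browser at http://localhost:8501.

streamlit run app.py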

Enhancing Dataflows with Hamilton

Creating Dataflows

In our AI application, we can leverage Hamilton to organize our code into separate dataflows for preprocessing and response generation. This modular approach improves code organization, maintainability, and reusability.

Here’s an example of how we can create dataflows using Hamilton:

from hamilton import driver

import dataflow_preprocess_text
import dataflow_generate_response

def get_hamilton_driver1() -> driver.Driver:
    return (
        driver.Builder()
        .with_modules(dataflow_preprocess_text)
        .build()
    )

def get_hamilton_driver2() -> driver.Driver:
    return (
        driver.Builder()
        .with_modules(dataflow_generate_response)
        .build()
    )

In this example, we define two functions, get_hamilton_driver1 and get_hamilton_driver2, which create Hamilton drivers for the preprocessing and response generation dataflows, respectively. Each driver is built by importing the corresponding dataflow module.
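The Streamlit code above calls an execute helper that we have not defined explicitly; a minimal version (an assumed sketch built on Hamilton's Driver.execute, where the inputs dict carries values such as the URL from the text box) could look like this:

def execute(final_vars: list, driver_func, inputs: dict = None) -> dict:
    """Build the requested Hamilton driver and execute the named nodes."""
    dr = driver_func()
    return dr.execute(final_vars, inputs=inputs or {})

# e.g. execute(["process_and_store_text"], driver_func=get_hamilton_driver1,
#              inputs={"url": url_input})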

To visualize the dataflows and functions within them, we can use the display_all_functions method:

get_hamilton_driver1().display_all_functions()
get_hamilton_driver2().display_all_functions()

Hamilton also provides functionality to visualize the execution of specific functions within a dataflow. For example, to visualize the execution of the generate_response function, we can use the visualize_execution method:

get_hamilton_driver2().visualize_execution(['generate_response'], 'rag_generate_groq_response_function.png')
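Note that Hamilton's visualization methods depend on graphviz. If the calls above raise an import error, installing the visualization extra usually resolves it (the extra name below reflects Hamilton's packaging and may change between versions; you may also need the system graphviz package):

pip install "sf-hamilton[visualization]"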

By leveraging Hamilton, we can improve the organization, readability, and maintainability of our codebase, while also benefiting from its visualization capabilities to better understand and debug our data workflows.

Results

To demonstrate the capabilities of our AI application, let’s explore some example queries and their corresponding responses.

Example 1:

URL entered: https://openai.com/index/hello-gpt-4o/

Query: List the key highlights of GPT-4o.

Response:

Based on the context, the key highlights of GPT-4o are:

Faster: GPT-4o is 2x faster compared to GPT-4 Turbo.

More affordable: GPT-4o is half the price of GPT-4 Turbo.

Higher rate limits: GPT-4o has 5x higher rate limits compared to GPT-4 Turbo.

New audio and video capabilities: GPT-4 will support new audio and video capabilities in the coming weeks.

0-shot CoT: GPT-4o achieves state-of-the-art performance on visual perception benchmarks, including MMMU, MathVista, and ChartQA, with 0-shot CoT (i.e., no additional training data).

Omni-capabilities: GPT-4o accepts input in any combination of text, audio, image, and video and generates any combination of text, audio, and image outputs.

Fast response time: GPT-4 responds to audio inputs in as little as 232 milliseconds, with an average response time of 320 milliseconds, similar to human response time.

Safety features: GPT-4o has built-in safety features, including filtering training data and refining the model’s behavior through post-training, as well as new safety systems to provide guardrails on voice outputs.

Accuracy: Visiting the URL confirms that the response is quite accurate and presented clearly in a list format.

Example 2:

URL entered: https://en.wikipedia.org/wiki/Main_Page

Query 1: What’s the news about boxing?

Response: According to the context, the news about boxing is that Oleksandr Usyk defeated Tyson Fury to become the first undisputed heavyweight champion in twenty-four years.

Accuracy: Visiting the URL confirms that the response is accurate.

Query 2: What are the protests going on about?

Response: According to the text, the protests are over voting rights changes in the French territory of New Caledonia.

Accuracy: Visiting the URL confirms that the response is accurate.

Conclusion

This project successfully demonstrated the development of an interactive AI assistant using the Retrieval-Augmented Generation (RAG) architecture. By combining advanced techniques for fetching and processing web content, generating text embeddings, and leveraging Qdrant for efficient retrieval, we created a robust system capable of delivering accurate and contextually relevant responses. The integration of Hamilton for dataflows, LangChain for structured prompts, and Meta’s LLaMA 3 LLM via the Groq API within a user-friendly Streamlit interface highlights the potential of RAG in enhancing information retrieval and user interaction. Future enhancements could include multilingual support, personalized responses, and integration with additional data sources, further expanding the capabilities and applications of the system.

If you found this article interesting and would like to connect, feel free to reach out to me on LinkedIn or GitHub.
