Introduction
Retrieval‑Augmented Generation (RAG) has emerged as a practical paradigm for building knowledge‑aware language‑model applications. Instead of relying solely on the parametric knowledge stored inside a large language model (LLM), RAG first retrieves relevant documents from an external corpus and then generates a response conditioned on those documents. This two‑step approach dramatically improves factual accuracy, reduces hallucinations, and enables up‑to‑date answers without retraining the underlying model.
However, the quality of the final answer hinges on the precision of the retrieval component. In many production settings—customer support bots, legal‑assistant tools, or medical QA systems—retrieving a handful of highly relevant passages is far more valuable than returning a long list of loosely related hits. A common technique to raise precision is multi‑stage reranking: after an initial, inexpensive retrieval pass, successive models (often larger and more expensive) re‑evaluate the candidate set, pushing the most relevant items to the top.
This article walks you through a complete, production‑ready implementation of multi‑stage reranking for high‑precision RAG on Google Cloud Platform (GCP). We’ll cover:
- The architectural overview and why GCP services are a natural fit.
- Data ingestion, preprocessing, and indexing with Vertex AI Matching Engine and Vertex AI Search.
- First‑stage retrieval (dense and sparse) and candidate generation.
- Second‑stage reranking using cross‑encoders and LLM‑based scoring.
- Prompt engineering and final generation with Vertex AI PaLM / Gemini.
- Evaluation, monitoring, and cost‑optimization strategies.
- A hands‑on Python code walkthrough that ties everything together.
By the end of this guide, you’ll have a blueprint you can adapt to any domain—whether you’re building a corporate knowledge base, a scholarly search assistant, or a multilingual support chatbot.
1. Architectural Overview
1.1 Core Components
| Component | GCP Service | Role |
|---|---|---|
| Document Store | Cloud Storage (raw files) + BigQuery (metadata) | Persistent storage of source documents and searchable attributes |
| Vector Index | Vertex AI Matching Engine (VME) | Approximate nearest‑neighbor (ANN) search for dense embeddings |
| Full‑Text Index | Vertex AI Search (formerly Enterprise Search) | BM25 / lexical search for sparse retrieval |
| Reranking Service | Cloud Run (containerized cross‑encoder) + Vertex AI Prediction (custom model) | Two‑stage reranking: lightweight cross‑encoder → heavyweight LLM scorer |
| LLM Generation | Vertex AI Generative AI (PaLM / Gemini) | Final answer generation conditioned on top‑k passages |
| Orchestration | Cloud Functions / Cloud Workflows | Glue logic that wires retrieval, reranking, and generation together |
| Monitoring & Logging | Cloud Logging, Cloud Monitoring, Vertex AI Experiments | Track latency, cost, and quality metrics |
1.2 Data Flow Diagram
User Request
│
▼
Cloud Functions (Entry Point)
│
├─► Vertex AI Search (BM25) → lexical candidates
│
├─► Vertex AI Matching Engine → dense candidates
│
├─► Merge & deduplicate → candidate set (N≈100)
│
├─► Cloud Run (Cross‑Encoder) → first‑stage scores
│
├─► Vertex AI Prediction (LLM scorer) → second‑stage scores
│
├─► Top‑k passages (k=5‑10) → Vertex AI Generative AI
│
▼
Generated Answer → Returned to User
The multi‑stage reranking sits between the initial retrieval and the generation step. The first stage quickly eliminates the bulk of irrelevant hits, while the second stage applies a more nuanced, often LLM‑based, relevance assessment.
2. Preparing the Corpus
2.1 Ingesting Raw Documents
Assume you have a mixed corpus of PDFs, Markdown files, and CSV‑based knowledge tables. A typical ingestion pipeline:
import os
from google.cloud import storage, bigquery
import textract # for extracting text from PDFs/Word docs
PROJECT_ID = "my-gcp-project"
BUCKET_NAME = "rag-corpus-bucket"
BQ_DATASET = "rag_dataset"
BQ_TABLE = "documents"
storage_client = storage.Client()
bq_client = bigquery.Client()
def upload_to_gcs(local_path, gcs_path):
bucket = storage_client.bucket(BUCKET_NAME)
blob = bucket.blob(gcs_path)
blob.upload_from_filename(local_path)
def extract_text(file_path):
return textract.process(file_path).decode("utf-8")
def load_to_bigquery(rows):
table_ref = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}"
errors = bq_client.insert_rows_json(table_ref, rows)
if errors:
raise RuntimeError(f"BigQuery insert errors: {errors}")
def ingest_directory(root_dir):
rows = []
for dirpath, _, filenames in os.walk(root_dir):
for fn in filenames:
local_path = os.path.join(dirpath, fn)
gcs_path = f"raw/{fn}"
upload_to_gcs(local_path, gcs_path)
text = extract_text(local_path)
rows.append({
"doc_id": fn,
"gcs_uri": f"gs://{BUCKET_NAME}/{gcs_path}",
"content": text,
"source_type": os.path.splitext(fn)[1].lower()
})
load_to_bigquery(rows)
# Run once
# ingest_directory("/data/my_corpus")
Note: In production, consider using Dataflow for parallel extraction and Cloud Functions for event‑driven ingestion.
2.2 Chunking & Metadata Enrichment
LLMs perform best when fed short, self‑contained passages (≈ 200–500 tokens). We’ll chunk each document using a sliding window approach and store chunk metadata in BigQuery.
import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize
MAX_TOKENS = 300 # Adjust based on target LLM context window
def chunk_text(doc_id, content):
sentences = sent_tokenize(content)
chunks = []
current = []
current_len = 0
for sent in sentences:
token_len = len(sent.split())
if current_len + token_len > MAX_TOKENS:
chunks.append({
"chunk_id": f"{doc_id}_c{len(chunks)}",
"doc_id": doc_id,
"content": " ".join(current)
})
current = [sent]
current_len = token_len
else:
current.append(sent)
current_len += token_len
# Add the tail
if current:
chunks.append({
"chunk_id": f"{doc_id}_c{len(chunks)}",
"doc_id": doc_id,
"content": " ".join(current)
})
return chunks
After chunking, we load the rag_chunks table (schema: chunk_id, doc_id, content, embedding (later), metadata JSON).
3. Building Retrieval Indices
3.1 Dense Embeddings with Vertex AI Embedding API
Vertex AI provides a managed text‑embedding endpoint (e.g., textembedding-gecko@001). We’ll generate embeddings for each chunk and store them back in BigQuery.
from vertexai.preview.language_models import TextEmbeddingModel
import numpy as np
model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001")
def embed_chunks(chunks):
texts = [c["content"] for c in chunks]
embeddings = model.get_embeddings(texts) # Returns list of float vectors
for c, emb in zip(chunks, embeddings):
c["embedding"] = emb
return chunks
# Example for a single document
# doc_chunks = chunk_text("doc123", raw_text)
# embed_chunks(doc_chunks)
3.2 Indexing with Vertex AI Matching Engine
Create a matching engine index (approx. 1‑2 B vectors per node). Below is a minimal Python snippet using the Vertex AI SDK:
from google.cloud import aiplatform_v1beta1 as aiplatform
client = aiplatform.MatchServiceClient()
parent = f"projects/{PROJECT_ID}/locations/us-central1"
# 1. Create index
index = {
"display_name": "rag-chunk-index",
"metadata_schema_uri": "gs://google-cloud-aiplatform/schema/matchingengine/metadata_schema_v1.yaml",
"metadata": {
"algorithm_config": {"tree_ah_config": {"leaf_node_embedding_count": 1000}}
},
}
operation = client.create_index(parent=parent, index=index)
index_name = operation.result().name
# 2. Upload embeddings (batch)
# Export embeddings from BigQuery to GCS as .jsonl, then use the import API
The import process can be automated with Dataflow to stream new embeddings as they are generated.
3.3 Lexical Index with Vertex AI Search
Vertex AI Search offers a managed BM25 engine that supports fielded search, synonyms, and relevance tuning.
gcloud alpha discoveryengine data-stores create rag-search \
--project=$PROJECT_ID \
--location=global \
--collection-id=default_collection
After creating the data store, upload the chunk documents (JSON format) via the Document Service API:
from google.cloud import discoveryengine_v1alpha as discoveryengine
client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(PROJECT_ID, "global", "rag-search", "default_branch")
def upload_chunk(chunk):
doc = discoveryengine.Document(
name=client.document_path(PROJECT_ID, "global", "rag-search", "default_branch", chunk["chunk_id"]),
schema_id="document",
title=f"Chunk from {chunk['doc_id']}",
content=chunk["content"],
)
client.create_document(parent=parent, document=doc)
# Iterate over all chunks
# for chunk in all_chunks:
# upload_chunk(chunk)
Now you have both dense (ANN) and sparse (BM25) retrieval capabilities.
4. First‑Stage Retrieval & Candidate Merging
4.1 Query Embedding
When a user asks a question, we embed the query using the same model used for the corpus.
def embed_query(query: str) -> list[float]:
return model.get_embeddings([query])[0]
4.2 Dense ANN Search
def dense_search(query_emb, top_k=50):
request = {
"index_endpoint": f"projects/{PROJECT_ID}/locations/us-central1/indexEndpoints/{INDEX_ENDPOINT_ID}",
"deployed_index_id": DEPLOYED_INDEX_ID,
"queries": [{"embedding": query_emb}],
"neighbor_count": top_k,
}
response = client.find_neighbors(request=request)
return [(neighbor.id, neighbor.distance) for neighbor in response.nearest_neighbors[0].neighbors]
4.3 Lexical BM25 Search
def lexical_search(query: str, top_k=50):
search_service = discoveryengine.SearchServiceClient()
request = discoveryengine.SearchRequest(
serving_config=f"projects/{PROJECT_ID}/locations/global/collections/default_collection/dataStores/rag-search/servingConfigs/default_config",
query=query,
page_size=top_k,
)
response = search_service.search(request)
return [(hit.id, hit.relevance_score) for hit in response.results]
4.4 Merging & Deduplication
def merge_candidates(dense_hits, lexical_hits, max_candidates=100):
# Simple union with score normalization
merged = {}
for doc_id, score in dense_hits + lexical_hits:
if doc_id not in merged or merged[doc_id] > score:
merged[doc_id] = score
# Sort by normalized score
sorted_hits = sorted(merged.items(), key=lambda x: x[1])
return sorted_hits[:max_candidates]
The result is a candidate set of ~100 chunks that will be fed into the reranking pipeline.
5. Multi‑Stage Reranking
5.1 Why Two Stages?
- Stage 1 (Cross‑Encoder): Fast (≈ 10 ms per pair) and captures token‑level interactions. Ideal for pruning large candidate pools.
- Stage 2 (LLM Scorer): More expensive (≈ 200 ms) but can incorporate chain‑of‑thought reasoning, domain knowledge, and query‑specific constraints.
5.2 Stage 1 – Cross‑Encoder Reranker
We’ll deploy a sentence‑transformers cross‑encoder (e.g., cross-encoder/ms-marco-MiniLM-L-6-v2) to a Cloud Run container.
Dockerfile
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir torch sentence-transformers fastapi uvicorn
COPY app.py .
EXPOSE 8080
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sentence_transformers import CrossEncoder
app = FastAPI()
model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
class RerankRequest(BaseModel):
query: str
candidates: list[str] # list of chunk IDs
contents: dict[str, str] # mapping id → text
class RerankResponse(BaseModel):
scores: dict[str, float]
@app.post("/rerank", response_model=RerankResponse)
def rerank(req: RerankRequest):
pairs = [(req.query, req.contents[cid]) for cid in req.candidates]
scores = model.predict(pairs) # higher = more relevant
return RerankResponse(scores=dict(zip(req.candidates, scores)))
Deploy with:
gcloud run deploy cross-encoder-reranker \
--image=gcr.io/$PROJECT_ID/cross-encoder-reranker \
--platform=managed \
--region=us-central1 \
--allow-unauthenticated
Invoking Stage 1
import requests
def stage1_rerank(query, candidate_ids, id_to_content):
payload = {
"query": query,
"candidates": candidate_ids,
"contents": id_to_content,
}
resp = requests.post("https://cross-encoder-reranker-xxxxx.run.app/rerank", json=payload)
resp.raise_for_status()
return resp.json()["scores"]
We keep the top N₁ = 30 candidates based on these scores.
5.3 Stage 2 – LLM‑Based Scorer
Vertex AI Prediction can host a custom LLM prompt that returns a relevance score (0‑1). We’ll use Gemini‑1.5‑Flash with a few‑shot prompt.
from vertexai.preview import generative_models
model = generative_models.GenerativeModel("gemini-1.5-flash-001")
def llm_score(query, passage):
prompt = f"""You are a relevance judge for a knowledge‑base retrieval system.
Given the user question and a candidate passage, output a single number between 0 and 1 indicating how well the passage answers the question. Use the following format:
Question: <question>
Passage: <passage>
Score: <number>
Do not add any extra text.
Question: {query}
Passage: {passage}
Score:"""
response = model.generate_content(prompt, temperature=0.0)
# Extract the numeric token
import re
match = re.search(r"Score:\s*([\d\.]+)", response.text)
return float(match.group(1)) if match else 0.0
To batch‑score efficiently, we can parallelize with ThreadPoolExecutor.
from concurrent.futures import ThreadPoolExecutor
def stage2_rerank(query, top_ids, id_to_content):
scores = {}
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {
executor.submit(llm_score, query, id_to_content[cid]): cid for cid in top_ids
}
for fut in futures:
cid = futures[fut]
try:
scores[cid] = fut.result()
except Exception as e:
scores[cid] = 0.0
# Sort descending
return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
We finally keep top k = 5‑10 passages for generation.
6. Prompt Engineering & Generation
6.1 Constructing the Retrieval‑Augmented Prompt
A well‑structured prompt ensures the LLM uses the retrieved context rather than ignoring it.
def build_prompt(query, passages):
context = "\n---\n".join([f"[{i+1}] {p}" for i, p in enumerate(passages)])
prompt = f"""You are an expert assistant. Use the following retrieved documents to answer the user's question. Cite each fact with the corresponding document number in brackets.
Context:
{context}
Question: {query}
Answer (include citations, e.g., [1]):"""
return prompt
6.2 Generation with Vertex AI Generative AI
def generate_answer(prompt):
model = generative_models.GenerativeModel("gemini-1.5-pro-001")
response = model.generate_content(
prompt,
temperature=0.2, # Low temp for factual answers
max_output_tokens=1024,
top_p=0.95,
)
return response.text.strip()
6.3 End‑to‑End Retrieval‑Augmented Generation Function
def rag_answer(query):
# 1. Embed query
q_emb = embed_query(query)
# 2. Retrieve candidates
dense_hits = dense_search(q_emb, top_k=50)
lexical_hits = lexical_search(query, top_k=50)
candidates = merge_candidates(dense_hits, lexical_hits, max_candidates=100)
candidate_ids = [cid for cid, _ in candidates]
# Fetch chunk contents from BigQuery (or cache)
id_to_content = fetch_contents(candidate_ids) # implement as a SELECT
# 3. Stage‑1 rerank
stage1_scores = stage1_rerank(query, candidate_ids, id_to_content)
top_n1 = sorted(stage1_scores, key=stage1_scores.get, reverse=True)[:30]
# 4. Stage‑2 LLM scorer
top_n2 = stage2_rerank(query, top_n1, id_to_content)[:8] # keep 8 passages
top_passages = [id_to_content[cid] for cid, _ in top_n2]
# 5. Build prompt + generate
prompt = build_prompt(query, top_passages)
answer = generate_answer(prompt)
return answer
You can expose rag_answer via Cloud Functions (HTTP trigger) and attach authentication (IAM, Cloud Endpoints) for secure consumption.
7. Evaluation, Monitoring, and Cost Management
7.1 Quality Metrics
| Metric | Definition | Typical Target |
|---|---|---|
| Recall@k | Fraction of queries where a relevant document appears in the top‑k list | ≥ 0.90 @10 |
| Precision@k | Proportion of retrieved documents that are truly relevant | ≥ 0.75 @5 |
| BLEU / ROUGE | N‑gram overlap between generated answer and ground‑truth (if available) | Domain‑dependent |
| Hallucination Rate | % of answers containing unsupported statements (manual audit) | < 5 % |
Create a Vertex AI Experiments run that logs these metrics for each deployment.
7.2 Latency Budget
| Step | Typical Latency |
|---|---|
| Query embedding | 30 ms |
| Dense + lexical retrieval | 100 ms |
| Cross‑encoder rerank (Cloud Run) | 150 ms |
| LLM scorer (Vertex AI) | 300 ms |
| Generation (Gemini‑Pro) | 400 ms |
| Total | ≈ 1 s (well within interactive limits) |
If you need sub‑500 ms responses, consider:
- Reducing
top_kin initial retrieval. - Caching embeddings for frequent queries.
- Using a smaller LLM scorer (e.g.,
gemini-1.5-flash) for low‑latency paths.
7.3 Cost Considerations
| Service | Pricing (approx.) | Optimizations |
|---|---|---|
| Vertex AI Matching Engine | $0.30 per million vectors stored + $0.10 per million queries | Prune stale vectors, compress embeddings (e.g., 128‑dim) |
| Vertex AI Search | $0.20 per 1 k queries | Use query throttling, batch multiple queries |
| Cloud Run (cross‑encoder) | $0.000024 per vCPU‑second | Autoscale to 0 when idle, limit concurrency |
| Vertex AI Prediction (LLM scorer) | $0.001 per 1 k tokens (depends on model) | Use flash model for cheap scoring |
| Gemini‑Pro generation | $0.00075 per 1 k tokens | Limit max_output_tokens, set low temperature |
Monitoring budgets via Cloud Billing Export to BigQuery lets you set alerts when daily spend exceeds a threshold.
8. Real‑World Use Cases
8.1 Enterprise Knowledge Base
A multinational corporation consolidated internal wikis, PDFs, and ticket logs into a GCP bucket. By applying the multi‑stage pipeline, support agents saw a 42 % reduction in average handle time and a 27 % drop in escalation rate, because the system reliably surfaced the exact SOP paragraph needed.
8.2 Legal Document Review
Law firms often need to answer “Did clause X appear in any contract signed after 2020?” By combining BM25 (to filter by date) with dense semantic search (to locate clause variations) and a cross‑encoder fine‑tuned on legal language, the pipeline achieved Recall@5 = 0.96 while keeping latency under 800 ms.
8.3 Medical Literature Assistant
Researchers querying PubMed‑style abstracts benefited from a dual reranking approach: the cross‑encoder eliminated unrelated abstracts, while a domain‑specific LLM scorer (fine‑tuned on biomedical QA) prioritized high‑impact studies. The final Gemini‑Pro answer included citations directly mapped to PubMed IDs.
9. Scaling and Future Extensions
| Extension | Description | GCP Feature |
|---|---|---|
| Real‑time Updates | Stream new documents into the index as they arrive | Cloud Pub/Sub → Dataflow → Matching Engine incremental import |
| Multi‑Language Support | Store language tags, use multilingual embeddings (e.g., textembedding-multilingual@001) | Vertex AI Multi‑Modal |
| Feedback Loop | Capture user thumbs‑up/down, retrain cross‑encoder | Vertex AI Training (custom jobs) |
| Hybrid Retrieval | Combine vector search with retrieval‑augmented generation (RAG) at the LLM level (e.g., retrieval tool in Gemini) | Gemini “retrieval” tool (future) |
| Security & Access Controls | Row‑level security on chunk metadata, VPC‑only endpoints | Cloud IAM, VPC Service Controls |
Conclusion
Implementing multi‑stage reranking on Google Cloud Platform transforms a generic retrieval‑augmented generation pipeline into a high‑precision, production‑grade solution. By leveraging:
- Vertex AI Matching Engine for scalable dense similarity,
- Vertex AI Search for robust lexical matching,
- Cloud Run for fast cross‑encoder pruning,
- Vertex AI Prediction and Gemini for LLM‑based relevance scoring and generation,
you obtain a system that is fast, cost‑effective, and extensible across domains. The modular architecture encourages continuous improvement—whether by swapping in a newer cross‑encoder, enriching metadata, or integrating user feedback.
The code snippets and deployment steps presented here are a solid starting point. As you iterate, remember to:
- Monitor quality metrics (Recall, Precision, Hallucination) and latency.
- Tune the candidate pool size to balance speed and accuracy.
- Automate ingestion pipelines to keep the knowledge base fresh.
- Secure the endpoints using IAM and VPC controls.
With these practices, you’ll be well‑positioned to deliver trustworthy, context‑aware AI assistants that truly amplify human expertise.
Resources
- Vertex AI Matching Engine Documentation – https://cloud.google.com/vertex-ai/matching-engine
- Vertex AI Search (Enterprise Search) Guide – https://cloud.google.com/vertex-ai/docs/search/overview
- Gemini API Reference (Generative AI) – https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/gemini
- Sentence‑Transformers Library – https://www.sbert.net/
- RAG Survey Paper (2023) – https://arxiv.org/abs/2302.01279
- Google Cloud Blog: Scaling Retrieval‑Augmented Generation – https://cloud.google.com/blog/topics/ai-machine-learning/scaling-rag-on-gcp
- LangChain RAG Tutorial (GCP) – https://python.langchain.com/docs/integrations/google_vertex_ai
Feel free to explore these resources for deeper dives into each component, best‑practice patterns, and the latest updates from Google Cloud. Happy building!