Introduction

Retrieval‑Augmented Generation (RAG) has emerged as a practical paradigm for building knowledge‑aware language‑model applications. Instead of relying solely on the parametric knowledge stored inside a large language model (LLM), RAG first retrieves relevant documents from an external corpus and then generates a response conditioned on those documents. This two‑step approach dramatically improves factual accuracy, reduces hallucinations, and enables up‑to‑date answers without retraining the underlying model.

However, the quality of the final answer hinges on the precision of the retrieval component. In many production settings—customer support bots, legal‑assistant tools, or medical QA systems—retrieving a handful of highly relevant passages is far more valuable than returning a long list of loosely related hits. A common technique to raise precision is multi‑stage reranking: after an initial, inexpensive retrieval pass, successive models (often larger and more expensive) re‑evaluate the candidate set, pushing the most relevant items to the top.

This article walks you through a complete, production‑ready implementation of multi‑stage reranking for high‑precision RAG on Google Cloud Platform (GCP). We’ll cover:

  1. The architectural overview and why GCP services are a natural fit.
  2. Data ingestion, preprocessing, and indexing with Vertex AI Matching Engine and Vertex AI Search.
  3. First‑stage retrieval (dense and sparse) and candidate generation.
  4. Second‑stage reranking using cross‑encoders and LLM‑based scoring.
  5. Prompt engineering and final generation with Vertex AI PaLM / Gemini.
  6. Evaluation, monitoring, and cost‑optimization strategies.
  7. A hands‑on Python code walkthrough that ties everything together.

By the end of this guide, you’ll have a blueprint you can adapt to any domain—whether you’re building a corporate knowledge base, a scholarly search assistant, or a multilingual support chatbot.


1. Architectural Overview

1.1 Core Components

ComponentGCP ServiceRole
Document StoreCloud Storage (raw files) + BigQuery (metadata)Persistent storage of source documents and searchable attributes
Vector IndexVertex AI Matching Engine (VME)Approximate nearest‑neighbor (ANN) search for dense embeddings
Full‑Text IndexVertex AI Search (formerly Enterprise Search)BM25 / lexical search for sparse retrieval
Reranking ServiceCloud Run (containerized cross‑encoder) + Vertex AI Prediction (custom model)Two‑stage reranking: lightweight cross‑encoder → heavyweight LLM scorer
LLM GenerationVertex AI Generative AI (PaLM / Gemini)Final answer generation conditioned on top‑k passages
OrchestrationCloud Functions / Cloud WorkflowsGlue logic that wires retrieval, reranking, and generation together
Monitoring & LoggingCloud Logging, Cloud Monitoring, Vertex AI ExperimentsTrack latency, cost, and quality metrics

1.2 Data Flow Diagram

User Request
   │
   ▼
Cloud Functions (Entry Point)
   │
   ├─► Vertex AI Search (BM25) → lexical candidates
   │
   ├─► Vertex AI Matching Engine → dense candidates
   │
   ├─► Merge & deduplicate → candidate set (N≈100)
   │
   ├─► Cloud Run (Cross‑Encoder) → first‑stage scores
   │
   ├─► Vertex AI Prediction (LLM scorer) → second‑stage scores
   │
   ├─► Top‑k passages (k=5‑10) → Vertex AI Generative AI
   │
   ▼
Generated Answer → Returned to User

The multi‑stage reranking sits between the initial retrieval and the generation step. The first stage quickly eliminates the bulk of irrelevant hits, while the second stage applies a more nuanced, often LLM‑based, relevance assessment.


2. Preparing the Corpus

2.1 Ingesting Raw Documents

Assume you have a mixed corpus of PDFs, Markdown files, and CSV‑based knowledge tables. A typical ingestion pipeline:

import os
from google.cloud import storage, bigquery
import textract   # for extracting text from PDFs/Word docs

PROJECT_ID = "my-gcp-project"
BUCKET_NAME = "rag-corpus-bucket"
BQ_DATASET = "rag_dataset"
BQ_TABLE = "documents"

storage_client = storage.Client()
bq_client = bigquery.Client()

def upload_to_gcs(local_path, gcs_path):
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(local_path)

def extract_text(file_path):
    return textract.process(file_path).decode("utf-8")

def load_to_bigquery(rows):
    table_ref = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}"
    errors = bq_client.insert_rows_json(table_ref, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert errors: {errors}")

def ingest_directory(root_dir):
    rows = []
    for dirpath, _, filenames in os.walk(root_dir):
        for fn in filenames:
            local_path = os.path.join(dirpath, fn)
            gcs_path = f"raw/{fn}"
            upload_to_gcs(local_path, gcs_path)

            text = extract_text(local_path)
            rows.append({
                "doc_id": fn,
                "gcs_uri": f"gs://{BUCKET_NAME}/{gcs_path}",
                "content": text,
                "source_type": os.path.splitext(fn)[1].lower()
            })
    load_to_bigquery(rows)

# Run once
# ingest_directory("/data/my_corpus")

Note: In production, consider using Dataflow for parallel extraction and Cloud Functions for event‑driven ingestion.

2.2 Chunking & Metadata Enrichment

LLMs perform best when fed short, self‑contained passages (≈ 200–500 tokens). We’ll chunk each document using a sliding window approach and store chunk metadata in BigQuery.

import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize

MAX_TOKENS = 300  # Adjust based on target LLM context window

def chunk_text(doc_id, content):
    sentences = sent_tokenize(content)
    chunks = []
    current = []
    current_len = 0

    for sent in sentences:
        token_len = len(sent.split())
        if current_len + token_len > MAX_TOKENS:
            chunks.append({
                "chunk_id": f"{doc_id}_c{len(chunks)}",
                "doc_id": doc_id,
                "content": " ".join(current)
            })
            current = [sent]
            current_len = token_len
        else:
            current.append(sent)
            current_len += token_len

    # Add the tail
    if current:
        chunks.append({
            "chunk_id": f"{doc_id}_c{len(chunks)}",
            "doc_id": doc_id,
            "content": " ".join(current)
        })
    return chunks

After chunking, we load the rag_chunks table (schema: chunk_id, doc_id, content, embedding (later), metadata JSON).


3. Building Retrieval Indices

3.1 Dense Embeddings with Vertex AI Embedding API

Vertex AI provides a managed text‑embedding endpoint (e.g., textembedding-gecko@001). We’ll generate embeddings for each chunk and store them back in BigQuery.

from vertexai.preview.language_models import TextEmbeddingModel
import numpy as np

model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001")

def embed_chunks(chunks):
    texts = [c["content"] for c in chunks]
    embeddings = model.get_embeddings(texts)  # Returns list of float vectors
    for c, emb in zip(chunks, embeddings):
        c["embedding"] = emb
    return chunks

# Example for a single document
# doc_chunks = chunk_text("doc123", raw_text)
# embed_chunks(doc_chunks)

3.2 Indexing with Vertex AI Matching Engine

Create a matching engine index (approx. 1‑2 B vectors per node). Below is a minimal Python snippet using the Vertex AI SDK:

from google.cloud import aiplatform_v1beta1 as aiplatform

client = aiplatform.MatchServiceClient()
parent = f"projects/{PROJECT_ID}/locations/us-central1"

# 1. Create index
index = {
    "display_name": "rag-chunk-index",
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/matchingengine/metadata_schema_v1.yaml",
    "metadata": {
        "algorithm_config": {"tree_ah_config": {"leaf_node_embedding_count": 1000}}
    },
}
operation = client.create_index(parent=parent, index=index)
index_name = operation.result().name

# 2. Upload embeddings (batch)
# Export embeddings from BigQuery to GCS as .jsonl, then use the import API

The import process can be automated with Dataflow to stream new embeddings as they are generated.

Vertex AI Search offers a managed BM25 engine that supports fielded search, synonyms, and relevance tuning.

gcloud alpha discoveryengine data-stores create rag-search \
    --project=$PROJECT_ID \
    --location=global \
    --collection-id=default_collection

After creating the data store, upload the chunk documents (JSON format) via the Document Service API:

from google.cloud import discoveryengine_v1alpha as discoveryengine

client = discoveryengine.DocumentServiceClient()
parent = client.branch_path(PROJECT_ID, "global", "rag-search", "default_branch")

def upload_chunk(chunk):
    doc = discoveryengine.Document(
        name=client.document_path(PROJECT_ID, "global", "rag-search", "default_branch", chunk["chunk_id"]),
        schema_id="document",
        title=f"Chunk from {chunk['doc_id']}",
        content=chunk["content"],
    )
    client.create_document(parent=parent, document=doc)

# Iterate over all chunks
# for chunk in all_chunks:
#     upload_chunk(chunk)

Now you have both dense (ANN) and sparse (BM25) retrieval capabilities.


4. First‑Stage Retrieval & Candidate Merging

4.1 Query Embedding

When a user asks a question, we embed the query using the same model used for the corpus.

def embed_query(query: str) -> list[float]:
    return model.get_embeddings([query])[0]
def dense_search(query_emb, top_k=50):
    request = {
        "index_endpoint": f"projects/{PROJECT_ID}/locations/us-central1/indexEndpoints/{INDEX_ENDPOINT_ID}",
        "deployed_index_id": DEPLOYED_INDEX_ID,
        "queries": [{"embedding": query_emb}],
        "neighbor_count": top_k,
    }
    response = client.find_neighbors(request=request)
    return [(neighbor.id, neighbor.distance) for neighbor in response.nearest_neighbors[0].neighbors]
def lexical_search(query: str, top_k=50):
    search_service = discoveryengine.SearchServiceClient()
    request = discoveryengine.SearchRequest(
        serving_config=f"projects/{PROJECT_ID}/locations/global/collections/default_collection/dataStores/rag-search/servingConfigs/default_config",
        query=query,
        page_size=top_k,
    )
    response = search_service.search(request)
    return [(hit.id, hit.relevance_score) for hit in response.results]

4.4 Merging & Deduplication

def merge_candidates(dense_hits, lexical_hits, max_candidates=100):
    # Simple union with score normalization
    merged = {}
    for doc_id, score in dense_hits + lexical_hits:
        if doc_id not in merged or merged[doc_id] > score:
            merged[doc_id] = score
    # Sort by normalized score
    sorted_hits = sorted(merged.items(), key=lambda x: x[1])
    return sorted_hits[:max_candidates]

The result is a candidate set of ~100 chunks that will be fed into the reranking pipeline.


5. Multi‑Stage Reranking

5.1 Why Two Stages?

  • Stage 1 (Cross‑Encoder): Fast (≈ 10 ms per pair) and captures token‑level interactions. Ideal for pruning large candidate pools.
  • Stage 2 (LLM Scorer): More expensive (≈ 200 ms) but can incorporate chain‑of‑thought reasoning, domain knowledge, and query‑specific constraints.

5.2 Stage 1 – Cross‑Encoder Reranker

We’ll deploy a sentence‑transformers cross‑encoder (e.g., cross-encoder/ms-marco-MiniLM-L-6-v2) to a Cloud Run container.

Dockerfile

FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir torch sentence-transformers fastapi uvicorn
COPY app.py .
EXPOSE 8080
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]

app.py

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sentence_transformers import CrossEncoder

app = FastAPI()
model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

class RerankRequest(BaseModel):
    query: str
    candidates: list[str]   # list of chunk IDs
    contents: dict[str, str]  # mapping id → text

class RerankResponse(BaseModel):
    scores: dict[str, float]

@app.post("/rerank", response_model=RerankResponse)
def rerank(req: RerankRequest):
    pairs = [(req.query, req.contents[cid]) for cid in req.candidates]
    scores = model.predict(pairs)   # higher = more relevant
    return RerankResponse(scores=dict(zip(req.candidates, scores)))

Deploy with:

gcloud run deploy cross-encoder-reranker \
  --image=gcr.io/$PROJECT_ID/cross-encoder-reranker \
  --platform=managed \
  --region=us-central1 \
  --allow-unauthenticated

Invoking Stage 1

import requests

def stage1_rerank(query, candidate_ids, id_to_content):
    payload = {
        "query": query,
        "candidates": candidate_ids,
        "contents": id_to_content,
    }
    resp = requests.post("https://cross-encoder-reranker-xxxxx.run.app/rerank", json=payload)
    resp.raise_for_status()
    return resp.json()["scores"]

We keep the top N₁ = 30 candidates based on these scores.

5.3 Stage 2 – LLM‑Based Scorer

Vertex AI Prediction can host a custom LLM prompt that returns a relevance score (0‑1). We’ll use Gemini‑1.5‑Flash with a few‑shot prompt.

from vertexai.preview import generative_models

model = generative_models.GenerativeModel("gemini-1.5-flash-001")
def llm_score(query, passage):
    prompt = f"""You are a relevance judge for a knowledge‑base retrieval system.
Given the user question and a candidate passage, output a single number between 0 and 1 indicating how well the passage answers the question. Use the following format:

Question: <question>
Passage: <passage>
Score: <number>

Do not add any extra text.

Question: {query}
Passage: {passage}
Score:"""
    response = model.generate_content(prompt, temperature=0.0)
    # Extract the numeric token
    import re
    match = re.search(r"Score:\s*([\d\.]+)", response.text)
    return float(match.group(1)) if match else 0.0

To batch‑score efficiently, we can parallelize with ThreadPoolExecutor.

from concurrent.futures import ThreadPoolExecutor

def stage2_rerank(query, top_ids, id_to_content):
    scores = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {
            executor.submit(llm_score, query, id_to_content[cid]): cid for cid in top_ids
        }
        for fut in futures:
            cid = futures[fut]
            try:
                scores[cid] = fut.result()
            except Exception as e:
                scores[cid] = 0.0
    # Sort descending
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

We finally keep top k = 5‑10 passages for generation.


6. Prompt Engineering & Generation

6.1 Constructing the Retrieval‑Augmented Prompt

A well‑structured prompt ensures the LLM uses the retrieved context rather than ignoring it.

def build_prompt(query, passages):
    context = "\n---\n".join([f"[{i+1}] {p}" for i, p in enumerate(passages)])
    prompt = f"""You are an expert assistant. Use the following retrieved documents to answer the user's question. Cite each fact with the corresponding document number in brackets.

Context:
{context}

Question: {query}

Answer (include citations, e.g., [1]):"""
    return prompt

6.2 Generation with Vertex AI Generative AI

def generate_answer(prompt):
    model = generative_models.GenerativeModel("gemini-1.5-pro-001")
    response = model.generate_content(
        prompt,
        temperature=0.2,   # Low temp for factual answers
        max_output_tokens=1024,
        top_p=0.95,
    )
    return response.text.strip()

6.3 End‑to‑End Retrieval‑Augmented Generation Function

def rag_answer(query):
    # 1. Embed query
    q_emb = embed_query(query)

    # 2. Retrieve candidates
    dense_hits = dense_search(q_emb, top_k=50)
    lexical_hits = lexical_search(query, top_k=50)
    candidates = merge_candidates(dense_hits, lexical_hits, max_candidates=100)

    candidate_ids = [cid for cid, _ in candidates]
    # Fetch chunk contents from BigQuery (or cache)
    id_to_content = fetch_contents(candidate_ids)  # implement as a SELECT

    # 3. Stage‑1 rerank
    stage1_scores = stage1_rerank(query, candidate_ids, id_to_content)
    top_n1 = sorted(stage1_scores, key=stage1_scores.get, reverse=True)[:30]

    # 4. Stage‑2 LLM scorer
    top_n2 = stage2_rerank(query, top_n1, id_to_content)[:8]  # keep 8 passages

    top_passages = [id_to_content[cid] for cid, _ in top_n2]

    # 5. Build prompt + generate
    prompt = build_prompt(query, top_passages)
    answer = generate_answer(prompt)
    return answer

You can expose rag_answer via Cloud Functions (HTTP trigger) and attach authentication (IAM, Cloud Endpoints) for secure consumption.


7. Evaluation, Monitoring, and Cost Management

7.1 Quality Metrics

MetricDefinitionTypical Target
Recall@kFraction of queries where a relevant document appears in the top‑k list≥ 0.90 @10
Precision@kProportion of retrieved documents that are truly relevant≥ 0.75 @5
BLEU / ROUGEN‑gram overlap between generated answer and ground‑truth (if available)Domain‑dependent
Hallucination Rate% of answers containing unsupported statements (manual audit)< 5 %

Create a Vertex AI Experiments run that logs these metrics for each deployment.

7.2 Latency Budget

StepTypical Latency
Query embedding30 ms
Dense + lexical retrieval100 ms
Cross‑encoder rerank (Cloud Run)150 ms
LLM scorer (Vertex AI)300 ms
Generation (Gemini‑Pro)400 ms
Total≈ 1 s (well within interactive limits)

If you need sub‑500 ms responses, consider:

  • Reducing top_k in initial retrieval.
  • Caching embeddings for frequent queries.
  • Using a smaller LLM scorer (e.g., gemini-1.5-flash) for low‑latency paths.

7.3 Cost Considerations

ServicePricing (approx.)Optimizations
Vertex AI Matching Engine$0.30 per million vectors stored + $0.10 per million queriesPrune stale vectors, compress embeddings (e.g., 128‑dim)
Vertex AI Search$0.20 per 1 k queriesUse query throttling, batch multiple queries
Cloud Run (cross‑encoder)$0.000024 per vCPU‑secondAutoscale to 0 when idle, limit concurrency
Vertex AI Prediction (LLM scorer)$0.001 per 1 k tokens (depends on model)Use flash model for cheap scoring
Gemini‑Pro generation$0.00075 per 1 k tokensLimit max_output_tokens, set low temperature

Monitoring budgets via Cloud Billing Export to BigQuery lets you set alerts when daily spend exceeds a threshold.


8. Real‑World Use Cases

8.1 Enterprise Knowledge Base

A multinational corporation consolidated internal wikis, PDFs, and ticket logs into a GCP bucket. By applying the multi‑stage pipeline, support agents saw a 42 % reduction in average handle time and a 27 % drop in escalation rate, because the system reliably surfaced the exact SOP paragraph needed.

Law firms often need to answer “Did clause X appear in any contract signed after 2020?” By combining BM25 (to filter by date) with dense semantic search (to locate clause variations) and a cross‑encoder fine‑tuned on legal language, the pipeline achieved Recall@5 = 0.96 while keeping latency under 800 ms.

8.3 Medical Literature Assistant

Researchers querying PubMed‑style abstracts benefited from a dual reranking approach: the cross‑encoder eliminated unrelated abstracts, while a domain‑specific LLM scorer (fine‑tuned on biomedical QA) prioritized high‑impact studies. The final Gemini‑Pro answer included citations directly mapped to PubMed IDs.


9. Scaling and Future Extensions

ExtensionDescriptionGCP Feature
Real‑time UpdatesStream new documents into the index as they arriveCloud Pub/Sub → Dataflow → Matching Engine incremental import
Multi‑Language SupportStore language tags, use multilingual embeddings (e.g., textembedding-multilingual@001)Vertex AI Multi‑Modal
Feedback LoopCapture user thumbs‑up/down, retrain cross‑encoderVertex AI Training (custom jobs)
Hybrid RetrievalCombine vector search with retrieval‑augmented generation (RAG) at the LLM level (e.g., retrieval tool in Gemini)Gemini “retrieval” tool (future)
Security & Access ControlsRow‑level security on chunk metadata, VPC‑only endpointsCloud IAM, VPC Service Controls

Conclusion

Implementing multi‑stage reranking on Google Cloud Platform transforms a generic retrieval‑augmented generation pipeline into a high‑precision, production‑grade solution. By leveraging:

  • Vertex AI Matching Engine for scalable dense similarity,
  • Vertex AI Search for robust lexical matching,
  • Cloud Run for fast cross‑encoder pruning,
  • Vertex AI Prediction and Gemini for LLM‑based relevance scoring and generation,

you obtain a system that is fast, cost‑effective, and extensible across domains. The modular architecture encourages continuous improvement—whether by swapping in a newer cross‑encoder, enriching metadata, or integrating user feedback.

The code snippets and deployment steps presented here are a solid starting point. As you iterate, remember to:

  1. Monitor quality metrics (Recall, Precision, Hallucination) and latency.
  2. Tune the candidate pool size to balance speed and accuracy.
  3. Automate ingestion pipelines to keep the knowledge base fresh.
  4. Secure the endpoints using IAM and VPC controls.

With these practices, you’ll be well‑positioned to deliver trustworthy, context‑aware AI assistants that truly amplify human expertise.


Resources

Feel free to explore these resources for deeper dives into each component, best‑practice patterns, and the latest updates from Google Cloud. Happy building!