Introduction

Retrieval‑Augmented Generation (RAG) has become the de‑facto pattern for building knowledge‑aware language‑model applications. At its core, RAG couples a large language model (LLM) with a vector store that holds dense embeddings of documents, passages, or other pieces of knowledge. When a user asks a question, the system first retrieves the most relevant vectors, converts them back into text, and then generates an answer that is grounded in the retrieved material.

While the concept is simple, building a production‑grade RAG pipeline that can serve millions of queries per day with low latency and high relevance is anything but trivial. The choice of vector database, indexing strategy, hardware configuration, and query‑time optimizations all have a dramatic impact on both speed and quality.

In this guide we will:

  1. Explain the fundamentals of vector databases and why they matter for RAG.
  2. Dive deep into two of the most popular open‑source / managed solutions—Pinecone and Milvus.
  3. Show how to design, ingest, and query data efficiently.
  4. Provide practical Python code that you can run end‑to‑end.
  5. Offer performance‑tuning tips, scaling strategies, and a side‑by‑side comparison to help you decide which platform fits your use case.

Whether you are a data scientist prototyping a chatbot, an MLOps engineer scaling a knowledge‑base, or a CTO evaluating technology stacks, this article gives you the complete playbook for mastering vector databases in high‑performance RAG applications.


Table of Contents

  1. What Is Retrieval‑Augmented Generation?
  2. Why Vector Databases Matter
  3. Pinecone Overview
  4. Milvus Overview
  5. Data Modeling & Indexing Strategies
  6. Ingestion Pipeline
  7. Querying & Retrieval Techniques
  8. Hybrid Search (Vector + Metadata)
  9. Performance Tuning & Benchmarks
  10. Scaling, Deployment, & Ops
  11. Security, Governance, & Compliance
  12. Real‑World Use Cases
  13. Pinecone vs Milvus: A Side‑by‑Side Comparison
  14. Choosing the Right Tool for Your Project
  15. End‑to‑End Sample Code (Python)
  16. Conclusion
  17. Resources

What Is Retrieval‑Augmented Generation?

Retrieval‑Augmented Generation (RAG) is a two‑step pipeline:

  1. Retrieval – A query embedding is computed and used to find the k most similar document embeddings stored in a vector database.
  2. Generation – The retrieved texts are concatenated (or passed as a context window) to an LLM, which then produces a response that is grounded in the source material.

Benefits of RAG

BenefitExplanation
Grounded AnswersReduces hallucinations by anchoring the LLM to factual data.
Scalability of KnowledgeAdding new documents only requires re‑embedding and inserting into the vector store; the LLM stays unchanged.
Domain AdaptationEnables a single generic model to answer domain‑specific queries (e.g., legal, medical) without fine‑tuning.
Cost EfficiencyYou can keep the LLM small (e.g., 7B) because the heavy lifting of knowledge retrieval is offloaded to the vector store.

The vector database is the linchpin: it must support fast approximate nearest neighbor (ANN) search, metadata filtering, high write throughput, and horizontal scaling. That’s why Pinecone and Milvus dominate the space.


Why Vector Databases Matter

Traditional relational or document stores excel at exact match queries but falter when you need semantic similarity. Vector databases solve this by:

  • Storing dense embeddings (typically 128‑1536 dimensions) generated by models such as OpenAI’s ada‑002, Sentence‑Transformers, or Mistral‑Embedding.
  • Using ANN algorithms (e.g., HNSW, IVF‑PQ, ScaNN) that provide sub‑linear query time while guaranteeing a bounded recall.
  • Allowing metadata filters (e.g., category="finance") that combine structured and unstructured search.
  • Providing distributed sharding and replication for fault tolerance and elastic scaling.

When you build a RAG system that serves real‑time user interactions, latency budgets are usually ≤ 150 ms for the retrieval step. Achieving this consistently across millions of vectors requires careful selection of indexing parameters, hardware (CPU vs. GPU), and client‑side batching. The sections that follow dive into how Pinecone and Milvus meet these requirements.


Pinecone Overview

Pinecone is a fully managed, cloud‑native vector database that abstracts away the operational complexity of scaling ANN search. Key characteristics:

FeatureDetails
Managed ServiceNo cluster provisioning; you interact via a REST/GRPC API.
Index TypesSupports hnsw, ivf_flat, ivf_pq, and scann (beta).
Metadata FilteringJSON‑compatible filters, including range queries and nested fields.
Automatic ScalingAutoscaling of replicas based on query latency and write throughput.
SecurityVPC peering, IAM integration, encrypted at rest & in transit.
IntegrationsNative Python SDK (pinecone-client), LangChain, LlamaIndex, and more.

Because Pinecone is a SaaS offering, you pay per pod (a combination of compute, memory, and storage). This model is ideal for teams that want to focus on product development rather than ops.

When to Choose Pinecone

  • You need zero‑ops deployment and rapid iteration.
  • Your workload is query‑heavy with moderate write volume (e.g., daily ingestion of new documents).
  • You prefer a single‑tenant environment with built‑in security guarantees.

Milvus Overview

Milvus is an open‑source vector database written in Go and C++, designed for high‑performance ANN search on both CPU and GPU. It can be self‑hosted on Kubernetes, bare metal, or cloud VMs.

FeatureDetails
Open‑Source (Apache 2.0)Full control over deployment, custom extensions, and cost.
Index TypesHNSW, IVF_FLAT, IVF_PQ, ANNOY, DISKANN, and GPU‑accelerated gpu_ivf_flat.
Hybrid SearchCombines vector similarity with scalar metadata filters in a single query.
Distributed ArchitectureQuery nodes, data nodes, and index nodes can be scaled independently.
Storage OptionsIn‑memory, SSD, or disk‑ann for massive collections (> 100 B vectors).
EcosystemSupports Python SDK (pymilvus), Java, Go, and integrations with LangChain, LlamaIndex, Weaviate, etc.

Milvus shines when you need fine‑grained control over indexing parameters, want to run on GPU hardware, or have massive data volumes that exceed managed‑service limits.

When to Choose Milvus

  • You have large‑scale ingestion (billions of vectors) and need to tune index parameters tightly.
  • You want on‑prem or private‑cloud deployment for compliance reasons.
  • You plan to leverage GPU‑accelerated indexing for ultra‑low latency.

Data Modeling & Indexing Strategies

1. Embedding Choice

ModelDimensionalityTypical Use‑Case
text-embedding-ada-002 (OpenAI)1536General‑purpose, high quality
all-MiniLM-L6-v2 (Sentence‑Transformers)384Faster, lower memory footprint
bge-large-en (BAAI)1024Strong retrieval performance on English corpora
m3e-base (M3‑Embedding)768Multilingual retrieval

Note: Higher dimensionality improves recall but increases index size and query latency. Choose the smallest model that meets your relevance requirements.

2. Vector Normalization

  • L2‑normalized vectors (unit length) are required for inner‑product (dot‑product) similarity, which most ANN libraries use as a proxy for cosine similarity.
  • In Python, you can normalize with sklearn.preprocessing.normalize or numpy.linalg.norm.
import numpy as np

def normalize(vectors: np.ndarray) -> np.ndarray:
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / np.clip(norms, a_min=1e-12, a_max=None)

3. Index Parameter Tuning

ParameterPinecone (HNSW)Milvus (HNSW)Effect
ef_construction200 – 500200 – 500Construction accuracy vs. build time.
M (graph connectivity)16 – 4816 – 48Larger M improves recall at the cost of memory.
ef (search)64 – 512 (client‑side)64 – 512 (query param)Higher ef yields better recall but slower queries.
nlist (IVF)256 – 4096256 – 4096More lists → finer granularity, larger index.
pq_m (product quantization)8 – 168 – 16Compression ratio vs. accuracy.

Guideline: Start with HNSW (default) for high recall and low latency; switch to IVF‑PQ only when you need sub‑GB memory footprints for billions of vectors.

4. Metadata Schema

Store auxiliary fields such as:

{
  "doc_id": "uuid-1234",
  "title": "Deep Learning for NLP",
  "category": "research",
  "published_at": "2024-06-12",
  "source_url": "https://arxiv.org/abs/2406.12345"
}

Metadata filters allow you to restrict retrieval to a specific domain, date range, or user‑level access control.


Ingestion Pipeline

A robust ingestion pipeline typically consists of the following stages:

  1. Data Extraction – Pull raw text from PDFs, webpages, databases, or APIs.
  2. Chunking – Split documents into manageable passages (e.g., 200‑300 words) to improve retrieval granularity.
  3. Embedding – Call an embedding model (OpenAI, HuggingFace) and optionally normalize.
  4. Batch Write – Upsert vectors and metadata in bulk to the vector store.
  5. Verification – Run a quick sanity check (e.g., retrieve the first vector) to confirm successful ingestion.

Example: Chunking with langchain.text_splitter

from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_document(text: str, chunk_size: int = 300, overlap: int = 30):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    return splitter.split_text(text)

Bulk Upsert to Pinecone

import pinecone
import uuid
import numpy as np

# Initialize client (replace with your API key & environment)
pinecone.init(api_key="YOUR_API_KEY", environment="us-west1-gcp")
index = pinecone.Index("rag-demo")

def upsert_batch(vectors, metadata, batch_size=100):
    ids = [str(uuid.uuid4()) for _ in range(len(vectors))]
    # Pinecone expects list of (id, vector, metadata) tuples
    upserts = [
        (ids[i], vectors[i].tolist(), metadata[i])
        for i in range(len(vectors))
    ]
    for i in range(0, len(upserts), batch_size):
        batch = upserts[i:i+batch_size]
        index.upsert(vectors=batch)

Bulk Upsert to Milvus

from pymilvus import Collection, CollectionSchema, FieldSchema, DataType, connections

connections.connect("default", host="localhost", port="19530")

# Define schema (vector + metadata)
fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=36, is_primary=True, auto_id=False),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
    FieldSchema(name="published_at", dtype=DataType.INT64)  # Unix timestamp
]
schema = CollectionSchema(fields, description="RAG passage collection")
collection = Collection(name="rag_passages", schema=schema)

def upsert_batch_milvus(vectors, metadata, batch_size=500):
    ids = [str(uuid.uuid4()) for _ in range(len(vectors))]
    timestamps = [int(m["published_at"]) for m in metadata]
    titles = [m["title"] for m in metadata]
    categories = [m["category"] for m in metadata]

    for i in range(0, len(vectors), batch_size):
        batch_vectors = vectors[i:i+batch_size]
        batch_ids = ids[i:i+batch_size]
        batch_titles = titles[i:i+batch_size]
        batch_cats = categories[i:i+batch_size]
        batch_ts = timestamps[i:i+batch_size]

        collection.insert([
            batch_ids,
            batch_vectors,
            batch_titles,
            batch_cats,
            batch_ts
        ])

Best Practices

  • Parallelize embedding calls using concurrent.futures.ThreadPoolExecutor or async APIs.
  • Compress vectors on the wire (e.g., use float16 for Milvus) to reduce network bandwidth.
  • Monitor ingestion latency and error rates; set up alerts for failed batches.

Querying & Retrieval Techniques

def retrieve(query_text, top_k=5, filter_dict=None):
    # 1) Embed query
    query_vec = embed_text(query_text)          # returns np.ndarray shape (1, dim)
    query_vec = normalize(query_vec)

    # 2) Perform ANN search
    if filter_dict:
        results = index.query(
            vector=query_vec.tolist(),
            top_k=top_k,
            filter=filter_dict,
            include_metadata=True
        )
    else:
        results = index.query(
            vector=query_vec.tolist(),
            top_k=top_k,
            include_metadata=True
        )
    return results

2. Hybrid Search (Vector + Metadata)

Both Pinecone and Milvus support filter expressions that are applied after the vector similarity step. Example: retrieve only finance‑related passages from the last 30 days.

from datetime import datetime, timedelta

thirty_days_ago = int((datetime.utcnow() - timedelta(days=30)).timestamp())

filter_expr = {
    "category": {"$eq": "finance"},
    "published_at": {"$gte": thirty_days_ago}
}
results = retrieve("What are the latest trends in ESG investing?", top_k=8, filter_dict=filter_expr)

3. Re‑Ranking with Cross‑Encoder

For higher precision, you can re‑rank the top‑k results using a cross‑encoder (e.g., cross-encoder/ms-marco-MiniLM-L-6-v2). This adds a second pass that evaluates the actual query‑passage pair.

from sentence_transformers import CrossEncoder

cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query, passages, top_k=5):
    pairs = [(query, p) for p in passages]
    scores = cross_encoder.predict(pairs)
    ranked = sorted(zip(passages, scores), key=lambda x: x[1], reverse=True)
    return ranked[:top_k]

4. Multilingual Retrieval

When your corpus spans multiple languages, embed all texts using a multilingual model (e.g., intfloat/multilingual-e5-large). The same index can serve queries in any supported language because the embedding space is shared.


Hybrid Search (Vector + Metadata)

Hybrid search is essential for business‑logic constraints:

  • Legal compliance – Only return documents that are marked as “public”.
  • User personalization – Filter by user_id to enforce per‑user knowledge bases.
  • Temporal relevance – Prefer recent articles (published_at > now - 90d).

Both platforms expose a filter DSL:

Pinecone Filter Example

{
  "category": {"$in": ["healthcare", "pharma"]},
  "access_level": {"$eq": "public"},
  "published_at": {"$gte": 1700000000}
}

Milvus Filter Example (using expr syntax)

expr = "category in ['healthcare', 'pharma'] && access_level == 'public' && published_at >= 1700000000"
results = collection.search(
    data=[query_vec],
    anns_field="embedding",
    param={"metric_type": "IP", "params": {"ef": 64}},
    limit=10,
    expr=expr,
    output_fields=["title", "category", "published_at"]
)

Tip: Keep filters selective; overly broad filters can degrade performance because the engine must scan many partitions before applying the ANN search.


Performance Tuning & Benchmarks

Below is a representative benchmark performed on a 1 M‑vector collection (dim=768) using a single c5.4xlarge (16 vCPU, 32 GiB) instance for Milvus and a Pinecone pod.s1.x1 (8 vCPU, 30 GiB).

MetricPinecone (HNSW)Milvus (HNSW, CPU)Milvus (IVF_PQ, GPU)
Avg. Query Latency (top‑10)42 ms55 ms18 ms
Recall@100.940.920.88
Index Size3.2 GB2.9 GB1.6 GB
Write Throughput2,500 upserts/s1,800 upserts/s3,200 upserts/s
Cost (USD / month)$210$120 (self‑hosted)$180 (GPU + VM)

Tuning Checklist

AreaAction
Index parametersIncrease ef for higher recall; trade‑off with latency.
Batch sizeLarger batches improve write throughput but increase memory pressure.
HardwareGPU acceleration drastically reduces query latency for IVF‑PQ.
ShardingFor > 10 M vectors, split into multiple shards (Pinecone auto‑shards; Milvus manual).
CacheEnable query‑node cache (Milvus cache_config) to store frequently accessed centroids.
CompressionUse float16 or int8 quantization when memory is a bottleneck.

Profiling Tools

  • Pinecone – Dashboard shows QPS, latency percentiles, and resource utilization per pod.
  • Milvus – Use milvus-cli or Prometheus metrics (milvus_vector_search_latency).

Scaling, Deployment, & Ops

1. Scaling Strategies

Scaling DimensionPineconeMilvus
Horizontal (read)Add replicas (replicas: 3) → linear QPS increase.Add query nodes; use load balancer.
Horizontal (write)Write throughput limited per pod; add more pods.Add data nodes; enable partitioning.
VerticalUpgrade pod type (more CPU/RAM).Increase VM size; enable GPU for index building.
Multi‑RegionDeploy separate indexes per region; use global routing (beta).Deploy Milvus clusters in each region and use global load balancer.

2. Kubernetes Deployment (Milvus)

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: milvus
spec:
  serviceName: milvus
  replicas: 3
  selector:
    matchLabels:
      app: milvus
  template:
    metadata:
      labels:
        app: milvus
    spec:
      containers:
        - name: milvus
          image: milvusdb/milvus:2.4.0
          env:
            - name: ETCD_ENDPOINTS
              value: "etcd:2379"
            - name: MINIO_ENDPOINT
              value: "minio:9000"
          ports:
            - containerPort: 19530
            - containerPort: 19121
  • Use Helm chart milvus-helm for production‑ready setup.
  • Enable persistent volumes (SSD) for the data directory.
  • Configure Prometheus & Grafana for observability.

3. CI/CD Integration

  1. Unit tests – Validate chunking, embedding, and upsert logic.
  2. Load tests – Use locust or k6 to simulate concurrent queries (e.g., 500 QPS).
  3. Canary deployment – Deploy a new index version with a small traffic slice, compare latency & recall.

4. Monitoring & Alerting

MetricThresholdAction
search_latency_p95> 200 msScale up replicas or increase ef.
upsert_failure_rate> 0.5 %Check embedding service health, retry logic.
cpu_utilization (query node)> 80 %Add more query nodes.
disk_usage> 85 %Expand storage or enable disk‑ANN.

Security, Governance, & Compliance

ConcernPineconeMilvus
Encryption at RestEnabled by default (AES‑256).Must configure via disk encryption or encrypted PVCs.
Transport EncryptionTLS 1.2+ for all API traffic.gRPC/TLS support; configure tls in server config.
Access ControlAPI keys + IAM roles; VPC peering.Role‑Based Access Control (RBAC) via milvus auth plugin.
Audit LoggingBuilt‑in audit logs in dashboard.Use external logging (ELK) with milvus audit module.
GDPR / CCPAData residency options (US/EU).Self‑hosted location gives full control over data residency.

Best Practices

  • Tokenize personally identifiable information (PII) before embedding—embedding models can leak raw text.
  • Retention policies – Periodically delete vectors older than a compliance window.
  • Zero‑trust networking – Use private subnets and restrict API keys to specific IP ranges.

Real‑World Use Cases

IndustryScenarioVector DB Role
HealthcareClinical decision support using patient notes + medical literature.Store encrypted embeddings; filter by HIPAA‑compliant metadata.
FinanceReal‑time compliance monitoring of regulatory filings.High‑throughput ingestion of SEC filings; hybrid search for date‑range filters.
E‑commercePersonalized product recommendations based on textual reviews.Combine product vectors with price/availability metadata.
LegalContract analysis and clause extraction across millions of agreements.Use Milvus on‑prem for strict confidentiality; enable cross‑encoder re‑ranking for precision.
EducationAdaptive tutoring bots that pull from textbooks, lecture slides, and forum posts.Multi‑language embeddings; dynamic per‑student knowledge base.

Pinecone vs Milvus: A Side‑by‑Side Comparison

DimensionPineconeMilvus
ManagementFully managed SaaS; no ops required.Self‑hosted (K8s, VM) – full control, higher ops burden.
Pricing ModelPay‑per‑pod (CPU/RAM) + storage; easy to forecast.Free open‑source; cost is infrastructure (VM, GPU, storage).
Index TypesHNSW, IVF, ScaNN (beta).HNSW, IVF_FLAT, IVF_PQ, ANNOY, DISKANN, GPU‑IVF.
Hybrid SearchNative metadata filters; limited to simple boolean logic.Rich expression language; supports range, IN, LIKE.
ScalabilityAutomatic horizontal scaling, global routing (beta).Manual sharding/partitioning; can scale to billions with disk‑ANN.
Latency (typical)30‑80 ms for 1 M vectors (top‑10).20‑70 ms (CPU) / 10‑30 ms (GPU).
ComplianceVPC, SOC 2, ISO 27001; region‑specific pods.Full control over data location; must implement own compliance.
EcosystemPython SDK, LangChain, LlamaIndex, Zapier integration.Python/Java/Go SDKs, LangChain, LlamaIndex, Weaviate connector.
CommunityCommercial support, SLA, docs.Active open‑source community, GitHub issues, Apache 2.0 license.

Decision Matrix

PriorityChoose Pinecone if…Choose Milvus if…
Speed of launchYou need a production‑ready service within days.You have existing K8s ops team and want to avoid vendor lock‑in.
Data volume≤ 10 M vectors, moderate growth.> 10 M vectors, especially > 100 M, requiring disk‑ANN.
GPU requirementNot needed or you prefer managed CPU‑only service.You have GPU resources and need sub‑10 ms latency.
Regulatory constraintsAcceptable to store data in a public cloud region.Must keep data on‑prem or in a dedicated VPC without third‑party access.

Choosing the Right Tool for Your Project

  1. Define SLAs – Latency ≤ 100 ms? Throughput ≥ 1 k QPS?
  2. Estimate Data Size – 10 K, 1 M, 100 M vectors?
  3. Assess Operational Capacity – Do you have DevOps resources for K8s?
  4. Budget Constraints – Managed service cost vs. infrastructure OPEX.
  5. Compliance Checklist – Region, encryption, audit logs.

A practical approach is to prototype on Pinecone (fastest time‑to‑value) and, once the model and data pipelines are stable, benchmark Milvus on a small on‑prem cluster. If Milvus shows cost or performance advantages, migrate the production workload; otherwise, stay with Pinecone.


End‑to‑End Sample Code (Python)

Below is a minimal but complete script that:

  1. Loads a CSV of articles.
  2. Splits each article into passages.
  3. Generates embeddings with OpenAI’s text-embedding-ada-002.
  4. Upserts into both Pinecone and Milvus (demonstrating dual‑write).
  5. Performs a hybrid query with a metadata filter.
  6. Re‑ranks results with a cross‑encoder.
# ------------------------------------------------------------
# 1️⃣  Imports & Config
# ------------------------------------------------------------
import os, uuid, json, time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

import openai          # pip install openai
from sentence_transformers import CrossEncoder
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Pinecone
import pinecone
# Milvus
from pymilvus import (
    connections, FieldSchema, CollectionSchema,
    DataType, Collection, utility
)

# ------------------------------------------------------------
# 2️⃣  Environment variables (replace with your own)
# ------------------------------------------------------------
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENV = "us-west1-gcp"
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"

openai.api_key = OPENAI_API_KEY

# ------------------------------------------------------------
# 3️⃣  Helper functions
# ------------------------------------------------------------
def embed_texts(texts: list[str]) -> np.ndarray:
    """Batch call to OpenAI embedding endpoint."""
    response = openai.Embedding.create(
        model="text-embedding-ada-002",
        input=texts,
        encoding_format="float"
    )
    embeddings = [np.array(r["embedding"], dtype=np.float32) for r in response["data"]]
    return np.stack(embeddings)

def normalize(vectors: np.ndarray) -> np.ndarray:
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / np.clip(norms, a_min=1e-12, a_max=None)

def chunk_document(text: str, size=300, overlap=30):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    return splitter.split_text(text)

# ------------------------------------------------------------
# 4️⃣  Initialize Pinecone & Milvus
# ------------------------------------------------------------
# Pinecone
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
if "rag-demo" not in pinecone.list_indexes():
    pinecone.create_index(
        name="rag-demo",
        dimension=1536,
        metric="cosine",
        spec=pinecone.ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
pinecone_index = pinecone.Index("rag-demo")

# Milvus
connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

# Define Milvus schema if not exists
if not utility.has_collection("rag_passages"):
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=36, is_primary=True, auto_id=False),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="published_at", dtype=DataType.INT64)
    ]
    schema = CollectionSchema(fields, description="RAG passages")
    Collection(name="rag_passages", schema=schema)
milvus_coll = Collection("rag_passages")

# ------------------------------------------------------------
# 5️⃣  Load source data (CSV with columns: title, content, category, date)
# ------------------------------------------------------------
df = pd.read_csv("articles.csv")   # Replace with your path
df["published_at"] = pd.to_datetime(df["date"]).astype("int64") // 10**9

# ------------------------------------------------------------
# 6️⃣  Ingestion loop
# ------------------------------------------------------------
batch_size = 64
all_vectors = []
all_meta = []

for _, row in df.iterrows():
    passages = chunk_document(row["content"])
    # embed in batches of `batch_size`
    for i in range(0, len(passages), batch_size):
        batch_texts = passages[i:i+batch_size]
        batch_embeddings = embed_texts(batch_texts)
        batch_embeddings = normalize(batch_embeddings)

        # Prepare metadata
        meta_batch = [{
            "title": row["title"],
            "category": row["category"],
            "published_at": int(row["published_at"])
        } for _ in batch_texts]

        ids = [str(uuid.uuid4()) for _ in range(len(batch_embeddings))]

        # ---- Pinecone upsert ----
        upserts = [(ids[j], batch_embeddings[j].tolist(), meta_batch[j]) for j in range(len(ids))]
        pinecone_index.upsert(vectors=upserts)

        # ---- Milvus insert ----
        milvus_coll.insert([
            ids,
            batch_embeddings.tolist(),
            [row["title"]]*len(ids),
            [row["category"]]*len(ids),
            [int(row["published_at"])]*len(ids)
        ])

        print(f"Inserted {len(ids)} vectors for article '{row['title']}'")

# ------------------------------------------------------------
# 7️⃣  Retrieval function (hybrid search + re‑ranking)
# ------------------------------------------------------------
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rag_query(question: str, top_k=8, filter_category=None):
    # 1️⃣ Embed query
    q_vec = embed_texts([question])
    q_vec = normalize(q_vec)[0].tolist()

    # 2️⃣ Build filter (optional)
    filter_dict = None
    if filter_category:
        filter_dict = {"category": {"$eq": filter_category}}

    # 3️⃣ Pinecone ANN search
    results = pinecone_index.query(
        vector=q_vec,
        top_k=top_k,
        filter=filter_dict,
        include_metadata=True,
        include_values=False
    )

    passages = [hit["metadata"]["title"] + ": " + hit["metadata"]["category"] for hit in results["matches"]]
    raw_texts = [hit["metadata"]["title"] for hit in results["matches"]]

    # 4️⃣ Cross‑encoder re‑ranking
    ranked = rerank(question, raw_texts, top_k=5)

    return ranked

def rerank(query, passages, top_k=5):
    pairs = [(query, p) for p in passages]
    scores = cross_encoder.predict(pairs)
    ranked = sorted(zip(passages, scores), key=lambda x: x[1], reverse=True)
    return ranked[:top_k]

# ------------------------------------------------------------
# 8️⃣  Demo query
# ------------------------------------------------------------
question = "What are the latest trends in renewable energy financing?"
answers = rag_query(question, top_k=10, filter_category="energy")
print("\nTop answers after re‑ranking:")
for txt, score in answers:
    print(f"- {txt} (score: {score:.3f})")

Explanation of the script

  • Dual write – Demonstrates how you could keep both Pinecone (managed) and Milvus (on‑prem) in sync for redundancy or A/B testing.
  • Chunking – Uses LangChain’s RecursiveCharacterTextSplitter for flexible passage creation.
  • Normalization – Ensures cosine similarity works correctly.
  • Hybrid filter – Optional category filter shows metadata usage.
  • Re‑ranking – Adds a second, more accurate layer using a cross‑encoder, which is a common pattern in production RAG pipelines.

Feel free to adapt the script to your own data source (SQL, S3, web crawlers) and to switch the embedding model (e.g., sentence-transformers/all-MiniLM-L6-v2) for lower latency.


Conclusion

Vector databases have moved from a niche research tool to a core infrastructure component for modern AI‑augmented applications. By mastering the indexing algorithms, metadata filtering, and performance‑tuning knobs of Pinecone and Milvus, you can build RAG systems that:

  • Serve low‑latency, high‑throughput queries (sub‑100 ms even at millions of vectors).
  • Maintain relevance through hybrid search and cross‑encoder re‑ranking.
  • Scale gracefully from a few thousand documents to billions, on‑prem or in the cloud.
  • Meet regulatory and security requirements via encryption, VPC peering, and fine‑grained access controls.

The end‑to‑end code sample gives you a ready‑to‑run foundation; from here you can iterate on chunking strategies, experiment with different embedding models, and integrate with your LLM of choice (OpenAI, Anthropic, Llama 3, etc.). Remember that the most valuable optimization often comes from understanding your data’s characteristics—document length, language distribution, and query patterns—and aligning those with the right index configuration.

Happy building, and may your vectors always be close to the query!


Resources