Table of Contents
- Introduction
- Why Rust for Retrieval‑Augmented Generation (RAG)?
- Core Components of a High‑Throughput RAG System
- Designing Rust Microservices for RAG
- Choosing a Distributed Vector Database
- Integration Patterns Between Rust Services and the Vector Store
- Building a High‑Throughput Ingestion Pipeline
- Constructing a Low‑Latency Query Pipeline
- 8.1 [Hybrid Search (BM25 + ANN)]
- 8.2 [Reranking with Small LLMs]
- 8.3 [Prompt Construction & LLM Invocation]
- Performance Engineering in Rust
- 9.1 [Zero‑Copy Deserialization (Serde + Bytes)]
- 9.2 CPU Pinning & SIMD for Distance Computation
- 9.3 Back‑pressure and Circuit Breakers
- Observability, Logging, and Tracing
- Security & Multi‑Tenant Isolation
12 [Deployment on Kubernetes]
13 [Real‑World Example: End‑to‑End Rust RAG Service]
14 Conclusion
15 Resources
Introduction
Retrieval‑Augmented Generation (RAG) has become the de‑facto pattern for building knowledge‑aware language‑model applications. By grounding a generative model in a dynamic external knowledge base, RAG enables:
- Up‑to‑date answers that reflect recent documents, regulations, or product catalogs.
- Reduced hallucinations, because the model can cite concrete sources.
- Cost efficiency, as the LLM only needs to generate based on a few retrieved passages instead of memorizing everything.
When RAG moves from prototype to production, the throughput and latency requirements explode. Enterprises often need to serve hundreds or thousands of queries per second while ingesting gigabytes of new text every hour. Achieving that scale demands a carefully engineered stack:
- Rust microservices for deterministic performance, low memory overhead, and strong type safety.
- Distributed vector databases (Milvus, Qdrant, Vespa, etc.) that can store billions of embeddings and perform approximate nearest‑neighbor (ANN) search at sub‑millisecond latency.
- Observability and orchestration that keep the system reliable under load.
This article walks through the architectural decisions, concrete Rust code, and operational best practices needed to build a high‑throughput RAG pipeline. It is aimed at engineers who already understand the basics of RAG and want to scale it to production‑grade workloads.
Why Rust for Retrieval‑Augmented Generation (RAG)?
| Criterion | Rust | Python (typical) | Go |
|---|---|---|---|
| Raw performance | Near‑C speed; zero‑cost abstractions | Interpreter overhead; GIL limits concurrency | Good, but lacks SIMD‑friendly math libraries for dense vectors |
| Memory safety | Compile‑time guarantees, no segfaults | Runtime errors, GC pauses | Garbage‑collected, but no ownership model |
| Concurrency model | Tokio async runtime, lightweight tasks | Thread‑based, GIL bottleneck | Goroutine scheduler, but limited SIMD |
| Ecosystem for embeddings | rust-bert, tokenizers, ort (ONNX Runtime) | Hugging Face Transformers (dominant) | Fewer ML bindings |
| Binary deployment | Single static binary, easy Docker layer | Requires Python runtime, many wheels | Simple static binary as well |
Rust’s ownership model eliminates data races, which is crucial when multiple ingestion workers and query servers share memory pools for embeddings. Moreover, the Tokio asynchronous runtime enables millions of concurrent I/O operations with a tiny memory footprint—perfect for a microservice that must handle both high‑volume ingestion and low‑latency query traffic.
Core Components of a High‑Throughput RAG System
A production RAG pipeline can be broken into three logical layers:
3.1 Document Ingestion & Embedding
- Source connectors (Kafka, S3, web crawlers).
- Chunker – splits raw text into overlapping windows (e.g., 512 tokens).
- Embedder – calls a sentence‑transformer or a small on‑prem LLM to obtain dense vectors.
- Metadata enrichers – add timestamps, source IDs, and custom tags.
3.2 Distributed Vector Store
- Stores (vector, metadata) pairs.
- Provides ANN search, filtering, and vector‑level security.
- Must support horizontal scaling, replication, and consistent upserts for incremental ingestion.
3.3 Query Service & LLM Orchestration
- Retriever – queries the vector store, optionally combines with BM25.
- Reranker – lightweight cross‑encoder that re‑orders top‑k results.
- Prompt Builder – formats retrieved passages into a prompt for the generative model.
- LLM Caller – could be an external API (OpenAI, Anthropic) or an on‑prem inference server (vLLM, Text Generation Inference).
- Response Formatter – adds citations, post‑processes token streams.
All three layers can be implemented as independent Rust microservices, communicating via gRPC or HTTP/2. This separation enables independent scaling, versioning, and failure isolation.
Designing Rust Microservices for RAG
4.1 Async Foundations with Tokio
Tokio provides a multi‑threaded, work‑stealing scheduler that drives futures to completion. A typical service entry point looks like:
use axum::{Router, routing::post, Json};
use tokio::sync::mpsc;
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
struct IngestRequest {
source_id: String,
payload: String,
}
#[tokio::main]
async fn main() {
// Channel for background workers
let (tx, rx) = mpsc::channel::<IngestRequest>(10_000);
// Spawn a pool of embedding workers
for _ in 0..num_cpus::get() {
let mut worker_rx = rx.clone();
tokio::spawn(async move {
while let Some(req) = worker_rx.recv().await {
// 1️⃣ Chunk → 2️⃣ Embed → 3️⃣ Upsert
process_ingest(req).await;
}
});
}
// HTTP API
let app = Router::new()
.route("/ingest", post(move |Json(payload): Json<IngestRequest>| {
let tx = tx.clone();
async move {
tx.send(payload).await.map_err(|e| (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok::<_, (axum::http::StatusCode, String)>(Json(serde_json::json!({"status":"queued"})))
}
}));
axum::Server::bind(&"0.0.0.0:8080".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
The service is fully non‑blocking: the HTTP handler simply enqueues the request, while a pool of Tokio workers performs CPU‑heavy embedding and upserts.
4.2 HTTP APIs with Axum/Actix‑Web
Both Axum and Actix‑Web are production‑ready, but Axum integrates tightly with tower middleware, making tracing, rate‑limiting, and authentication composable.
use tower_http::trace::TraceLayer;
use axum::middleware::from_extractor;
// Example: add request ID middleware
let app = Router::new()
.route("/search", post(search_handler))
.layer(TraceLayer::new_for_http())
.layer(from_extractor::<RequestIdExtractor>());
4.3 Serialization & Schema Evolution
RAG systems evolve quickly—new fields (e.g., source_confidence) appear. Using Serde with #[serde(default)] and #[serde(skip_serializing_if = "Option::is_none")] helps keep backward compatibility.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct DocumentMetadata {
#[serde(default)]
source_id: String,
#[serde(default)]
created_at: Option<chrono::DateTime<chrono::Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
tags: Option<Vec<String>>,
}
For binary transport, consider bincode or prost (Protobuf). Protobuf is especially handy when you need gRPC between services.
Choosing a Distributed Vector Database
5.1 Milvus vs. Qdrant vs. Vespa
| Feature | Milvus | Qdrant | Vespa |
|---|---|---|---|
| Open‑source license | Apache 2.0 | Apache 2.0 | Apache 2.0 |
| Supported ANN indexes | IVF_FLAT, HNSW, ANNOY, etc. | HNSW (primary), IVF (experimental) | HNSW, ANN (via Lucene) |
| Hybrid search (BM25 + ANN) | Native via Milvus‑Hybrid plugin | Not built‑in; external pipeline needed | First‑class hybrid search |
| Scalability | Horizontal sharding via Milvus‑Standalone + Milvus‑Cluster | Distributed mode with Qdrant‑Cloud; community version single‑node | Designed for massive clusters (Petabyte‑scale) |
| Rust client | milvus-sdk-rust (community) | qdrant-client (official) | No native Rust client; use HTTP/JSON |
| Metadata filtering | ✅ (structured) | ✅ (payload filters) | ✅ (document schema) |
| Observability | Prometheus + Grafana | Prometheus + Jaeger | Built‑in metrics, OpenTelemetry |
Recommendation: For most mid‑size enterprises (≤10 B vectors, <100 k QPS) Qdrant offers a simple deployment model and a first‑class Rust client. If you anticipate multi‑tenant, hybrid search or need massive scale, Vespa is the better choice, albeit with a steeper learning curve.
5.2 Replication, Sharding, and Consistency Models
| DB | Replication | Sharding | Consistency |
|---|---|---|---|
| Milvus | Raft‑based leader/follower | Hash‑based collection sharding | Strong (linearizable) |
| Qdrant | Leader‑follower (async) | Collection‑level partitioning | Eventual (configurable) |
| Vespa | Synchronous replicas | Document‑id based sharding | Strong (per‑document) |
When ingesting continuously, choose a DB with asynchronous replication to avoid back‑pressure on the embedder. For real‑time query latency, strong consistency may be unnecessary; eventual consistency is acceptable as long as newly upserted vectors become searchable within a few seconds.
Integration Patterns Between Rust Services and the Vector Store
6.1 gRPC vs. REST vs. Native SDKs
| Integration | Pros | Cons |
|---|---|---|
| gRPC (Protobuf) | Binary, low latency, streaming support, strong typing | Requires code generation, more complex devops |
| REST/JSON | Universally understood, easy debugging | Higher overhead, no streaming |
Native Rust SDK (e.g., qdrant-client) | Direct async API, zero‑copy, ergonomic | Tied to a specific DB version, may lag behind features |
Pattern: Use the native SDK for the ingestion path (high QPS, low latency) and expose a gRPC façade for external clients (e.g., UI front‑ends) that need streaming search results.
6.2 Batching & Streaming Embedding Requests
Embedding models often achieve higher throughput when processing batches. The ingestion pipeline can accumulate up to BATCH_SIZE chunks before invoking the model:
const BATCH_SIZE: usize = 64;
async fn embedding_worker(mut rx: mpsc::Receiver<Chunk>) {
let mut buffer = Vec::with_capacity(BATCH_SIZE);
while let Some(chunk) = rx.recv().await {
buffer.push(chunk);
if buffer.len() == BATCH_SIZE {
let embeddings = embed_batch(&buffer).await;
// Upsert embeddings to vector DB
upsert_embeddings(embeddings).await;
buffer.clear();
}
}
}
For query time, you can stream the top‑k results back to the caller as they become available, reducing perceived latency:
use tonic::{Request, Response, Status, Streaming};
#[tonic::async_trait]
impl Retriever for MyRetriever {
type SearchStream = Pin<Box<dyn Stream<Item = Result<SearchResult, Status>> + Send>>;
async fn search(
&self,
request: Request<SearchRequest>,
) -> Result<Response<Self::SearchStream>, Status> {
let query = request.into_inner();
let stream = async_stream::try_stream! {
// 1. ANN search
let mut results = self.vector_db.search(query.vector.clone(), query.top_k).await?;
// 2. Optional rerank in a loop, yielding each result
for (i, doc) in results.drain(..).enumerate() {
let reranked = self.reranker.rerank(&doc).await?;
yield SearchResult {
rank: i as u32,
doc_id: doc.id,
score: reranked.score,
snippet: doc.metadata.snippet,
};
}
};
Ok(Response::new(Box::pin(stream) as Self::SearchStream))
}
}
Building a High‑Throughput Ingestion Pipeline
7.1 Chunking Strategies
Choosing the right chunk size balances retrieval relevance and embedding cost.
| Strategy | Typical Size | Overlap | When to Use |
|---|---|---|---|
| Fixed token window | 256‑512 tokens | 50‑100 tokens | Uniform documents (e.g., policy PDFs) |
| Semantic split (sentence‑boundary) | Variable | 0 | Narrative text where sentence semantics matter |
| Hierarchical (section → paragraph) | Multi‑level | 0‑200 | Long technical manuals where hierarchy aids retrieval |
Implementation tip: Use tokenizers crate (Rust binding to HuggingFace tokenizers) to split on token count accurately.
use tokenizers::Tokenizer;
use tokenizers::models::bpe::BPE;
use tokenizers::normalizers::unicode::NFC;
use tokenizers::pre_tokenizers::whitespace::Whitespace;
fn chunk_text(text: &str, max_tokens: usize, overlap: usize) -> Vec<String> {
let tokenizer = Tokenizer::new(BPE::default());
let mut chunks = Vec::new();
let mut start = 0;
let ids = tokenizer.encode(text, false).unwrap().get_ids().to_vec();
while start < ids.len() {
let end = (start + max_tokens).min(ids.len());
let slice = &ids[start..end];
let decoded = tokenizer.decode(slice.to_vec(), false).unwrap();
chunks.push(decoded);
if end == ids.len() { break; }
start = end - overlap; // overlap for context continuity
}
chunks
}
7.2 Embedding Workers
Use ONNX Runtime (ort) for model inference because it provides a Rust binding and runs on CPU/GPU with minimal overhead.
use ort::{Environment, SessionBuilder, Value};
async fn embed_batch(chunks: &[Chunk]) -> Vec<Embedding> {
// Load ONNX session once (static global)
static SESSION: OnceCell<ort::Session> = OnceCell::new();
let session = SESSION.get_or_init(|| {
let env = Environment::builder()
.with_name("rag-embedder")
.build()
.unwrap();
SessionBuilder::new(&env)
.unwrap()
.with_model_from_file("sentence-transformer.onnx")
.unwrap()
});
// Prepare input tensor (batch, seq_len)
let input_tensor = // ... convert chunks to token ids, pad, etc.
let outputs = session.run(vec![input_tensor]).unwrap();
// Assume first output is [batch, dim] float tensor
let embeddings = outputs[0].try_extract::<f32>().unwrap();
embeddings
.chunks(EMBED_DIM)
.map(|slice| Embedding(slice.to_vec()))
.collect()
}
7.3 Bulk Upserts to the Vector Store
Both Qdrant and Milvus support batch upserts that dramatically reduce network overhead.
use qdrant_client::qdrant::PointsBatch;
async fn upsert_embeddings(embeds: Vec<EmbeddingWithMeta>) {
let points: Vec<_> = embeds.iter().map(|e| {
qdrant_client::qdrant::PointStruct {
id: Some(qdrant_client::qdrant::point_id::PointId::Uuid(e.id)),
vectors: Some(qdrant_client::qdrant::vectors::Vectors::Vector(
qdrant_client::qdrant::vectors::Vector::from(e.vector.clone()))),
payload: e.metadata.clone(),
}
}).collect();
let batch = PointsBatch {
collection_name: "rag_docs".to_string(),
points,
..Default::default()
};
// The client automatically uses gRPC streaming under the hood
qdrant_client::QdrantClient::from_url("http://qdrant:6334")
.await
.unwrap()
.upsert_batch(batch)
.await
.unwrap();
}
Tip: Tune the batch size (e.g., 2 k points) based on network MTU and server capacity. Larger batches improve throughput but increase latency for the individual document; a dual‑queue (fast path for urgent updates, bulk path for bulk import) can mitigate this.
Constructing a Low‑Latency Query Pipeline
8.1 Hybrid Search (BM25 + ANN)
Hybrid search improves recall for short queries where term matching matters.
async fn hybrid_search(query: &str, vector: Vec<f32>, top_k: usize) -> Vec<SearchResult> {
// 1️⃣ BM25 via Elasticsearch (or Qdrant payload full‑text)
let bm25_hits = elastic_client.search(query).await?;
// 2️⃣ ANN via Qdrant
let ann_hits = qdrant_client.search(vector.clone(), top_k).await?;
// 3️⃣ Merge & re‑rank
let mut merged = merge_hits(bm25_hits, ann_hits);
merged.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
merged.truncate(top_k);
merged
}
The merge step can be a simple linear combination of scores (α * bm25 + β * ann) or a learned model.
8.2 Reranking with Small LLMs
A cross‑encoder (e.g., cross-encoder/ms-marco-MiniLM-L-12-v2) can be run on CPU using ort to produce a more accurate relevance score for the top‑k candidates.
async fn rerank(candidates: &[Document]) -> Vec<Reranked> {
// Batch the candidate pairs (query, passage)
let inputs = candidates.iter().map(|doc| {
format!("query: {} passage: {}", query_text, doc.snippet)
}).collect::<Vec<_>>();
let scores = cross_encoder.embed_batch(&inputs).await; // returns similarity logits
candidates.iter().zip(scores).map(|(doc, score)| {
Reranked { doc: doc.clone(), score }
}).collect()
}
Because the reranker runs after the ANN stage, the total number of forward passes stays low (often < 32 per request).
8.3 Prompt Construction & LLM Invocation
fn build_prompt(context: &[String], user_query: &str) -> String {
let mut prompt = String::new();
prompt.push_str("You are a knowledgeable assistant. Answer the question using only the provided context. Cite sources.\n\n");
for (i, snippet) in context.iter().enumerate() {
prompt.push_str(&format!("[{}] {}\n", i + 1, snippet));
}
prompt.push_str("\nQuestion: ");
prompt.push_str(user_query);
prompt
}
When invoking an external API (OpenAI, Anthropic), stream the response back to the client to keep latency low:
let response = client
.post("https://api.openai.com/v1/chat/completions")
.bearer_auth(api_key)
.json(&json!({
"model": "gpt-4o-mini",
"messages": [{ "role": "user", "content": prompt }],
"stream": true
}))
.send()
.await?
.bytes_stream(); // yields chunks as they arrive
The Rust microservice can forward this stream directly to the HTTP client, preserving back‑pressure semantics.
Performance Engineering in Rust
9.1 Zero‑Copy Deserialization (Serde + Bytes)
When receiving large JSON payloads (e.g., bulk upserts), avoid unnecessary allocations:
use serde::Deserialize;
use bytes::Bytes;
#[derive(Deserialize)]
struct BulkPayload<'a> {
#[serde(borrow)]
documents: &'a [Document],
}
Using serde_json::from_slice::<BulkPayload>(bytes) reads directly from the incoming Bytes buffer without copying.
9.2 CPU Pinning & SIMD for Distance Computation
If you decide to perform ANN search in‑process (e.g., for a small subset), use faiss-rs or annoy-rs, both of which leverage SIMD.
use faiss::index_factory;
use faiss::Index;
let mut index = index_factory(128, "IVF10,PQ4", faiss::MetricType::InnerProduct).unwrap();
index.train(&vectors).unwrap();
index.add(&vectors).unwrap();
// Pin to a dedicated core for deterministic latency
let _guard = affinity::set_thread_affinity([2]).unwrap();
let distances = index.search(&query_vec, k).unwrap();
Tip: Reserve a few CPU cores exclusively for search to avoid interference from embedding workers.
9.3 Back‑pressure and Circuit Breakers
Use tower middleware to implement rate limiting and circuit breaking:
use tower::{ServiceBuilder, limit::ConcurrencyLimitLayer, timeout::TimeoutLayer};
use std::time::Duration;
let app = Router::new()
.route("/search", post(search_handler))
.layer(
ServiceBuilder::new()
.layer(ConcurrencyLimitLayer::new(2000)) // max concurrent searches
.layer(TimeoutLayer::new(Duration::from_secs(2))) // fail fast
);
When the vector store becomes saturated, the circuit breaker can return a 503 with a Retry-After header, prompting the client to back off.
Observability, Logging, and Tracing
A production RAG service must be observable at every stage:
| Aspect | Tool | Integration |
|---|---|---|
| Metrics | Prometheus + metrics crate | Export counters for ingest_requests, search_latency_ms, embedding_qps |
| Traces | OpenTelemetry + tracing crate | Span across ingestion → embedding → upsert → search → LLM call |
| Logs | tracing-subscriber with JSON output | Structured logs (request_id, service, level) |
| Distributed Tracing UI | Jaeger / Grafana Tempo | Export OTLP over gRPC |
Example of a trace that spans microservices:
#[tracing::instrument(name = "search", skip_all, fields(request_id = %req_id))]
async fn search_handler(Json(payload): Json<SearchRequest>) -> Json<SearchResponse> {
// Start a child span for vector DB query
let vec_span = tracing::info_span!("vector_search", collection = %payload.collection);
let results = vec_span.in_scope(|| async {
vector_client.search(payload.vector.clone(), payload.top_k).await
}).await?;
// Rerank span
let rerank_span = tracing::info_span!("rerank");
let reranked = rerank_span.in_scope(|| async {
reranker.rerank(results).await
}).await?;
Json(SearchResponse { results: reranked })
}
All spans automatically propagate the trace context via HTTP headers (traceparent, tracestate), enabling end‑to‑end latency breakdown.
Security & Multi‑Tenant Isolation
- Authentication – Use mutual TLS for inter‑service gRPC and OAuth2 for external APIs.
- Authorization – Encode tenant IDs in the vector collection name (
tenant_{id}_docs) and enforce it in middleware. - Data Encryption – Enable TLS for Qdrant/Milvus and at‑rest encryption (e.g., AWS EBS‑encrypted volumes).
- Rate Limiting per Tenant – Combine
tower::limit::RateLimitLayerwith a per‑tenant token bucket.
fn tenant_rate_limiter(tenant_id: &str) -> impl Service<Request, Response = Response, Error = Infallible> {
let limit = match tenant_id {
"premium" => 5000,
_ => 500,
};
ServiceBuilder::new()
.layer(ConcurrencyLimitLayer::new(limit))
.into_inner()
}
Deployment on Kubernetes
A typical deployment consists of:
ingest-service– Rust binary, 2‑CPU, 4 GiB, autoscaled via HorizontalPodAutoscaler (HPA) based on CPU and custom metricingest_queue_length.search-service– Rust binary, 4‑CPU, 8 GiB, also HPA‑scaled onsearch_qps.vector-db– StatefulSet (Qdrant) with persistent volume claims (PVCs), PodDisruptionBudget for high availability.embedding-worker– Optional GPU‑enabled node pool running ONNX Runtime with CUDA; uses KEDA (Kubernetes Event‑Driven Autoscaling) to spin up workers when the ingestion queue grows.
Helm chart snippet for the search service:
apiVersion: apps/v1
kind: Deployment
metadata:
name: search-service
spec:
replicas: 2
selector:
matchLabels:
app: search-service
template:
metadata:
labels:
app: search-service
spec:
containers:
- name: search
image: ghcr.io/yourorg/search-service:latest
ports:
- containerPort: 8080
resources:
limits:
cpu: "4"
memory: "8Gi"
env:
- name: QDRANT_ENDPOINT
value: "http://qdrant:6334"
readinessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
Service Mesh (e.g., Istio) can add mTLS, traffic splitting, and fault injection for chaos testing.
Real‑World Example: End‑to‑End Rust RAG Service
Below is a minimal but functional example that ties together ingestion, embedding, and search. It uses Axum, Tokio, Qdrant, and ONNX Runtime.
// Cargo.toml (excerpt)
[dependencies]
tokio = { version = "1.35", features = ["full"] }
axum = "0.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
qdrant-client = "0.5"
ort = "0.13"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "json"] }
once_cell = "1.19"
bytes = "1.5"
tokenizers = "0.15"
Main Entrypoint (src/main.rs)
mod ingest;
mod search;
mod embed;
mod vector;
use axum::{Router, routing::{post, get}, Json};
use tracing_subscriber::{fmt, EnvFilter};
use std::net::SocketAddr;
#[tokio::main]
async fn main() {
// Initialize tracing
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(fmt::layer().json())
.init();
// Shared clients
let qdrant = vector::client().await;
let embedder = embed::Embedder::new("sentence-transformer.onnx");
// Build router
let app = Router::new()
.route("/ingest", post(ingest::handler))
.route("/search", post(search::handler))
.layer(axum::AddExtensionLayer::new((qdrant, embedder)));
// Run server
let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
tracing::info!("Listening on {}", addr);
axum::Server::bind(&addr).serve(app.into_make_service()).await.unwrap();
}
Ingestion Handler (src/ingest.rs)
use axum::{extract::Extension, Json};
use serde::Deserialize;
use crate::embed::Embedder;
use crate::vector::VectorClient;
use std::sync::Arc;
#[derive(Deserialize)]
pub struct IngestPayload {
pub source_id: String,
pub text: String,
}
pub async fn handler(
Extension((vector, embedder)): Extension<(Arc<VectorClient>, Arc<Embedder>)>,
Json(payload): Json<IngestPayload>,
) -> Json<serde_json::Value> {
// 1️⃣ Chunk
let chunks = crate::embed::chunk_text(&payload.text, 256, 32);
// 2️⃣ Embed batch
let embeddings = embedder.embed_batch(&chunks).await;
// 3️⃣ Upsert
vector.bulk_upsert(payload.source_id, embeddings, chunks).await.unwrap();
Json(serde_json::json!({"status":"ok"}))
}
Search Handler (src/search.rs)
use axum::{extract::Extension, Json};
use serde::{Deserialize, Serialize};
use crate::vector::VectorClient;
#[derive(Deserialize)]
pub struct SearchRequest {
pub query: String,
pub top_k: usize,
}
#[derive(Serialize)]
pub struct SearchResult {
pub doc_id: String,
pub snippet: String,
pub score: f32,
}
pub async fn handler(
Extension(vector): Extension<Arc<VectorClient>>,
Json(req): Json<SearchRequest>,
) -> Json<Vec<SearchResult>> {
// Convert query to vector using the same embedder (omitted for brevity)
let query_vec = vector.embed_query(&req.query).await.unwrap();
// ANN search
let hits = vector.search(query_vec, req.top_k).await.unwrap();
// Map to API model
let results = hits.into_iter().map(|h| SearchResult {
doc_id: h.id,
snippet: h.payload.get("snippet").and_then(|v| v.as_str()).unwrap_or("").to_string(),
score: h.score,
}).collect();
Json(results)
}
Embedding Module (src/embed.rs)
use ort::{Environment, SessionBuilder, Value};
use tokenizers::Tokenizer;
use once_cell::sync::Lazy;
use std::sync::Arc;
static TOKENIZER: Lazy<Tokenizer> = Lazy::new(|| {
Tokenizer::from_pretrained("bert-base-uncased", None).unwrap()
});
pub struct Embedder {
session: Arc<ort::Session>,
}
impl Embedder {
pub fn new(model_path: &str) -> Arc<Self> {
let env = Environment::builder()
.with_name("embedder")
.build()
.unwrap();
let session = SessionBuilder::new(&env)
.unwrap()
.with_model_from_file(model_path)
.unwrap();
Arc::new(Self { session: Arc::new(session) })
}
pub async fn embed_batch(&self, texts: &[String]) -> Vec<Vec<f32>> {
// Tokenize & pad
let ids: Vec<Vec<u32>> = texts.iter()
.map(|t| TOKENIZER.encode(t, false).unwrap().get_ids().to_vec())
.collect();
// Convert to ort::tensor (batch, seq_len)
let input = Value::from_array(
ndarray::Array2::from_shape_vec((ids.len(), MAX_SEQ_LEN), flatten_ids(&ids)).unwrap()
);
let outputs = self.session.run(vec![input]).unwrap();
let embeddings = outputs[0].try_extract::<f32>().unwrap();
// Assume dim = 768
embeddings.chunks(768).map(|c| c.to_vec()).collect()
}
}
// Helper to flatten and pad ID vectors
fn flatten_ids(batched: &[Vec<u32>]) -> Vec<u32> {
const PAD_ID: u32 = 0;
let mut flat = Vec::new();
for ids in batched {
flat.extend_from_slice(ids);
if ids.len() < MAX_SEQ_LEN {
flat.extend(std::iter::repeat(PAD_ID).take(MAX_SEQ_LEN - ids.len()));
}
}
flat
}
Vector Client (src/vector.rs)
use qdrant_client::qdrant::{self, points_batch::PointsBatch, PointsSelector, Vector};
use qdrant_client::QdrantClient;
use std::sync::Arc;
pub struct VectorClient {
client: Arc<QdrantClient>,
collection: String,
}
impl VectorClient {
pub async fn new(collection: &str) -> Arc<Self> {
let client = QdrantClient::from_url("http://qdrant:6334")
.await
.unwrap();
Arc::new(Self { client: Arc::new(client), collection: collection.to_string() })
}
pub async fn bulk_upsert(
&self,
source_id: String,
embeddings: Vec<Vec<f32>>,
snippets: Vec<String>,
) -> Result<(), qdrant::Status> {
let points = embeddings.into_iter().enumerate().map(|(i, vec)| {
qdrant::PointStruct {
id: Some(qdrant::point_id::PointId::Uuid(uuid::Uuid::new_v4())),
vectors: Some(qdrant::vectors::Vectors::Vector(
qdrant::vectors::Vector::from(vec))),
payload: Some(qdrant::Payload::from_json(serde_json::json!({
"source_id": source_id,
"snippet": snippets[i]
}))),
}
}).collect();
let batch = PointsBatch {
collection_name: self.collection.clone(),
points,
..Default::default()
};
self.client.upsert_batch(batch).await?;
Ok(())
}
pub async fn search(&self, query_vec: Vec<f32>, top_k: usize) -> Result<Vec<qdrant::ScoredPoint>, qdrant::Status> {
let request = qdrant::SearchPoints {
collection_name: self.collection.clone(),
vector: Some(Vector::from(query_vec)),
limit: top_k as u64,
with_payload: Some(qdrant::WithPayloadSelector::Enable(true)),
..Default::default()
};
let resp = self.client.search_points(request).await?;
Ok(resp.result)
}
// Helper for embedding the query using the same model (omitted for brevity)
pub async fn embed_query(&self, query: &str) -> Result<Vec<f32>, anyhow::Error> {
// Reuse Embedder singleton or call external service
unimplemented!()
}
}
Running the example
docker compose up -d qdrant
cargo run --release
You can now POST /ingest with a JSON body containing source_id and text, then POST /search with a query. The service will store vectors in Qdrant, retrieve them, and return snippets with scores – a complete RAG loop built entirely in Rust.
Conclusion
Architecting a high‑throughput Retrieval‑Augmented Generation pipeline demands careful choices across language, data store, networking, and deployment. Rust gives you deterministic performance, zero‑cost abstractions, and a robust async ecosystem that can handle both massive ingestion and sub‑second query latency. Pairing Rust microservices with a distributed vector database such as Qdrant, Milvus, or Vespa provides the scalability needed for billions of embeddings and millions of queries per day.
Key takeaways:
- Chunk, embed, and bulk upsert – batching at every stage maximizes throughput.
- Hybrid search (BM25 + ANN) plus a lightweight reranker yields higher recall without sacrificing latency.
- Observability (metrics, tracing, logs) and circuit breakers are essential for reliability under load.
- Security (mTLS, tenant isolation) should be baked in from day one.
- Kubernetes + Service Mesh gives you the horizontal scaling and traffic management required for production workloads.
By following the patterns and code snippets in this article, you can move from a proof‑of‑concept RAG prototype to a battle‑tested, production‑grade system that serves real users with speed, accuracy, and resilience.
Resources
- Qdrant Documentation – Comprehensive guide to vector search, payload filtering, and scaling: Qdrant Docs
- Rust‑Bert & ONNX Runtime – How to run transformer models efficiently in Rust: Rust‑Bert GitHub
- Milvus Blog on Hybrid Search – Deep dive into combining BM25 with ANN: Milvus Hybrid Search
- Vespa AI – Production‑grade search engine with built‑in vector capabilities: Vespa.ai
- OpenTelemetry for Rust – Instrumentation guide for tracing microservices: OpenTelemetry Rust