Search engineering sits at one of the most demanding intersections in software: information retrieval theory, distributed systems, natural language processing, and user behavior analysis all converge in the code you write every day. The gap between a search system that “returns results” and one that “returns the RIGHT results ranked correctly for this specific user in this specific context” is massive — and it is a gap that AI coding tools struggle with because relevance is not a binary property you can validate with a unit test. Your system must understand that “Python” means the programming language for a developer searching a documentation site but means the snake for a user on a pet store site. It must return results in under 200 milliseconds across billions of documents. It must combine keyword matching, vector similarity, freshness signals, popularity metrics, and business rules into a single ranked list that feels effortless to the user who typed three words into a box.
This guide evaluates every major AI coding tool through the lens of what search engineers actually build: not toy inverted indexes, not Elasticsearch-getting-started tutorials, but production search systems that serve millions of queries per day with strict latency budgets, handle every conceivable query intent, combine sparse and dense retrieval, and maintain relevance metrics that directly drive business revenue. We tested each tool on real search tasks: designing Elasticsearch mappings with custom analyzers, building complex query DSL with function_score and nested queries, implementing learning-to-rank pipelines, building hybrid search with reciprocal rank fusion, creating query understanding systems with spell correction and intent classification, computing NDCG from judgment data, and managing distributed cluster infrastructure.
If your primary focus is backend API design, see the Backend Engineers guide. If you work mainly on data pipelines feeding your search index, see the Data Engineers guide. If your focus is training embedding models or learning-to-rank models, see the ML Engineers guide. For latency optimization and profiling, see the Performance Engineers guide. For storage engine internals, see the Database Internals Engineers guide. This guide is specifically for engineers building the search systems themselves — the code that takes a user query and returns a ranked list of results that feel like magic.
Best free ($0): GitHub Copilot Free — decent Elasticsearch query DSL completions, knows common mapping patterns, 2,000 completions/mo covers personal search projects. Best overall ($20/mo): Cursor Pro — multi-file context handles mapping definitions + query builders + ranking logic together, strong Python code generation for search pipelines, project-wide awareness of your index schemas and query patterns. Best for reasoning ($20/mo): Claude Code — strongest at designing end-to-end search architectures, reasoning about relevance trade-offs between BM25 and vector search, and understanding the mathematical foundations behind ranking algorithms. Best combo ($30/mo): Claude Code + Copilot Pro — Claude for search architecture, ranking strategy, and debugging relevance issues; Copilot for fast inline completions on Elasticsearch DSL, analyzer configurations, and boilerplate API endpoints.
Why Search Engineering Is Different
Search engineering operates under constraints that most software engineers never encounter. Your inputs are ambiguous natural language. Your outputs must be ranked by a subjective quality metric that changes with context. And the gap between “works on the evaluation set” and “works in production with real user queries” is wider than in almost any other engineering discipline:
- Relevance is subjective and context-dependent: There is no objectively “correct” ranking for most queries. The right result for “Python” depends on whether the user is on Stack Overflow or a wildlife site. BM25 gives you lexical matching, but misses semantic similarity entirely. Semantic search finds conceptually related documents but hallucinates relevance for anything with similar embedding geometry. Personalization adds user history but creates filter bubbles. Query intent classification — is this navigational, informational, or transactional? — changes which ranking strategy you should even apply. AI coding tools generate search code that treats relevance as a solved problem (just call match), when in reality it is an ongoing, context-dependent engineering challenge that requires constant measurement and tuning.
- Indexing at scale is a distributed systems problem: Building an inverted index for a billion documents is not just “insert into Elasticsearch.” You need sharding strategies that balance query fan-out against index size. You need real-time indexing for fresh content while batch-rebuilding for consistency. Segment merges consume I/O and can spike query latency if not managed. Delete handling requires tombstones that accumulate until segments merge. Schema changes on live indexes require reindexing with zero-downtime alias swaps. AI tools generate single-node indexing code that breaks catastrophically at 100 million documents when segment merge storms spike p99 latency to 10 seconds.
- Query latency budgets are brutal: Users expect search results in under 200 milliseconds. Your system must parse the query, expand synonyms, classify intent, fan out to dozens of shards across multiple nodes, scatter-gather results, merge and re-rank, apply business rules, and return — all within that budget. Every millisecond matters. Query planning (which indexes to hit, whether to use a cached result, when to early-terminate) is as complex as database query optimization. Caching strategies must balance freshness against latency. AI tools generate queries that are functionally correct but contain expensive aggregations, deep pagination with from/size, or wildcard queries on non-optimized fields that blow through latency budgets on any non-trivial index.
- Ranking is a multi-signal scoring problem: Production ranking combines TF-IDF or BM25 text relevance with dozens of other signals: vector similarity from dense embeddings, document freshness (a news article from today beats one from last year), popularity (click-through rate, conversion rate), authority (PageRank-style link analysis), business rules (promoted results, inventory status), and position bias correction (users click the first result regardless of quality). Learning-to-rank models combine these features into a single score, but training data is biased by your current ranking. AI tools generate function_score queries with hardcoded weights that have no empirical basis and no mechanism for evaluation or improvement.
- Autocomplete and query understanding require NLP at extreme speed: Query parsing, spell correction, synonym expansion, entity extraction, and intent classification all must complete in under 50 milliseconds — before the actual search even begins. Spell correction must handle domain-specific terms (is “kubernetes” a misspelling?). Synonym expansion must be asymmetric (“NYC” expands to “New York City” but not vice versa). Entity extraction must identify brand names, categories, and attributes from free-text queries. These NLP components run on every single query at the hottest point in your system. AI tools generate offline-quality NLP code that is accurate but 100x too slow for query-time processing.
- Vector search has changed everything: Dense embeddings from transformer models enable semantic search that finds conceptually similar documents even without keyword overlap. But integrating vector search into production systems is far more complex than adding a vector field. HNSW index parameters (ef_construction, M) trade recall for speed and memory. Embedding model selection determines what “similarity” even means. Hybrid search requires score normalization between BM25 (unbounded) and cosine similarity ([0,1]). Re-ranking pipelines apply expensive cross-encoder models to the top-K results from cheap retrieval. AI tools generate vector search code that ignores normalization, uses default HNSW parameters regardless of dataset size, and skips the hybrid retrieval that production systems require.
- Testing search quality is fundamentally different: You cannot unit test relevance. A change that improves results for one query class may degrade another. Precision@K, NDCG, and MRR measure different aspects of ranking quality and often conflict. Offline evaluation with judgment sets approximates but does not replace online A/B testing with real users. Judge panels disagree on relevance (inter-annotator agreement is typically 70–80%). Position bias means click data overestimates the quality of top-ranked results. AI tools generate search tests that check “results are not empty” rather than measuring ranking quality with proper evaluation metrics.
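The evaluation metrics mentioned above are easy to compute once you have graded judgments. As a minimal sketch of the standard NDCG@K formula (gains discounted by log2 of rank, normalized against the ideal ordering of the same judgments):

```python
import math
from typing import Dict, List

def dcg_at_k(gains: List[float], k: int) -> float:
    """Discounted cumulative gain over the first k graded results."""
    return sum(g / math.log2(i + 2) for i, g in enumerate(gains[:k]))

def ndcg_at_k(ranked_doc_ids: List[str],
              judgments: Dict[str, float],
              k: int = 10) -> float:
    """NDCG@k: DCG of the actual ranking divided by the DCG of the
    ideal ranking built from the same judgments. Unjudged docs score 0."""
    gains = [judgments.get(d, 0.0) for d in ranked_doc_ids]
    ideal = sorted(judgments.values(), reverse=True)
    ideal_dcg = dcg_at_k(ideal, k)
    return dcg_at_k(gains, k) / ideal_dcg if ideal_dcg > 0 else 0.0
```

A perfect ranking scores 1.0; swapping the top two results of a three-document judgment set drops the score below 1.0 but keeps it well above 0 — exactly the graded behavior a pass/fail unit test cannot express.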
Task Support Matrix
We tested each AI coding tool on the core tasks that define search engineering work. Ratings reflect production-quality output, not tutorial-grade code:
| Task | Cursor | Copilot | Claude Code | Windsurf | Tabnine | Amazon Q |
|---|---|---|---|---|---|---|
| Search Index Design & Mapping | A | A− | A | B+ | B | A− |
| Query DSL & Search API Development | A− | B+ | A | B+ | B− | B+ |
| Ranking & Relevance Tuning | B+ | B | A | B | C+ | B− |
| Vector Search & Embeddings | A− | B+ | A | B+ | B− | B |
| Query Understanding & NLP | B+ | B | A− | B | C+ | B− |
| Search Analytics & A/B Testing | B+ | B | A− | B− | C+ | B |
| Distributed Search Infrastructure | B+ | B | A− | B | C | A− |
How to read this table: Ratings reflect production-quality output for each domain. An “A” means the tool generates code that an experienced search engineer would accept with minor edits. A “C” means the output requires substantial rewriting or demonstrates fundamental misunderstandings of search-specific requirements. We tested with explicit, domain-specific prompts — vague prompts produce worse results across all tools.
1. Search Index Design & Mapping
Index mappings are the foundation of every search system. Get the analyzers wrong, and no amount of query tuning will produce good results. Get the field types wrong, and you cannot run the aggregations your analytics dashboard needs. Get the shard count wrong, and your cluster falls over at scale. Production mappings are not the five-field examples from the Elasticsearch tutorial — they are carefully designed schemas with custom analyzers, multi-field mappings for different search strategies, and deliberate decisions about what to index, what to store, and what to leave as doc_values only.
Custom Analyzers and Multi-Field Mappings
The single most impactful decision in search index design is analyzer configuration. Default analyzers tokenize on whitespace and lowercase — which is wildly insufficient for production search. You need edge n-gram analyzers for autocomplete, synonym filters for query expansion, language-specific stemmers, and custom tokenizers for domain-specific content like product SKUs or code identifiers.
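To make the edge n-gram behavior concrete, here is a plain-Python approximation (not Lucene's implementation) of what an edge_ngram filter with min_gram=2 and max_gram=20 emits per token:

```python
from typing import List

def edge_ngrams(token: str, min_gram: int = 2, max_gram: int = 20) -> List[str]:
    """Prefixes of length min_gram..max_gram, mirroring an edge_ngram
    token filter. This runs at index time only: queries are matched
    against the stored prefixes with a plain lowercase analyzer."""
    return [token[:n] for n in range(min_gram, min(len(token), max_gram) + 1)]

# "query" is indexed as qu, que, quer, query — so the user's prefix
# "que" matches with a cheap term lookup instead of a wildcard query.
```

This is why index-time and search-time analyzers must differ for autocomplete fields: if the query were also n-grammed, “que” would match any document containing any token starting with “qu”.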
from elasticsearch import Elasticsearch
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
class IndexManager:
"""Production index manager with custom analyzers, zero-downtime
reindexing, and multi-field mappings for different search strategies."""
def __init__(self, es_client: Elasticsearch, index_prefix: str = "search"):
self.es = es_client
self.index_prefix = index_prefix
def configure_analyzers(self) -> Dict[str, Any]:
"""Build analyzer settings for production search.
Returns a settings dict with:
- autocomplete_analyzer: edge n-gram for prefix matching
- search_analyzer: standard + synonyms for query-time expansion
- exact_analyzer: keyword with lowercase for exact matching
- path_analyzer: path hierarchy tokenizer for category facets
"""
return {
"analysis": {
"filter": {
"autocomplete_filter": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 20
},
"synonym_filter": {
"type": "synonym_graph",
"synonyms_path": "synonyms.txt",
"updateable": True
},
"english_stemmer": {
"type": "stemmer",
"language": "english"
},
"english_stop": {
"type": "stop",
"stopwords": "_english_"
},
"shingle_filter": {
"type": "shingle",
"min_shingle_size": 2,
"max_shingle_size": 3,
"output_unigrams": True
}
},
"tokenizer": {
"path_tokenizer": {
"type": "path_hierarchy",
"delimiter": "/"
}
},
"analyzer": {
"autocomplete_index": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"autocomplete_filter"
]
},
"autocomplete_search": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase"]
},
"full_text": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"english_stop",
"english_stemmer"
]
},
"full_text_with_synonyms": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"synonym_filter",
"english_stop",
"english_stemmer"
]
},
"exact_lowercase": {
"type": "custom",
"tokenizer": "keyword",
"filter": ["lowercase"]
},
"path_analyzer": {
"type": "custom",
"tokenizer": "path_tokenizer"
},
"shingle_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"shingle_filter"
]
}
}
}
}
def create_index(
self,
index_name: str,
num_shards: int = 3,
num_replicas: int = 1
) -> None:
"""Create a production search index with multi-field mappings.
Each text field is indexed multiple ways:
- .raw: keyword for exact matching, sorting, aggregations
- .autocomplete: edge n-gram for prefix search
- .shingle: shingled for phrase proximity
- default: stemmed full-text for general search
"""
settings = self.configure_analyzers()
settings["number_of_shards"] = num_shards
settings["number_of_replicas"] = num_replicas
settings["refresh_interval"] = "1s"
settings["max_result_window"] = 10000
mappings = {
"dynamic": "strict",
"properties": {
"title": {
"type": "text",
"analyzer": "full_text",
"search_analyzer": "full_text_with_synonyms",
"fields": {
"raw": {"type": "keyword"},
"autocomplete": {
"type": "text",
"analyzer": "autocomplete_index",
"search_analyzer": "autocomplete_search"
},
"shingle": {
"type": "text",
"analyzer": "shingle_analyzer"
}
}
},
"body": {
"type": "text",
"analyzer": "full_text",
"search_analyzer": "full_text_with_synonyms",
"fields": {
"shingle": {
"type": "text",
"analyzer": "shingle_analyzer"
}
}
},
"category_path": {
"type": "text",
"analyzer": "path_analyzer",
"fields": {
"raw": {"type": "keyword"}
}
},
"tags": {"type": "keyword"},
"author": {
"type": "text",
"analyzer": "full_text",
"fields": {
"raw": {"type": "keyword"}
}
},
"published_at": {"type": "date"},
"updated_at": {"type": "date"},
"popularity_score": {"type": "float"},
"click_count": {"type": "integer"},
"embedding": {
"type": "dense_vector",
"dims": 768,
"index": True,
"similarity": "cosine",
"index_options": {
"type": "hnsw",
"m": 16,
"ef_construction": 200
}
},
"url": {"type": "keyword", "index": False, "doc_values": False},
"source_id": {"type": "keyword"}
}
}
body = {"settings": settings, "mappings": mappings}
self.es.indices.create(index=index_name, body=body)
logger.info(f"Created index {index_name} with {num_shards} shards")
def reindex_zero_downtime(
self,
alias_name: str,
new_index_name: str,
num_shards: int = 3
) -> None:
"""Zero-downtime reindex using alias swap.
1. Create new index with updated mappings/settings
2. Reindex documents from old index to new index
3. Atomically swap the alias from old to new
4. Delete old index after verification
"""
old_indices = []
if self.es.indices.exists_alias(name=alias_name):
alias_info = self.es.indices.get_alias(name=alias_name)
old_indices = list(alias_info.keys())
self.create_index(new_index_name, num_shards=num_shards)
if old_indices:
logger.info(f"Reindexing from {old_indices} to {new_index_name}")
self.es.reindex(
body={
"source": {"index": old_indices[0]},
"dest": {"index": new_index_name}
},
wait_for_completion=True,
request_timeout=3600
)
actions = [{"add": {"index": new_index_name, "alias": alias_name}}]
for old_idx in old_indices:
actions.append({"remove": {"index": old_idx, "alias": alias_name}})
self.es.indices.update_aliases(body={"actions": actions})
logger.info(f"Alias {alias_name} now points to {new_index_name}")
old_count = sum(
self.es.count(index=idx)["count"] for idx in old_indices
) if old_indices else 0
new_count = self.es.count(index=new_index_name)["count"]
if old_indices and new_count < old_count * 0.95:
logger.error(
f"Document count mismatch: old={old_count}, new={new_count}. "
"Keeping old indices for manual review."
)
return
for old_idx in old_indices:
self.es.indices.delete(index=old_idx)
logger.info(f"Deleted old index {old_idx}")
Notice what AI tools typically miss in index design: the separation of index-time and search-time analyzers (you index with edge n-grams but search with standard tokenization), the synonym_graph filter with updateable: True so you can reload synonyms without reindexing, the dynamic: strict setting that prevents accidental field creation, and the zero-downtime reindex pattern with document count verification before deleting the old index.
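The updateable synonym_graph filter reads a Solr-format synonyms file from the node's config directory, which is what makes reload-without-reindex possible. A sketch of the file format (entries here are illustrative, not from the source):

```text
# Bidirectional group: each term expands to all the others
laptop, notebook

# Asymmetric rule: the abbreviation expands to the full form,
# but "new york city" does not expand to "nyc"
nyc => new york city
```

Asymmetric rules matter because expansion should widen recall for the abbreviated query without polluting results for the explicit one.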
2. Query DSL & Search API Development
Elasticsearch query DSL is where search engineers spend most of their debugging time. A production search query is not a simple match — it is a carefully constructed tree of bool queries, function_score wrappers, nested queries for complex document structures, and pagination logic that does not kill your cluster. The difference between a junior and senior search engineer is visible in how they construct queries.
Production Query Builder with Function Score
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
@dataclass
class SearchRequest:
"""Structured search request from the API layer."""
query: str
filters: Dict[str, Any] = field(default_factory=dict)
page_size: int = 20
search_after: Optional[List[Any]] = None # NOT from/size for deep pages
sort_by: str = "relevance"
include_aggregations: bool = True
min_score: Optional[float] = None
user_id: Optional[str] = None
class SearchQueryBuilder:
"""Builds production Elasticsearch queries with function_score,
proper pagination, and maintainable query templates."""
BOOST_TITLE = 3.0
BOOST_TITLE_SHINGLE = 4.0
BOOST_BODY = 1.0
FRESHNESS_SCALE = "30d"
FRESHNESS_DECAY = 0.5
POPULARITY_MODIFIER = "log1p"
POPULARITY_FACTOR = 2
def build_query(self, request: SearchRequest) -> Dict[str, Any]:
"""Build the complete search request body."""
text_query = self._build_text_query(request.query)
filters = self._build_filters(request.filters)
function_score = self._build_function_score(text_query, filters)
body: Dict[str, Any] = {
"query": function_score,
"size": request.page_size,
"highlight": {
"fields": {
"title": {"number_of_fragments": 0},
"body": {
"fragment_size": 150,
"number_of_fragments": 3
}
},
                "pre_tags": ["<em>"],
                "post_tags": ["</em>"]
},
"_source": {
"excludes": ["embedding", "body"]
},
"track_total_hits": True
}
if request.min_score is not None:
body["min_score"] = request.min_score
body = self._paginate(body, request)
if request.include_aggregations:
body["aggs"] = self._build_aggregations()
return body
def _build_text_query(self, query_text: str) -> Dict[str, Any]:
"""Multi-match across analyzed fields with field boosting."""
return {
"bool": {
"should": [
{
"multi_match": {
"query": query_text,
"fields": [
f"title^{self.BOOST_TITLE}",
f"title.shingle^{self.BOOST_TITLE_SHINGLE}",
f"body^{self.BOOST_BODY}"
],
"type": "best_fields",
"tie_breaker": 0.3
}
},
{
"match_phrase": {
"title": {
"query": query_text,
"boost": self.BOOST_TITLE * 1.5,
"slop": 1
}
}
}
],
"minimum_should_match": 1
}
}
def _build_filters(
self, filters: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Build filter clauses from structured filter parameters."""
filter_clauses = []
if "category" in filters:
categories = filters["category"]
if isinstance(categories, str):
categories = [categories]
filter_clauses.append({
"terms": {"category_path.raw": categories}
})
if "tags" in filters:
for tag in filters["tags"]:
filter_clauses.append({"term": {"tags": tag}})
if "date_from" in filters or "date_to" in filters:
date_range: Dict[str, Any] = {}
if "date_from" in filters:
date_range["gte"] = filters["date_from"]
if "date_to" in filters:
date_range["lte"] = filters["date_to"]
filter_clauses.append({
"range": {"published_at": date_range}
})
if "author" in filters:
filter_clauses.append({
"term": {"author.raw": filters["author"]}
})
return filter_clauses
def _build_function_score(
self,
text_query: Dict[str, Any],
filters: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Wrap text query in function_score with freshness decay
and popularity boost."""
query_with_filters = {
"bool": {
"must": [text_query],
"filter": filters
}
}
return {
"function_score": {
"query": query_with_filters,
"functions": [
{
"gauss": {
"published_at": {
"origin": "now",
"scale": self.FRESHNESS_SCALE,
"decay": self.FRESHNESS_DECAY
}
},
"weight": 1.5
},
{
"field_value_factor": {
"field": "popularity_score",
"modifier": self.POPULARITY_MODIFIER,
"factor": self.POPULARITY_FACTOR,
"missing": 0
},
"weight": 1.0
}
],
"score_mode": "multiply",
"boost_mode": "multiply",
"max_boost": 10.0
}
}
def _paginate(
self,
body: Dict[str, Any],
request: SearchRequest
) -> Dict[str, Any]:
"""Paginate using search_after for deep pagination.
NEVER use from/size beyond the first few pages. At from=10000,
Elasticsearch must fetch and sort 10000+size docs on every shard,
then merge. This kills cluster performance.
"""
        if request.search_after is not None:
            body["search_after"] = request.search_after
        body["sort"] = [
            {"_score": {"order": "desc"}},
            {"published_at": {"order": "desc"}},
            {"_id": {"order": "asc"}}
        ]
return body
def _build_aggregations(self) -> Dict[str, Any]:
"""Standard faceted search aggregations."""
return {
"categories": {
"terms": {
"field": "category_path.raw",
"size": 20
}
},
"tags": {
"terms": {
"field": "tags",
"size": 30
}
},
"date_histogram": {
"date_histogram": {
"field": "published_at",
"calendar_interval": "month",
"min_doc_count": 1
}
},
"authors": {
"terms": {
"field": "author.raw",
"size": 10
}
}
}
Key patterns AI tools consistently miss: using search_after instead of from/size for pagination (the single most important performance decision in search API design), the tie_breaker parameter in multi_match that prevents a single high-scoring field from dominating, match_phrase with slop for rewarding exact or near-exact phrase matches, and the max_boost cap on function_score to prevent a single document from getting an absurd combined score.
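The search_after pattern is easy to illustrate without a cluster: each page re-runs the query with the sort values of the previous page's last hit as a cursor, so no shard ever materializes more than one page of results. A pure-Python sketch over an in-memory list, using a simplified two-key version of the builder's sort (score desc, id asc as tiebreaker):

```python
from typing import List, Optional, Tuple

Doc = Tuple[float, str]  # (score, doc_id)

def page(docs: List[Doc], size: int,
         search_after: Optional[Doc] = None) -> List[Doc]:
    """Return the next `size` docs strictly after the cursor, in sort order."""
    ordered = sorted(docs, key=lambda d: (-d[0], d[1]))
    if search_after is not None:
        cursor = (-search_after[0], search_after[1])
        ordered = [d for d in ordered if (-d[0], d[1]) > cursor]
    return ordered[:size]

docs = [(0.9, "a"), (0.9, "b"), (0.7, "c"), (0.5, "d")]
p1 = page(docs, 2)          # [(0.9, "a"), (0.9, "b")]
p2 = page(docs, 2, p1[-1])  # [(0.7, "c"), (0.5, "d")]
```

Note the deterministic tiebreaker on doc_id: without it, two documents with identical scores could straddle a page boundary and be skipped or duplicated between pages.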
3. Ranking & Relevance Tuning
Ranking is where search engineering becomes part art, part science, and part ongoing experimentation. BM25 defaults (k1=1.2, b=0.75) are reasonable starting points for general text but suboptimal for specific content types. Short documents like product titles need lower b (less length normalization). Long-form content needs higher k1 (less term frequency saturation). Learning-to-rank adds machine learning to the mix, training models on features extracted from both the query and document to predict relevance.
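The k1/b trade-offs above are easiest to see in the BM25 term-frequency component itself. A minimal sketch (single term, IDF factor omitted for clarity):

```python
def bm25_term_score(tf: float, doc_len: float, avg_doc_len: float,
                    k1: float = 1.2, b: float = 0.75) -> float:
    """BM25 term-frequency saturation: k1 controls how quickly repeated
    terms stop adding score, b controls how strongly document length
    is normalized against the corpus average."""
    norm = 1.0 - b + b * (doc_len / avg_doc_len)
    return tf * (k1 + 1.0) / (tf + k1 * norm)

# b=0 disables length normalization entirely (useful for short product
# titles); as tf grows, the score approaches its (k1 + 1) asymptote.
```

With default parameters, a term occurring once in a document twice the average length scores noticeably lower than the same term in a half-length document, which is exactly the behavior you tune away with lower b for title-like fields.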
Learning-to-Rank Feature Extraction and Scoring
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime, timezone
import math
import pickle
import logging
logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
"""A single search result with extracted features for LTR."""
doc_id: str
bm25_score: float
title: str
body_snippet: str
published_at: datetime
popularity_score: float
click_count: int
category: str
features: Dict[str, float] = field(default_factory=dict)
final_score: float = 0.0
class RelevanceEngine:
"""Production relevance engine combining BM25, LTR features,
and business rules into a final ranking."""
def __init__(
self,
ltr_model_path: Optional[str] = None,
bm25_k1: float = 1.2,
bm25_b: float = 0.75
):
self.bm25_k1 = bm25_k1
self.bm25_b = bm25_b
self.ltr_model = None
if ltr_model_path:
with open(ltr_model_path, "rb") as f:
self.ltr_model = pickle.load(f)
logger.info(f"Loaded LTR model from {ltr_model_path}")
def score_results(
self,
query: str,
results: List[SearchResult],
business_rules: Optional[List[Callable]] = None
) -> List[SearchResult]:
"""Score and re-rank results using LTR features + business rules.
Pipeline:
1. Extract features for each query-document pair
2. Apply LTR model (or fallback to weighted combination)
3. Apply business rules (pinning, boosting, filtering)
4. Sort by final score
"""
for result in results:
result.features = self._extract_ltr_features(query, result)
if self.ltr_model is not None:
feature_matrix = np.array([
self._features_to_vector(r.features) for r in results
])
scores = self.ltr_model.predict(feature_matrix)
for result, score in zip(results, scores):
result.final_score = float(score)
else:
for result in results:
result.final_score = self._fallback_scoring(result.features)
if business_rules:
results = self._apply_business_rules(results, business_rules)
results.sort(key=lambda r: r.final_score, reverse=True)
return results
def _extract_ltr_features(
self, query: str, result: SearchResult
) -> Dict[str, float]:
"""Extract features for learning-to-rank.
Features must be stable, reproducible, and fast to compute.
Avoid features that change between requests for the same
query-document pair (no random, no request-time features).
"""
query_tokens = query.lower().split()
title_tokens = result.title.lower().split()
query_in_title = sum(
1 for qt in query_tokens if qt in title_tokens
)
title_coverage = (
query_in_title / len(query_tokens) if query_tokens else 0.0
)
exact_title_match = 1.0 if query.lower() == result.title.lower() else 0.0
now = datetime.now(timezone.utc)
published = result.published_at
if published.tzinfo is None:
published = published.replace(tzinfo=timezone.utc)
age_days = max((now - published).days, 0)
freshness = 1.0 / (1.0 + math.log1p(age_days))
popularity_log = math.log1p(result.popularity_score)
click_log = math.log1p(result.click_count)
query_len = len(query_tokens)
title_len = len(title_tokens)
len_ratio = min(query_len, title_len) / max(query_len, title_len, 1)
return {
"bm25_score": result.bm25_score,
"title_coverage": title_coverage,
"exact_title_match": exact_title_match,
"freshness": freshness,
"age_days": float(age_days),
"popularity_log": popularity_log,
"click_log": click_log,
"query_doc_len_ratio": len_ratio,
"query_length": float(query_len),
"title_length": float(title_len),
}
def _features_to_vector(
self, features: Dict[str, float]
) -> List[float]:
"""Convert feature dict to ordered vector for model input.
Feature order must match training data exactly."""
feature_order = [
"bm25_score", "title_coverage", "exact_title_match",
"freshness", "age_days", "popularity_log", "click_log",
"query_doc_len_ratio", "query_length", "title_length"
]
return [features.get(f, 0.0) for f in feature_order]
def _fallback_scoring(self, features: Dict[str, float]) -> float:
"""Weighted combination when no LTR model is available.
These weights should be tuned on your evaluation set."""
weights = {
"bm25_score": 0.40,
"title_coverage": 0.20,
"exact_title_match": 0.10,
"freshness": 0.10,
"popularity_log": 0.10,
"click_log": 0.10,
}
return sum(
features.get(f, 0.0) * w for f, w in weights.items()
)
def _apply_business_rules(
self,
results: List[SearchResult],
rules: List[Callable]
) -> List[SearchResult]:
        """Apply business rules after scoring.
        Rules are callables that take (result, position) and return an
        adjusted score, or None to leave the result's score unchanged.
        Rules run in order; boosting or burying by category is a score
        adjustment, while pinning and filtering require list-level
        handling outside this loop.
        """
for rule in rules:
for i, result in enumerate(results):
adjusted = rule(result, i)
if adjusted is not None:
result.final_score = adjusted
return results
The critical pattern here is the separation between feature extraction (deterministic, reproducible, fast) and scoring (model-based or heuristic). AI tools typically generate monolithic scoring functions that mix feature computation with ranking logic, making it impossible to retrain a model or A/B test different scoring strategies. Also note the explicit feature ordering in _features_to_vector — a mismatch between training and serving feature order is one of the most common LTR bugs, and it produces silently degraded results rather than errors.
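The feature-order bug is worth demonstrating, because the failure is silent: serializing the same feature dict under two different orderings yields vectors that are both shape-valid model inputs, so the model raises no error and simply predicts garbage. A minimal illustration (feature names are hypothetical):

```python
from typing import Dict, List

TRAINING_ORDER = ["bm25_score", "freshness", "popularity_log"]
DRIFTED_ORDER = ["freshness", "bm25_score", "popularity_log"]  # silent bug

def to_vector(features: Dict[str, float], order: List[str]) -> List[float]:
    """Missing features default to 0.0 — another silent failure mode:
    a renamed feature never errors, it just zeroes out a signal."""
    return [features.get(name, 0.0) for name in order]

features = {"bm25_score": 12.4, "freshness": 0.8, "popularity_log": 2.1}
good = to_vector(features, TRAINING_ORDER)  # [12.4, 0.8, 2.1]
bad = to_vector(features, DRIFTED_ORDER)    # [0.8, 12.4, 2.1]
# Same dict, same length, different semantics per position.
```

The defense is to pin the feature order in one shared constant (as `_features_to_vector` does) and assert on it in both the training pipeline and the serving path.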
4. Vector Search & Embeddings
Vector search has fundamentally changed information retrieval. Dense embeddings from transformer models capture semantic meaning that keyword search misses entirely — a search for “affordable housing in Manhattan” can find documents about “budget apartments in New York City” even without any keyword overlap. But integrating vector search into production systems is far more complex than adding a dense_vector field. You need hybrid retrieval that combines keyword and vector results, score normalization across fundamentally different scoring scales, and re-ranking pipelines that apply expensive cross-encoder models to cheap retrieval results.
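The score normalization problem is easy to see in isolation: BM25 scores are unbounded while cosine similarity lives in [0, 1], so a naive weighted sum of raw scores is dominated by the keyword side. A min-max normalization sketch, with its main caveat noted:

```python
from typing import List

def min_max_normalize(scores: List[float]) -> List[float]:
    """Rescale one result list's scores into [0, 1]. Caveat: min-max is
    computed per result list, so the same document can normalize to
    different values under different queries — one reason rank-based
    fusion (RRF) is often preferred for hybrid search."""
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [1.0 for _ in scores]
    return [(s - lo) / (hi - lo) for s in scores]

bm25 = [27.3, 14.1, 9.6]     # unbounded keyword scores
cosine = [0.91, 0.88, 0.62]  # already in [0, 1]
fused = [0.5 * b + 0.5 * c
         for b, c in zip(min_max_normalize(bm25), min_max_normalize(cosine))]
```

Because of the per-list instability, the production pattern shown next fuses by rank position rather than by normalized score.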
Hybrid Search with Reciprocal Rank Fusion
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from elasticsearch import Elasticsearch
import logging
logger = logging.getLogger(__name__)
@dataclass
class HybridResult:
"""A search result from hybrid retrieval."""
doc_id: str
keyword_score: float = 0.0
vector_score: float = 0.0
rrf_score: float = 0.0
keyword_rank: int = 0
vector_rank: int = 0
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class HybridSearchEngine:
"""Production hybrid search combining BM25 keyword search with
dense vector similarity, fused via Reciprocal Rank Fusion (RRF)."""
def __init__(
self,
es_client: Elasticsearch,
index_name: str,
embedding_model: Any,
embedding_cache: Optional[Dict[str, List[float]]] = None,
rrf_k: int = 60,
keyword_weight: float = 1.0,
vector_weight: float = 1.0,
):
self.es = es_client
self.index_name = index_name
self.embedding_model = embedding_model
self.embedding_cache = embedding_cache or {}
self.rrf_k = rrf_k
self.keyword_weight = keyword_weight
self.vector_weight = vector_weight
def search(
self,
query: str,
top_k: int = 20,
retrieval_k: int = 100,
filters: Optional[List[Dict[str, Any]]] = None,
) -> List[HybridResult]:
"""Execute hybrid search with RRF fusion.
1. Run BM25 keyword search for top retrieval_k results
2. Run KNN vector search for top retrieval_k results
3. Fuse via Reciprocal Rank Fusion
4. Return top_k fused results
"""
keyword_results = self._keyword_search(
query, retrieval_k, filters
)
query_embedding = self._get_embedding(query)
vector_results = self._vector_search(
query_embedding, retrieval_k, filters
)
fused = self._reciprocal_rank_fusion(
keyword_results, vector_results
)
return fused[:top_k]
def _keyword_search(
self,
query: str,
top_k: int,
filters: Optional[List[Dict[str, Any]]] = None
) -> List[HybridResult]:
"""BM25 keyword search with optional filters."""
must_clause = {
"multi_match": {
"query": query,
"fields": ["title^3", "title.shingle^4", "body"],
"type": "best_fields",
"tie_breaker": 0.3
}
}
filter_clauses = filters or []
body = {
"query": {
"bool": {
"must": [must_clause],
"filter": filter_clauses
}
},
"size": top_k,
"_source": ["title", "published_at", "category_path", "url"]
}
response = self.es.search(index=self.index_name, body=body)
results = []
for rank, hit in enumerate(response["hits"]["hits"], 1):
results.append(HybridResult(
doc_id=hit["_id"],
keyword_score=hit["_score"],
keyword_rank=rank,
metadata=hit["_source"]
))
return results
def _vector_search(
self,
query_embedding: List[float],
top_k: int,
filters: Optional[List[Dict[str, Any]]] = None
) -> List[HybridResult]:
"""KNN vector search using HNSW index."""
knn_query = {
"field": "embedding",
"query_vector": query_embedding,
"k": top_k,
"num_candidates": top_k * 5
}
if filters:
knn_query["filter"] = {"bool": {"filter": filters}}
body = {
"knn": knn_query,
"size": top_k,
"_source": ["title", "published_at", "category_path", "url"]
}
response = self.es.search(index=self.index_name, body=body)
results = []
for rank, hit in enumerate(response["hits"]["hits"], 1):
results.append(HybridResult(
doc_id=hit["_id"],
vector_score=hit["_score"],
vector_rank=rank,
metadata=hit["_source"]
))
return results
def _reciprocal_rank_fusion(
self,
keyword_results: List[HybridResult],
vector_results: List[HybridResult],
) -> List[HybridResult]:
"""Fuse two ranked lists using Reciprocal Rank Fusion.
RRF(d) = sum( weight_i / (k + rank_i(d)) )
RRF is robust to score scale differences between retrieval
methods. Unlike score normalization (min-max or z-score), RRF
only uses rank positions, so it works even when BM25 scores
are unbounded and cosine similarity is in [0, 1].
"""
doc_map: Dict[str, HybridResult] = {}
for result in keyword_results:
doc_map[result.doc_id] = HybridResult(
doc_id=result.doc_id,
keyword_score=result.keyword_score,
keyword_rank=result.keyword_rank,
metadata=result.metadata
)
for result in vector_results:
if result.doc_id in doc_map:
doc_map[result.doc_id].vector_score = result.vector_score
doc_map[result.doc_id].vector_rank = result.vector_rank
else:
doc_map[result.doc_id] = HybridResult(
doc_id=result.doc_id,
vector_score=result.vector_score,
vector_rank=result.vector_rank,
metadata=result.metadata
)
        for result in doc_map.values():
rrf_score = 0.0
if result.keyword_rank > 0:
rrf_score += self.keyword_weight / (
self.rrf_k + result.keyword_rank
)
if result.vector_rank > 0:
rrf_score += self.vector_weight / (
self.rrf_k + result.vector_rank
)
result.rrf_score = rrf_score
fused = sorted(
doc_map.values(), key=lambda r: r.rrf_score, reverse=True
)
return fused
def _get_embedding(self, text: str) -> List[float]:
"""Get embedding with caching to avoid redundant computation."""
        # md5 serves only as a compact cache key, not a security hash
        cache_key = hashlib.md5(text.encode()).hexdigest()
if cache_key in self.embedding_cache:
return self.embedding_cache[cache_key]
embedding = self.embedding_model.encode(text).tolist()
self.embedding_cache[cache_key] = embedding
return embedding
The critical insight in hybrid search is using Reciprocal Rank Fusion rather than score normalization. AI tools often generate code that normalizes BM25 and cosine scores to [0,1] and does a weighted average — but min-max normalization is unstable (it depends on the min and max scores in each result set, which change with every query) and z-score normalization requires enough results to estimate a meaningful standard deviation. RRF only uses rank positions, making it robust across fundamentally different scoring functions. Also note the num_candidates parameter in the KNN query: this controls the HNSW recall-speed tradeoff at query time and must be tuned per use case.
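The fusion arithmetic is small enough to verify by hand. Here is a standalone sketch (the document IDs and rankings are invented for illustration) showing why RRF favors a document that both retrievers agree on over one that tops only a single list:

```python
def rrf_fuse(ranked_lists, k=60, weights=None):
    """Reciprocal Rank Fusion over any number of ranked doc-id lists."""
    weights = weights or [1.0] * len(ranked_lists)
    scores = {}
    for ranking, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(ranking, 1):  # ranks are 1-indexed
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# "doc_b" is rank 2 in BOTH lists; "doc_a" and "doc_x" top only one list each.
bm25_ranking = ["doc_a", "doc_b", "doc_c"]
vector_ranking = ["doc_x", "doc_b", "doc_y"]
fused = rrf_fuse([bm25_ranking, vector_ranking])
# doc_b scores 1/62 + 1/62 ≈ 0.0323, beating doc_a at 1/61 ≈ 0.0164
```

Note that no score from either retriever is ever inspected: only rank positions enter the sum, which is exactly what makes the fusion immune to BM25's unbounded score scale.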
5. Query Understanding & NLP
Query understanding is the invisible layer that transforms the three words a user types into a structured, enriched query that the search system can actually answer well. Without it, a search for “red nike running shoes under $100” is just five tokens to match against an inverted index. With query understanding, it becomes: color=red, brand=Nike, category=running shoes, price_max=100, intent=transactional. This transformation must happen in under 50 milliseconds for every single query.
Query Parsing, Correction, and Intent Classification
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set, Tuple
from enum import Enum
import re
import logging
from collections import defaultdict
logger = logging.getLogger(__name__)
class QueryIntent(Enum):
NAVIGATIONAL = "navigational"
INFORMATIONAL = "informational"
TRANSACTIONAL = "transactional"
AMBIGUOUS = "ambiguous"
@dataclass
class ParsedQuery:
"""Structured representation of a user query after NLP processing."""
original: str
corrected: str
tokens: List[str] = field(default_factory=list)
expanded_tokens: List[str] = field(default_factory=list)
intent: QueryIntent = QueryIntent.AMBIGUOUS
entities: Dict[str, List[str]] = field(default_factory=dict)
is_corrected: bool = False
suggestions: List[str] = field(default_factory=list)
class QueryUnderstanding:
"""Production query understanding pipeline.
Runs in sequence: normalize -> spell correct -> expand synonyms
-> classify intent -> extract entities. Total budget: <50ms.
"""
def __init__(
self,
dictionary: Set[str],
synonyms: Dict[str, List[str]],
entity_patterns: Dict[str, List[str]],
brand_set: Optional[Set[str]] = None,
category_set: Optional[Set[str]] = None,
):
self.dictionary = dictionary
self.synonyms = synonyms
self.entity_patterns = entity_patterns
self.brands = brand_set or set()
self.categories = category_set or set()
self._char_index: Dict[str, Set[str]] = defaultdict(set)
for word in dictionary:
for i in range(len(word)):
bigram = word[i:i+2].lower()
if len(bigram) == 2:
self._char_index[bigram].add(word)
self._navigational_signals = {
"login", "sign in", "signup", "sign up", "account",
"contact", "about", "home", "dashboard", "settings"
}
self._transactional_signals = {
"buy", "price", "cheap", "discount", "deal", "order",
"purchase", "cost", "shipping", "delivery", "subscribe"
}
def process_query(self, raw_query: str) -> ParsedQuery:
"""Full query understanding pipeline."""
normalized = self._normalize(raw_query)
corrected, is_corrected = self._spell_correct(normalized)
tokens = corrected.split()
expanded = self._expand_synonyms(tokens)
intent = self._classify_intent(corrected, tokens)
entities = self._extract_entities(corrected, tokens)
suggestions = []
if is_corrected:
suggestions.append(corrected)
return ParsedQuery(
original=raw_query,
corrected=corrected,
tokens=tokens,
expanded_tokens=expanded,
intent=intent,
entities=entities,
is_corrected=is_corrected,
suggestions=suggestions,
)
def _normalize(self, query: str) -> str:
"""Normalize query: lowercase, collapse whitespace, strip."""
query = query.lower().strip()
query = re.sub(r"\s+", " ", query)
        # keep "$" and "<" intact: downstream price-entity extraction
        # matches "$100" and "< 100" patterns
        query = re.sub(r"[^\w\s\$<\-\.]", "", query)
return query
def _spell_correct(self, query: str) -> Tuple[str, bool]:
"""Fast spell correction using edit distance 1.
Only correct tokens not in the dictionary and not recognized
as entities (brand names, product codes, etc.). Domain-specific
terms must be in the dictionary to avoid false corrections.
"""
tokens = query.split()
corrected_tokens = []
was_corrected = False
for token in tokens:
if token in self.dictionary or token in self.brands:
corrected_tokens.append(token)
continue
if len(token) <= 2:
corrected_tokens.append(token)
continue
candidates = self._edit_distance_1_candidates(token)
known_candidates = [
c for c in candidates if c in self.dictionary
]
if known_candidates:
best = max(known_candidates, key=lambda c: (
self._prefix_match_len(token, c),
-abs(len(c) - len(token))
))
corrected_tokens.append(best)
was_corrected = True
logger.debug(f"Spell corrected: {token} -> {best}")
else:
corrected_tokens.append(token)
return " ".join(corrected_tokens), was_corrected
def _edit_distance_1_candidates(self, word: str) -> Set[str]:
"""Generate all strings within edit distance 1.
Uses bigram index for fast candidate lookup."""
candidates = set()
        bigrams = set()
        # mirror the index-build loop: only true 2-character bigrams
        for i in range(len(word) - 1):
            bigrams.add(word[i:i+2].lower())
for bigram in bigrams:
candidates.update(self._char_index.get(bigram, set()))
return {
c for c in candidates
if abs(len(c) - len(word)) <= 1
and self._fast_edit_distance(word, c) <= 1
}
def _fast_edit_distance(self, s1: str, s2: str) -> int:
"""Optimized Levenshtein distance with early termination."""
if abs(len(s1) - len(s2)) > 1:
return 2
if s1 == s2:
return 0
len1, len2 = len(s1), len(s2)
prev = list(range(len2 + 1))
curr = [0] * (len2 + 1)
for i in range(1, len1 + 1):
curr[0] = i
min_val = curr[0]
for j in range(1, len2 + 1):
cost = 0 if s1[i-1] == s2[j-1] else 1
curr[j] = min(
prev[j] + 1,
curr[j-1] + 1,
prev[j-1] + cost
)
min_val = min(min_val, curr[j])
if min_val > 1:
return 2
prev, curr = curr, prev
return prev[len2]
def _prefix_match_len(self, s1: str, s2: str) -> int:
"""Length of common prefix between two strings."""
for i in range(min(len(s1), len(s2))):
if s1[i] != s2[i]:
return i
return min(len(s1), len(s2))
def _expand_synonyms(self, tokens: List[str]) -> List[str]:
"""Expand query tokens with synonyms.
Synonym expansion is asymmetric: 'NYC' -> 'new york city'
but 'new york city' does NOT become 'NYC' at query time.
Expansion at query time is deliberate; index-time synonym
expansion is handled by the analyzer.
"""
expanded = list(tokens)
for token in tokens:
if token in self.synonyms:
for syn in self.synonyms[token]:
syn_tokens = syn.split()
for st in syn_tokens:
if st not in expanded:
expanded.append(st)
return expanded
def _classify_intent(
self, query: str, tokens: List[str]
) -> QueryIntent:
"""Classify query intent for routing.
- navigational: user wants a specific page
- informational: user wants to learn something
- transactional: user wants to buy/act
"""
token_set = set(tokens)
if query in self._navigational_signals:
return QueryIntent.NAVIGATIONAL
if token_set & self._transactional_signals:
return QueryIntent.TRANSACTIONAL
        if any(sig in query for sig in ("how to", "what is", "guide")) or "why" in token_set:
return QueryIntent.INFORMATIONAL
        if tokens and len(tokens) <= 2 and tokens[0] in self.brands:
return QueryIntent.NAVIGATIONAL
return QueryIntent.AMBIGUOUS
def _extract_entities(
self, query: str, tokens: List[str]
) -> Dict[str, List[str]]:
"""Extract structured entities from free-text query.
Extracts: brands, categories, attributes (color, size),
price ranges, and other domain-specific entities.
"""
entities: Dict[str, List[str]] = defaultdict(list)
for token in tokens:
if token in self.brands:
entities["brand"].append(token)
for token in tokens:
if token in self.categories:
entities["category"].append(token)
price_match = re.search(
r"(?:under|below|less than|max|<)\s*\$?(\d+(?:\.\d{2})?)",
query
)
if price_match:
entities["price_max"].append(price_match.group(1))
price_range = re.search(
r"\$?(\d+)\s*(?:-|to)\s*\$?(\d+)", query
)
if price_range:
entities["price_min"].append(price_range.group(1))
entities["price_max"].append(price_range.group(2))
        for entity_type, patterns in self.entity_patterns.items():
            for pattern in patterns:
                match = re.search(pattern, query)
                if match:
                    entities[entity_type].append(match.group(0))
return dict(entities)
The key design decisions AI tools miss in query understanding: using a bigram index for spell correction candidates rather than generating all possible edit strings (naive generation produces 54n + 25 raw candidates per token of length n at edit distance 1, and the count grows combinatorially at distance 2, which is too slow for query time), asymmetric synonym expansion (expanding “NYC” to “new york city” at query time but not the reverse), early termination in edit distance computation, and the explicit distinction between navigational, informational, and transactional intent that changes which search strategy to apply downstream.
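The candidate-explosion claim is easy to verify. A Norvig-style generator for edit distance 1 (a toy illustration, not part of the pipeline above) produces exactly 54n + 25 raw candidates for a word of length n over a 26-letter alphabet: n deletes, n - 1 transposes, 26n replaces, and 26(n + 1) inserts.

```python
import string

def edits1(word):
    """All strings within edit distance 1 of `word` (with duplicates)."""
    letters = string.ascii_lowercase
    splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
    deletes = [l + r[1:] for l, r in splits if r]
    transposes = [l + r[1] + r[0] + r[2:] for l, r in splits if len(r) > 1]
    replaces = [l + c + r[1:] for l, r in splits if r for c in letters]
    inserts = [l + c + r for l, r in splits for c in letters]
    return deletes + transposes + replaces + inserts

n = len("shoes")
raw = edits1("shoes")
assert len(raw) == 54 * n + 25  # 295 raw candidates for a 5-letter word
# At edit distance 2 every distance-1 candidate spawns its own ~54n
# candidates, which is why a bigram index over the dictionary is the
# right query-time structure: you look up plausible words instead of
# generating implausible strings.
```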
6. Search Analytics & Quality Metrics
You cannot improve what you do not measure, and measuring search quality is fundamentally harder than measuring web application performance. A search system can return results quickly, with no errors, and still be completely useless if the results are not relevant. NDCG, MRR, and precision@K quantify different aspects of ranking quality, but they all require judgment data that is expensive to collect and inherently subjective. Click-through data is abundant but biased by position (users click the first result regardless of quality).
NDCG, Click Tracking, and A/B Testing
import math
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime, timezone
from collections import defaultdict
import hashlib
import logging
logger = logging.getLogger(__name__)
@dataclass
class JudgmentRecord:
"""A relevance judgment for a query-document pair."""
query: str
doc_id: str
relevance: int # 0=irrelevant, 1=marginal, 2=relevant, 3=highly relevant
judge_id: str = ""
@dataclass
class ClickRecord:
"""A click event in search results."""
query: str
doc_id: str
position: int # 1-indexed position in results
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
session_id: str = ""
dwell_time_ms: int = 0
@dataclass
class ExperimentResult:
"""Results from an A/B test comparison."""
control_ndcg: float
treatment_ndcg: float
control_ctr: float
treatment_ctr: float
control_mrr: float
treatment_mrr: float
p_value_ndcg: float
sample_size: int
is_significant: bool
class SearchAnalytics:
"""Production search analytics: NDCG computation, click tracking
with position bias correction, and A/B test framework."""
POSITION_BIAS_WEIGHTS = [
1.0, 0.65, 0.50, 0.40, 0.33,
0.28, 0.24, 0.21, 0.18, 0.16
]
def __init__(self):
self._click_log: List[ClickRecord] = []
self._judgments: Dict[str, List[JudgmentRecord]] = defaultdict(list)
def compute_ndcg(
self,
query: str,
ranked_doc_ids: List[str],
judgments: List[JudgmentRecord],
k: int = 10,
) -> float:
"""Compute NDCG@K for a single query.
NDCG = DCG / IDCG where
DCG = sum( (2^rel_i - 1) / log2(i + 1) ) for i in 1..k
IDCG = DCG of the ideal ranking (sorted by relevance desc)
"""
judgment_map = {j.doc_id: j.relevance for j in judgments}
relevances = []
for doc_id in ranked_doc_ids[:k]:
relevances.append(judgment_map.get(doc_id, 0))
dcg = self._dcg(relevances)
ideal_relevances = sorted(
[j.relevance for j in judgments], reverse=True
)[:k]
idcg = self._dcg(ideal_relevances)
if idcg == 0:
return 0.0
return dcg / idcg
def compute_mrr(
self,
query: str,
ranked_doc_ids: List[str],
relevant_doc_ids: set,
) -> float:
        """Reciprocal rank for a single query. MRR is the mean of
        these reciprocal ranks across a query set.
        RR = 1 / rank_of_first_relevant_result (0.0 if none is found)
        """
for i, doc_id in enumerate(ranked_doc_ids, 1):
if doc_id in relevant_doc_ids:
return 1.0 / i
return 0.0
def _dcg(self, relevances: List[int]) -> float:
"""Discounted Cumulative Gain."""
dcg = 0.0
for i, rel in enumerate(relevances, 1):
dcg += (2 ** rel - 1) / math.log2(i + 1)
return dcg
def track_click(
self,
query: str,
doc_id: str,
position: int,
session_id: str = "",
dwell_time_ms: int = 0,
) -> None:
"""Record a click event for analytics and evaluation."""
record = ClickRecord(
query=query,
doc_id=doc_id,
position=position,
session_id=session_id,
dwell_time_ms=dwell_time_ms,
)
self._click_log.append(record)
    def compute_ctr_by_position(
        self,
        impressions: Dict[int, int],
    ) -> Dict[int, float]:
        """Compute CTR per position for position bias analysis.
        Raw CTR at position 1 is always highest, but that does not
        mean position-1 results are always best. Position bias
        correction reveals the true relevance signal.
        """
        clicks_by_position: Dict[int, int] = defaultdict(int)
        for click in self._click_log:
            clicks_by_position[click.position] += 1
        ctr = {}
        for pos, click_count in clicks_by_position.items():
            total_impressions = impressions.get(pos, 0)
            # a missing impression count means CTR is undefined, not 100%
            ctr[pos] = (
                click_count / total_impressions if total_impressions else 0.0
            )
        return ctr
def _position_bias_correction(
self,
clicks: List[ClickRecord],
) -> Dict[str, float]:
"""Correct click signals for position bias.
Divides each click by the examination probability at that
position. A click at position 5 is worth more than a click
at position 1 because users are less likely to examine
position 5 at all.
"""
corrected_relevance: Dict[str, float] = defaultdict(float)
for click in clicks:
pos_idx = min(click.position - 1, len(self.POSITION_BIAS_WEIGHTS) - 1)
examination_prob = self.POSITION_BIAS_WEIGHTS[pos_idx]
corrected_signal = 1.0 / examination_prob
if click.dwell_time_ms > 30000:
corrected_signal *= 1.5
elif click.dwell_time_ms < 5000:
corrected_signal *= 0.5
corrected_relevance[click.doc_id] += corrected_signal
return dict(corrected_relevance)
def run_ab_test(
self,
control_results: Dict[str, List[str]],
treatment_results: Dict[str, List[str]],
judgments: Dict[str, List[JudgmentRecord]],
alpha: float = 0.05,
) -> ExperimentResult:
"""Run offline A/B test comparing two ranking strategies.
Args:
control_results: {query: [doc_ids]} for control
treatment_results: {query: [doc_ids]} for treatment
judgments: {query: [JudgmentRecord]} relevance judgments
alpha: significance level
"""
control_ndcgs = []
treatment_ndcgs = []
control_mrrs = []
treatment_mrrs = []
queries = set(control_results.keys()) & set(treatment_results.keys())
queries = queries & set(judgments.keys())
for query in queries:
query_judgments = judgments[query]
relevant_ids = {
j.doc_id for j in query_judgments if j.relevance >= 2
}
c_ndcg = self.compute_ndcg(
query, control_results[query], query_judgments
)
t_ndcg = self.compute_ndcg(
query, treatment_results[query], query_judgments
)
control_ndcgs.append(c_ndcg)
treatment_ndcgs.append(t_ndcg)
c_mrr = self.compute_mrr(
query, control_results[query], relevant_ids
)
t_mrr = self.compute_mrr(
query, treatment_results[query], relevant_ids
)
control_mrrs.append(c_mrr)
treatment_mrrs.append(t_mrr)
c_ndcg_mean = float(np.mean(control_ndcgs)) if control_ndcgs else 0.0
t_ndcg_mean = float(np.mean(treatment_ndcgs)) if treatment_ndcgs else 0.0
n = len(control_ndcgs)
if n > 1:
diffs = [t - c for t, c in zip(treatment_ndcgs, control_ndcgs)]
diff_mean = np.mean(diffs)
diff_std = np.std(diffs, ddof=1)
t_stat = diff_mean / (diff_std / math.sqrt(n)) if diff_std > 0 else 0
p_value = 2.0 * (1.0 - self._t_cdf(abs(t_stat), n - 1))
else:
p_value = 1.0
        return ExperimentResult(
            control_ndcg=c_ndcg_mean,
            treatment_ndcg=t_ndcg_mean,
            # CTR requires live click traffic; offline replay reports 0.0
            control_ctr=0.0,
            treatment_ctr=0.0,
control_mrr=float(np.mean(control_mrrs)) if control_mrrs else 0.0,
treatment_mrr=float(np.mean(treatment_mrrs)) if treatment_mrrs else 0.0,
p_value_ndcg=p_value,
sample_size=n,
is_significant=p_value < alpha,
)
    def _t_cdf(self, t: float, df: int) -> float:
        """t-distribution CDF. The normal approximation is accurate
        for df > 30; for small df, prefer scipy.stats.t.cdf."""
        if df <= 30:
            try:
                from scipy.stats import t as t_dist
                return float(t_dist.cdf(t, df))
            except ImportError:
                pass  # fall back to the normal approximation below
        from math import erf
        return 0.5 * (1 + erf(t / math.sqrt(2)))
Position bias correction is the single most overlooked aspect of search analytics. AI tools generate click-through rate analysis that treats a click at position 1 the same as a click at position 10, when in reality a click at position 10 is a much stronger relevance signal because users rarely even look at results beyond position 3. The examination probability model (users examine position 1 with probability 1.0, position 5 with probability 0.33) corrects for this bias and reveals the true quality of your ranking. Also note the dwell-time weighting: a click followed by 30+ seconds of engagement indicates satisfaction, while a sub-5-second bounce indicates a misleading snippet.
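The inverse-propensity arithmetic can be checked directly. This sketch uses the same examination probabilities as the `POSITION_BIAS_WEIGHTS` table above (the positions are illustrative):

```python
# Examination probabilities for positions 1..5, matching the table above
EXAM_PROB = [1.0, 0.65, 0.50, 0.40, 0.33]

def corrected_click_weight(position):
    """Inverse propensity weight: divide each click by the probability
    that the user examined that position at all."""
    return 1.0 / EXAM_PROB[position - 1]

# One click at position 5 carries roughly 3x the relevance signal of
# one click at position 1, because few users even look at position 5.
w1 = corrected_click_weight(1)
w5 = corrected_click_weight(5)
assert w5 / w1 > 3.0
```

In practice the examination probabilities themselves should be estimated from your own logs (for example via randomized interleaving or a click model), not copied from a table.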
7. Distributed Search Infrastructure
At scale, search infrastructure is a distributed systems problem. Your index does not fit on a single node, so you shard it across a cluster. Your query traffic exceeds what a single node can handle, so you add replicas. Your data has different access patterns over time, so you implement lifecycle management to move old data from fast SSDs to cheap spinning disks. Cross-cluster search lets you query indexes in different data centers. Getting these infrastructure decisions wrong costs either money (over-provisioned hardware) or user experience (slow queries, outages).
Index Lifecycle Management and Cluster Operations
from elasticsearch import Elasticsearch
from typing import Dict, Any, List, Optional
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class ClusterManager:
"""Production cluster management: ILM policies, shard allocation,
cross-cluster search, and capacity planning."""
def __init__(self, es_client: Elasticsearch):
self.es = es_client
def configure_ilm(
self,
policy_name: str = "search_lifecycle",
hot_max_size: str = "50gb",
hot_max_age: str = "7d",
warm_max_age: str = "30d",
cold_max_age: str = "90d",
delete_after: str = "365d",
) -> None:
"""Configure Index Lifecycle Management for hot/warm/cold/delete.
Hot: Fast SSDs, primary indexing target, full replicas
Warm: Cheaper storage, read-only, force-merged to 1 segment
Cold: Cheapest storage, searchable snapshots, minimal replicas
Delete: Remove after retention period
"""
policy = {
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": hot_max_size,
"max_age": hot_max_age,
},
"set_priority": {"priority": 100},
}
},
"warm": {
"min_age": warm_max_age,
"actions": {
"allocate": {
"number_of_replicas": 1,
"require": {"data": "warm"}
},
"forcemerge": {"max_num_segments": 1},
"readonly": {},
"set_priority": {"priority": 50},
"shrink": {"number_of_shards": 1},
}
},
"cold": {
"min_age": cold_max_age,
"actions": {
"allocate": {
"number_of_replicas": 0,
"require": {"data": "cold"}
},
"set_priority": {"priority": 0},
}
},
"delete": {
"min_age": delete_after,
"actions": {
"delete": {}
}
}
}
}
}
self.es.ilm.put_lifecycle(name=policy_name, body=policy)
logger.info(f"ILM policy '{policy_name}' configured")
def create_ilm_index_template(
self,
template_name: str,
index_pattern: str,
policy_name: str,
num_shards: int = 3,
num_replicas: int = 1,
) -> None:
"""Create index template that applies ILM policy to new indices."""
template = {
"index_patterns": [index_pattern],
"template": {
"settings": {
"number_of_shards": num_shards,
"number_of_replicas": num_replicas,
"index.lifecycle.name": policy_name,
"index.lifecycle.rollover_alias": index_pattern.rstrip("*-"),
"index.routing.allocation.require.data": "hot",
}
}
}
self.es.indices.put_index_template(
name=template_name, body=template
)
logger.info(f"Index template '{template_name}' created")
def rebalance_shards(
self,
max_shard_size_gb: float = 50.0,
min_shard_size_gb: float = 10.0,
) -> Dict[str, Any]:
"""Analyze shard sizes and recommend rebalancing actions.
Target: each shard 10-50 GB. Too small = overhead.
Too large = slow recovery and uneven load distribution.
"""
stats = self.es.indices.stats(metric="store")
recommendations = []
for index_name, index_stats in stats["indices"].items():
primary_size_bytes = (
index_stats["primaries"]["store"]["size_in_bytes"]
)
settings = self.es.indices.get_settings(index=index_name)
num_shards = int(
settings[index_name]["settings"]["index"]
.get("number_of_shards", 1)
)
shard_size_gb = (primary_size_bytes / num_shards) / (1024 ** 3)
if shard_size_gb > max_shard_size_gb:
ideal_shards = max(
1,
int(primary_size_bytes / (1024 ** 3) / max_shard_size_gb) + 1
)
recommendations.append({
"index": index_name,
"action": "split",
"current_shards": num_shards,
"recommended_shards": ideal_shards,
"shard_size_gb": round(shard_size_gb, 2),
"reason": f"Shards too large ({shard_size_gb:.1f} GB)"
})
            elif shard_size_gb < min_shard_size_gb and num_shards > 1:
                # Note: the shrink API requires the target shard count to
                # be a factor of the current primary shard count.
ideal_shards = max(
1,
int(primary_size_bytes / (1024 ** 3) / max_shard_size_gb) + 1
)
recommendations.append({
"index": index_name,
"action": "shrink",
"current_shards": num_shards,
"recommended_shards": ideal_shards,
"shard_size_gb": round(shard_size_gb, 2),
"reason": f"Shards too small ({shard_size_gb:.1f} GB)"
})
return {
"recommendations": recommendations,
"total_indices": len(stats["indices"]),
"indices_needing_action": len(recommendations),
}
def setup_cross_cluster(
self,
remote_clusters: Dict[str, Dict[str, Any]],
) -> None:
"""Configure cross-cluster search for multi-datacenter queries.
Each remote cluster needs seeds (transport addresses) and
connection settings. Queries fan out to all clusters and
results are merged.
"""
settings = {}
for cluster_name, config in remote_clusters.items():
prefix = f"cluster.remote.{cluster_name}"
settings[f"{prefix}.seeds"] = config["seeds"]
settings[f"{prefix}.transport.compress"] = config.get(
"compress", True
)
settings[f"{prefix}.skip_unavailable"] = config.get(
"skip_unavailable", True
)
self.es.cluster.put_settings(body={"persistent": settings})
logger.info(
f"Configured {len(remote_clusters)} remote clusters: "
f"{list(remote_clusters.keys())}"
)
def get_cluster_health_report(self) -> Dict[str, Any]:
"""Comprehensive cluster health check for monitoring."""
health = self.es.cluster.health()
stats = self.es.cluster.stats()
pending = self.es.cluster.pending_tasks()
return {
"status": health["status"],
"node_count": health["number_of_nodes"],
"data_node_count": health["number_of_data_nodes"],
"active_shards": health["active_shards"],
"relocating_shards": health["relocating_shards"],
"initializing_shards": health["initializing_shards"],
"unassigned_shards": health["unassigned_shards"],
"pending_tasks": len(pending.get("tasks", [])),
"total_indices": stats["indices"]["count"],
"total_docs": stats["indices"]["docs"]["count"],
"total_size_gb": round(
stats["indices"]["store"]["size_in_bytes"] / (1024 ** 3), 2
),
"jvm_heap_used_pct": round(
stats["nodes"]["jvm"]["mem"]["heap_used_in_bytes"]
/ max(stats["nodes"]["jvm"]["mem"]["heap_max_in_bytes"], 1)
* 100, 1
),
}
The ILM policy is where search infrastructure meets cost engineering. Hot nodes with fast SSDs handle real-time indexing and the majority of queries. Warm nodes with cheaper storage hold older data that is still searchable but does not need to be as fast. Cold nodes use the cheapest storage for archival data. The forcemerge to 1 segment in the warm phase is critical — it reduces segment overhead and improves query performance on read-only data. AI tools generate flat cluster configurations where all data lives on the same tier, which either wastes expensive storage on old data or puts fresh data on slow disks.
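The shard-sizing rule of thumb behind `rebalance_shards` reduces to one line of arithmetic, shown here as a standalone helper (the 10–50 GB target band follows the code above; the exact ceiling is a per-cluster assumption you should tune for your hardware):

```python
import math

def recommend_shard_count(total_index_gb, target_shard_gb=50.0):
    """Primary shard count keeping each shard at or below the target size."""
    return max(1, math.ceil(total_index_gb / target_shard_gb))

# A 320 GB index at a 50 GB/shard ceiling needs 7 primaries (~46 GB each).
assert recommend_shard_count(320) == 7
# A 12 GB index needs just 1; splitting it further only adds overhead.
assert recommend_shard_count(12) == 1
```

Remember that shard count is fixed at index creation (absent a split or shrink), so run this arithmetic against projected index size, not current size.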
What AI Tools Get Wrong
After extensive testing across all major AI coding tools, these are the most common and most dangerous errors in AI-generated search code:
- Using `from`/`size` for deep pagination: This is the single most destructive pattern in search engineering. At `from=10000`, `size=20`, Elasticsearch must fetch 10,020 documents from every shard, sort them, merge across shards, and discard 10,000. On a 10-shard index, that is 100,200 documents fetched and sorted for 20 results. This kills cluster performance for all users. Every AI tool generates `from`/`size` pagination by default. Use `search_after` with a sort tiebreaker for deep pagination, or scroll/PIT for bulk export.
- Default analyzers for non-English text: The standard analyzer tokenizes on whitespace and lowercases. This is useless for CJK languages (Chinese, Japanese, Korean), where words are not separated by spaces. It is inadequate for German compound words, Arabic morphology, or Thai script. Production multilingual search requires ICU analyzers, language-specific stemmers, and often multiple analyzers per field. AI tools generate English-only analyzer configurations even when the prompt mentions multilingual content.
- Hardcoding BM25 defaults without tuning for content type: The default BM25 parameters (k1=1.2, b=0.75) assume medium-length documents with natural term frequency distributions. Short documents (product titles, tweets) need lower b (less length normalization because all documents are short). Long documents (academic papers, legal contracts) may need higher k1 (less term frequency saturation). AI tools never suggest tuning BM25 parameters, even when the content type is specified in the prompt.
- Missing field-level boosting in multi-match queries: A match in the title should score higher than a match in the body. A match in the product name should score higher than a match in the description. AI tools generate flat multi_match queries where all fields contribute equally, producing rankings where a keyword mention in a footnote scores the same as a keyword in the headline.
- Generating vector search without normalization: Cosine similarity produces scores in [-1, 1]. Dot product produces unbounded scores. BM25 produces scores in [0, infinity). When combining these in hybrid search, you must normalize or use rank-based fusion. AI tools generate naive score addition (`bm25_score + vector_score`) that makes whichever scoring function produces larger numbers dominate the ranking entirely.
- Not handling zero-result queries: 10–15% of search queries return zero results on most search systems. Without fallback strategies (relaxing filters, removing rare terms, falling back to fuzzy matching, showing related categories), users see an empty page and leave. AI tools generate the happy path only — what happens when the query matches nothing is never addressed.
- Using `refresh=true` on every index operation: `refresh=true` forces Elasticsearch to make the document immediately searchable by creating a new Lucene segment. On a high-throughput indexing pipeline doing 10,000 documents/second, this creates 10,000 tiny segments per second, which triggers constant segment merges that consume all available I/O and CPU. Use `refresh=false` during bulk indexing and rely on the default 1-second refresh interval, or use `refresh=wait_for` when you need near-real-time visibility.
- Missing circuit breakers for expensive aggregations: A `terms` aggregation on a high-cardinality field (like user_id with 100 million unique values) will attempt to build a hash map of all unique values in memory. Without circuit breakers, this single query can OOM-kill the node. AI tools generate aggregation queries without considering cardinality or setting `shard_size` limits, and never configure the `indices.breaker.request.limit` setting that prevents runaway queries from taking down the cluster.
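To make the first pitfall concrete, here is a minimal `search_after` pagination sketch. It only constructs the request bodies (the `published_at` and `doc_id` field names are hypothetical); each page carries the sort values of the previous page's last hit instead of a growing `from` offset:

```python
def build_page_body(query, page_size=20, search_after=None):
    """Request body for one page of search_after pagination.
    A unique keyword field (here a hypothetical doc_id) acts as the
    tiebreaker, making the sort total so no hit is skipped or
    repeated when published_at values collide."""
    body = {
        "query": {"match": {"body": query}},
        "size": page_size,
        # search_after requires a deterministic, total sort order
        "sort": [{"published_at": "desc"}, {"doc_id": "asc"}],
    }
    if search_after is not None:
        body["search_after"] = search_after  # sort values of the last hit
    return body

first_page = build_page_body("hybrid search")
# After executing the first page, pass hits[-1]["sort"] to get page 2:
second_page = build_page_body(
    "hybrid search", search_after=["2024-01-01", "doc_42"]
)
assert "from" not in first_page and "search_after" in second_page
```

The cost stays constant at `size` documents per shard no matter how deep the user pages, which is the entire point.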
Cost Model: What Search Engineers Should Spend
| Scenario | Tool Stack | Monthly Cost | Notes |
|---|---|---|---|
| **Search Hobbyist / Student**<br>Learning Elasticsearch, small personal projects | GitHub Copilot Free + local Elasticsearch | $0 | 2,000 completions/mo covers learning and experimentation. Copilot knows ES query DSL patterns well enough for tutorials. |
| **Solo Search Developer**<br>Side project with search, indie SaaS search feature | Copilot Pro or Cursor Pro | $10–20/mo | Full completions for mapping design, query building, and API endpoints. Multi-file context helps with query builder + mapping + config coherence. |
| **Growth-Stage Search Engineer**<br>Series A/B company, scaling search to millions of docs | Claude Code + Copilot Pro | $20–40/mo | Claude for architecture (hybrid search design, ILM policies, shard strategies), Copilot for fast inline DSL completions. |
| **Search Team at Scale**<br>Dedicated search team, billions of documents, custom ranking | Cursor Business + Claude Code | $40–60/mo | Cursor’s multi-file context handles complex query pipelines; Claude reasons through LTR feature design and relevance trade-offs. |
| **Enterprise Search Platform Team**<br>Shared search infrastructure, multi-tenant, compliance | Cursor Business + Claude Code (per seat) | $60–99/seat | Enterprise features (SSO, audit logs, zero data retention) required for teams handling proprietary search data and ranking models. |
ROI reality check: Search engineers typically earn $160,000–$280,000+ (e-commerce search, ad relevance, and enterprise search platforms pay the highest). At $200K/year, a 5% productivity gain justifies $833/month in tooling. Even at a conservative 2% gain from AI coding tools (primarily from faster boilerplate generation in query building, mapping design, and evaluation scripts), the gain is worth about $333/month, so a $20–60/month investment pays for itself roughly 6–17x over. The areas where AI tools save the most time in search engineering are not the ranking core (where you need to understand every scoring decision) but the surrounding infrastructure: mapping definitions, API endpoint scaffolding, analytics dashboards, and configuration management.
Practical Recommendations
- Mapping generation and analyzer configuration: Index mappings are verbose JSON with predictable structure. AI tools generate 80% of a production mapping correctly, and the remaining 20% (custom analyzer chains, HNSW parameters) is faster to edit than to write from scratch.
- Query DSL construction: Elasticsearch query DSL is deeply nested JSON that is tedious to write by hand. AI tools excel at generating bool queries, function_score wrappers, and aggregation structures from natural language descriptions.
- Boilerplate API endpoints: Search API routes, request validation, response formatting, and error handling are repetitive and well-suited to AI generation.
- Test fixture generation: Creating test documents with realistic field values, generating judgment sets for evaluation, building query test suites — all high-volume, low-complexity tasks.
- Evaluation metric computation: NDCG, MRR, precision@K implementations are mathematically well-defined and well-represented in training data. AI tools generate correct implementations reliably.
By contrast, keep these areas under direct human control:
- Relevance scoring logic: Every weight, every boost, every decay function parameter directly affects ranking quality. These values must come from evaluation data, not from AI-generated defaults.
- Shard routing decisions: Incorrect routing keys cause hot spots. Incorrect shard counts cause either too many small segments or too few large shards that slow recovery. These are capacity planning decisions, not code generation tasks.
- Custom analyzer chains: The order of token filters matters (synonym expansion before or after stemming?). Character filters, tokenizers, and token filters interact in non-obvious ways. Always test with the `_analyze` API.
- Embedding model selection: The choice of embedding model determines what “similarity” means in your vector search. A model trained on general web text will not produce useful embeddings for legal documents or medical records. This is a domain decision, not a code decision.
- Query rewriting rules: Synonym expansion, spell correction thresholds, and intent classification boundaries all affect recall and precision in ways that require evaluation on your specific query distribution.
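To make the mapping point concrete, here is a sketch of the kind of index definition AI tools generate most of: a custom analyzer chain plus an HNSW-tuned vector field, expressed as the Python dict you would pass to an Elasticsearch client. Every field name, the synonym list, and the `m`/`ef_construction` values are illustrative assumptions, not recommendations — the HNSW parameters and filter ordering are exactly the 20% you must tune and test yourself.

```python
# Sketch of an index body: custom analyzer chain + dense_vector field.
# Field names, synonyms, and HNSW parameters are illustrative placeholders.
product_index = {
    "settings": {
        "analysis": {
            "analyzer": {
                "title_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    # Filter ORDER matters: here synonyms expand before stemming.
                    "filter": ["lowercase", "synonym_filter", "porter_stem"],
                }
            },
            "filter": {
                "synonym_filter": {
                    "type": "synonym",
                    "synonyms": ["laptop, notebook"],
                }
            },
        }
    },
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "title_analyzer"},
            "price": {"type": "float"},
            "title_embedding": {
                "type": "dense_vector",
                "dims": 384,
                "index": True,
                "similarity": "cosine",
                # HNSW graph parameters: tune from recall/latency experiments,
                # never accept generated defaults blindly.
                "index_options": {"type": "hnsw", "m": 16, "ef_construction": 100},
            },
        }
    },
}
```

Run the analyzer chain through the `_analyze` API against a sample of real queries before indexing anything; reordering `synonym_filter` and `porter_stem` produces different token streams.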
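The query DSL bullet is easiest to appreciate with an example of the nesting involved. Below is a sketch of a `bool` query wrapped in `function_score`, combining a text match with a freshness decay and a popularity boost. All field names, the decay scale, and the factor values are hypothetical — those numbers are precisely the relevance-scoring parameters that must come from your evaluation data, not from a model's defaults.

```python
# Sketch of a function_score query: keyword relevance combined with a
# freshness decay and a popularity boost. All values are placeholders.
search_body = {
    "query": {
        "function_score": {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"title": {"query": "wireless headphones"}}}
                    ],
                    # Filters constrain the result set without affecting score.
                    "filter": [{"term": {"in_stock": True}}],
                }
            },
            "functions": [
                {   # Downweight stale documents: half score at 30 days old.
                    "gauss": {
                        "published_at": {"origin": "now", "scale": "30d", "decay": 0.5}
                    }
                },
                {   # Boost by a popularity signal, log-damped.
                    "field_value_factor": {
                        "field": "click_count",
                        "modifier": "log1p",
                        "factor": 0.1,
                    }
                },
            ],
            "score_mode": "multiply",   # how the functions combine with each other
            "boost_mode": "multiply",   # how they combine with the query score
        }
    },
    "size": 20,
}
```

Generating this skeleton from a natural-language description is where AI tools shine; choosing `scale`, `decay`, `factor`, and the score modes is where they do not.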
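NDCG is a good example of the well-defined metrics AI tools reliably get right. A minimal reference implementation, using the common exponential-gain formulation, takes a list of graded relevance judgments in ranked order:

```python
import math

def dcg_at_k(relevances, k):
    """Discounted cumulative gain: sum of (2^rel - 1) / log2(rank + 1), rank 1-based."""
    return sum(
        (2 ** rel - 1) / math.log2(rank + 2)  # enumerate is 0-based, hence +2
        for rank, rel in enumerate(relevances[:k])
    )

def ndcg_at_k(relevances, k):
    """NDCG@k: DCG normalized by the DCG of the ideal (descending) ordering."""
    ideal = dcg_at_k(sorted(relevances, reverse=True), k)
    return dcg_at_k(relevances, k) / ideal if ideal > 0 else 0.0

# A perfect ordering scores 1.0; burying the only relevant doc at rank 3 halves it.
assert ndcg_at_k([3, 2, 0], 3) == 1.0
assert round(ndcg_at_k([0, 0, 3], 3), 2) == 0.5
```

Note the hedge in "common formulation": some teams use linear gain (`rel` instead of `2^rel - 1`), and the two are not comparable across systems, so pin down which one your judgment pipeline assumes before trusting any generated implementation.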
Related Guides
- AI Coding Tools for Backend Engineers (2026) — APIs, databases, server architecture
- AI Coding Tools for Data Engineers (2026) — Pipelines, ETL, data warehousing
- AI Coding Tools for ML Engineers (2026) — Model training, experiment tracking, MLOps
- AI Coding Tools for Performance Engineers (2026) — Profiling, optimization, latency reduction
- AI Coding Tools for Database Internals Engineers (2026) — Storage engines, query optimizers, replication
- AI Coding Tools for API Developers (2026) — REST, GraphQL, API design and documentation