Building a Schema-Agnostic Hybrid Search System in PostgreSQL

Author: Tim Frohlich
Overview
Searching over dynamic, user-defined schemas raises challenges that go beyond traditional full-text retrieval. Queries must span both structured and unstructured data, remain type-safe despite generic storage, and ideally avoid the synchronization and operational overhead of separate search infrastructure when a relational database is already the source of truth. When this search layer is exposed to AI agents, additional constraints apply: queries must be validated, constrained to supported operations, and protected against invalid or hallucinated behavior.
In this article we walk through the design of a type-safe, schema-agnostic search system built entirely on PostgreSQL. We combine Entity-Attribute-Value (EAV) indexing with ltree to index nested, dynamic schemas; hybrid retrieval using pgvector and pg_trgm with Reciprocal Rank Fusion; and a Pydantic-based query DSL that compiles into validated SQL.
The same Pydantic query models are used as structured tool arguments in PydanticAI, allowing an AI agent to construct queries without generating raw SQL. This constrains LLM outputs to valid operations, enables multi-layer validation and self-correction, and provides end-to-end type safety from natural language input through query execution.
Table Of Contents
- Introduction
- Schema-Agnostic Indexing
- Retrieval
- Query Domain-Specific Language
- Connecting an Agent to the Query layer
- Discussion
- Conclusion
Introduction
Orchestration frameworks play an increasingly central role in managing complex systems composed of many interdependent components. Recent advances in large language models and autonomous agents have further accelerated this trend, increasing the need for flexible orchestration layers that can reason over dynamic system state and user-defined workflows. Frameworks such as n8n, Zapier, and Make illustrate this shift toward declarative composition and automation across heterogeneous services.
In this article we focus on orchestrator-core, an open-source orchestration framework for managing product lifecycles and workflows.
The framework operates on domain models in which subscriptions instantiate products assembled from reusable product blocks. Product blocks may nest arbitrarily, resulting in highly compositional domain models where a single subscription can contain hundreds of attributes distributed across deeply nested structures. The resulting data model is dynamic, user-defined, and cannot be described by a fixed relational schema. (Domain model details)
These characteristics impose non-trivial requirements on the search layer. Users must be able to search across subscriptions and all nested attributes, including fields that are not known at design time. In addition, search must support filtering on non-string data types such as timestamps, enums, and booleans. Because orchestrator-core is built on PostgreSQL, FastAPI, Pydantic, and SQLAlchemy, any search solution must integrate naturally with this stack and operate directly on the primary relational database.
While the domain models define a small set of base attributes, consumers are free to introduce their own product structures. As a result, the searchable schema emerges from the data itself and evolves over time. This precludes schema-driven indexing strategies and requires a schema-agnostic approach that supports both structured filtering and unstructured search. Furthermore, because valid query predicates cannot be derived from a static schema, the system must discover available fields and their types at runtime in order to construct safe and correct queries.
This article presents the design and implementation of a schema-agnostic, PostgreSQL-native hybrid search system for dynamic domain models. We describe the indexing strategy used to represent nested, user-defined schemas, a type-safe Query Domain-Specific Language (DSL) for constructing and validating queries, and the agent-facing interface that enables AI agents to safely generate and execute queries against the indexed data.
Technology stack
- Python
- PostgreSQL
- pgvector
- ltree
- pg_trgm
- SQLAlchemy
- Pydantic
- PydanticAI
- FastAPI
- AG-UI protocol
Schema-Agnostic Indexing
Dynamic Schemas
In conventional relational data models, schemas are relatively stable and evolve through controlled migrations. New fields are introduced explicitly, columns are added to tables, and indexes are rebuilt as part of deployment workflows.
The entities indexed in this system do not follow this model. Apart from a small set of base attributes, users are free to define their own domain models by composing reusable product blocks. These blocks can nest arbitrarily, resulting in deeply nested and highly variable structures whose shape cannot be predicted ahead of time.
Under these conditions, traditional column-oriented indexing strategies are infeasible. Supporting search over arbitrarily nested attributes requires an indexing approach that does not rely on a predefined schema and can represent hierarchical structure at runtime.
Entity–Attribute–Value Indexing with Hierarchical Paths
To represent dynamic, nested schemas, we adopt an Entity-Attribute-Value (EAV) model combined with PostgreSQL's ltree extension.
This approach represents each indexed attribute of an entity as a separate row in an index table. Each row records the hierarchical path of the attribute, its value, and its inferred type. An embedding vector is stored for attributes that participate in semantic search, while non-textual attributes are indexed for structured filtering.
class AiSearchIndex(BaseModel):
    # Entity identification
    entity_type: str        # SUBSCRIPTION, PRODUCT, WORKFLOW, PROCESS
    entity_id: UUID         # Entity identifier
    entity_title: str       # Human-readable title
    # Hierarchical path using ltree
    path: Ltree             # e.g., "subscription.product.name"
    value: str              # String representation of the value
    value_type: FieldType   # STRING, INTEGER, FLOAT, DATETIME, UUID, BOOLEAN
    # Vector embedding for semantic search
    embedding: Vector(dim)  # pgvector type for similarity search
    # Change detection
    content_hash: str       # SHA-256 hash for detecting changes
The path column uses the ltree data type provided by PostgreSQL's ltree extension, which is designed for representing and querying hierarchical data. An ltree path may look like subscription.product.block.0.name, where dots separate levels in the hierarchy. The ltree data type enables efficient exact matching and unique identification of fields within an entity.
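As an illustration, assuming the ai_search_index table shown above, ltree supports exact, subtree, and pattern matching directly in SQL:

-- Exact match on a single field
SELECT value FROM ai_search_index
WHERE path = 'subscription.product.name';

-- All fields below a subtree (descendant-of operator)
SELECT path, value FROM ai_search_index
WHERE path <@ 'subscription.product';

-- lquery pattern matching: subscription.product and everything beneath it
SELECT path, value FROM ai_search_index
WHERE path ~ 'subscription.product.*'::lquery;

A GiST index on the path column accelerates these operators.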
A simple subscription might generate rows such as:
| entity_id | path | value | value_type |
|---|---|---|---|
| uuid-123 | subscription.customer_id | "Company" | STRING |
| uuid-123 | subscription.product.name | "Basic Plan" | STRING |
| uuid-123 | subscription.start_date | "2024-01-01" | DATETIME |
| uuid-123 | subscription.product.price | "99.00" | FLOAT |
Because attributes are represented as path–value pairs, new fields can be indexed without schema changes by adding additional rows with new paths. The primary tradeoff is increased query complexity: retrieving entities requires reasoning over sets of path–value pairs rather than fixed columns. In addition, deeply nested or highly compositional domain models can produce a large number of index rows per entity, increasing index size and the number of database writes during indexing. These costs are addressed through incremental indexing, field-level change detection, and explicit constraints on query shape and candidate selection, as described in later sections.
Recursive Model Traversal and Field Extraction
To populate the EAV index, domain models must be decomposed into a flat set of path–value pairs. This is achieved by recursively traversing the Pydantic model structure and introspecting field definitions to produce typed path–value pairs for each attribute.
# Pseudocode
class BaseTraverser:
    def traverse(self, model, path=""):
        fields = {**model.model_fields, **model.computed_fields}
        for name, field_def in fields.items():
            value = safe_getattr(model, name)
            if value is None:
                continue
            field_path = join_path(path, name)  # e.g. "subscription.product.name"
            annotation = field_def.annotation
            yield from self.emit(value, field_path, annotation)

    def emit(self, value, path, annotation):
        if is_list(value):
            element_type = get_list_element_type(annotation)
            for i, item in enumerate(value):
                yield from self.emit(item, f"{path}.{i}", element_type)
        elif is_pydantic_model(value):
            yield from self.traverse(value, path)
        else:
            value_type = infer_field_type(annotation)
            scalar = unwrap_enum(value)  # enums are stored by their value
            yield ExtractedField(path, str(scalar), value_type)
The traversal logic distinguishes between three categories:
- Lists: Elements are iterated with indexed paths (product_block.0, product_block.1, etc.), preserving order within the hierarchy.
- Nested Pydantic models: Nested models are traversed recursively with the extended path.
- Scalar values (strings, numbers, dates, UUIDs): Each value is emitted as an ExtractedField containing the path, a string representation of the value, and its inferred type.
Type inference is performed using FieldType.from_type_hint(annotation), which maps Python type annotations (str, int, datetime, UUID) to corresponding database-level field types (STRING, INTEGER, DATETIME, UUID). This inferred type is stored in the value_type column and later determines which comparison operators are valid during query construction (for example, gte for numeric fields, or like for strings).
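A minimal sketch of such a mapping (names match the article; the exact implementation in orchestrator-core may differ):

from datetime import datetime
from enum import Enum
from uuid import UUID

class FieldType(str, Enum):
    STRING = "STRING"
    INTEGER = "INTEGER"
    FLOAT = "FLOAT"
    DATETIME = "DATETIME"
    UUID = "UUID"
    BOOLEAN = "BOOLEAN"

    @classmethod
    def from_type_hint(cls, annotation: type) -> "FieldType":
        # bool is checked before int because bool is a subclass of int
        for py_type, field_type in [(bool, cls.BOOLEAN), (int, cls.INTEGER),
                                    (float, cls.FLOAT), (datetime, cls.DATETIME),
                                    (UUID, cls.UUID), (str, cls.STRING)]:
            if isinstance(annotation, type) and issubclass(annotation, py_type):
                return field_type
        return cls.STRING  # fall back to the string representation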
For example, given a subscription model such as:
class Product(BaseModel):
    name: str
    price: float

class Subscription(SubscriptionModel):
    customer_id: str
    product: Product
    start_date: datetime

subscription = Subscription(
    customer_id="Company",
    product=Product(name="Basic Plan", price=99.00),
    start_date=datetime(2024, 1, 1),
)
an instance of this subscription is decomposed into the following extracted fields:
ExtractedField("subscription.customer_id", "Company", STRING)
ExtractedField("subscription.product.name", "Basic Plan", STRING)
ExtractedField("subscription.product.price", "99.00", FLOAT)
ExtractedField("subscription.start_date", "2024-01-01", DATETIME)
Each entity type (Subscription, Product, Workflow, Process) provides a dedicated traverser that loads the appropriate Pydantic model from the underlying database entity prior to traversal. The implementation handles the practical edge cases, including sanitizing path components for ltree compatibility, enforcing a maximum recursion depth to prevent infinite traversal, and tolerating inaccessible or failing fields.
Incremental Indexing
Change detection with content hashing
During indexing, all previously stored hashes for a batch of entities are fetched in a single query. Each currently traversed field is then compared against its stored hash:
current_hash = _compute_content_hash(
    field.path,
    field.value,
    field.value_type,
    entity_title,
)
if field.path not in existing_hashes or existing_hashes[field.path] != current_hash:
    fields_to_upsert.append(field)  # Changed or new
else:
    skip_count += 1  # Identical, skip
The hash is computed from the field's path, value, inferred type, and entity title. If the hash matches, the field is skipped entirely: no embedding is generated and no index update is performed.
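A minimal sketch of the hash computation (the exact serialization format is an implementation detail and assumed here):

import hashlib

def _compute_content_hash(path: str, value: str, value_type: str, entity_title: str) -> str:
    # Any change to path, value, inferred type, or title yields a new digest,
    # triggering re-embedding and an index upsert for this field.
    payload = "|".join((path, value, value_type, entity_title))  # assumed separator
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()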
Handling schema evolution and removed fields
After traversal, any paths present in the index but missing from the current entity structure are treated as stale and removed in batched delete operations.
This ensures that the index accurately reflects the current domain model when fields are renamed, removed, or when nested product blocks are detached, without requiring a full re-index run.
Token-aware batching with dual buffers
After filtering out unchanged fields, embeddings must be generated for the remaining fields. Naive batching strategies based on item counts are insufficient, as embedding models enforce token limits rather than record limits. For example, a batch containing many short strings may fit within the context window, while a smaller number of long text fields may exceed it.
A dual-buffer approach is used, separating fields by type and tracking token consumption in real time. The system maintains two parallel buffers:
- Embeddable buffer (STRING fields): Accumulates text fields while tracking a running token count. Each field is tokenized using the embedding model's tokenizer, and tokens are summed against a budget (model max context minus a safety margin). When adding the next field would exceed the budget, the buffer flushes.
- Non-embeddable buffer (INTEGER, FLOAT, DATETIME, UUID, BOOLEAN): These fields don't need embeddings, so they don't contribute to token counting and are stored with embedding=NULL.
Batches yield under three conditions:
- Token budget exceeded: Adding the next STRING field would exceed the token budget
- Batch size limit reached: The embeddable buffer reaches the maximum batch size (if configured)
- Chunk boundary: All entities in the current chunk have been processed
On flush, the embeddable buffer is sent as a single batch embedding request, then merged with the non-embeddable records into one combined UPSERT statement. This approach minimizes API calls while respecting token and memory limits.
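A condensed sketch of the dual-buffer loop; count_tokens stands in for the embedding model's tokenizer and is an assumed helper:

def batch_fields(fields, token_budget, max_batch_size=None):
    embeddable, non_embeddable = [], []
    tokens_used = 0
    for field in fields:
        if field.value_type != FieldType.STRING:
            non_embeddable.append(field)  # stored with embedding=NULL
            continue
        field_tokens = count_tokens(field.value)  # tokenizer-specific (assumed helper)
        if embeddable and (tokens_used + field_tokens > token_budget
                           or (max_batch_size and len(embeddable) >= max_batch_size)):
            yield embeddable, non_embeddable  # flush: one embedding call + one UPSERT
            embeddable, non_embeddable, tokens_used = [], [], 0
        embeddable.append(field)
        tokens_used += field_tokens
    if embeddable or non_embeddable:
        yield embeddable, non_embeddable  # chunk boundary: flush the remainder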
Retrieval
Retrieval Strategies and Ranking
The retrieval layer applies ranking strategies to candidate entities produced by filtering. It is implemented using the Strategy pattern, with a shared BaseRetriever abstraction and multiple concrete implementations. Each retriever generates a ranked SQL query using PostgreSQL-native operators, allowing semantic, fuzzy, structured, or hybrid retrieval to be selected dynamically at query planning time.
Structured Retriever
The structured retriever is used for queries that specify only structured filters and no search term. All matching entities are assigned a constant score and ordered by entity id to provide stable, deterministic results.
This retriever acts as the fallback strategy when no semantic or fuzzy search criteria are present, for example: "show all subscriptions where status=active and created_date > 2024-01-01".
Semantic Retriever
The semantic retriever ranks entities based on vector similarity between indexed field embeddings and the query embedding. It uses pgvector's L2 distance operator (<->) to compute distances at the field level. For each entity, the minimum distance across all of its fields determines its relevance score. This distance is then converted into a normalized similarity score (for example 1 / (1 + distance)), ensuring that smaller distances correspond to higher scores and that scores are bounded and comparable across queries.
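Conceptually, the field-level ranking reduces to a query of roughly this shape (a simplified sketch; the actual retriever applies it to the filtered candidate set):

SELECT entity_id, MIN(embedding <-> :query_embedding) AS distance
FROM ai_search_index
WHERE entity_type = 'SUBSCRIPTION'
  AND embedding IS NOT NULL
GROUP BY entity_id
ORDER BY distance ASC
LIMIT 10;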
Fuzzy Retriever
The fuzzy retriever ranks entities using trigram-based text similarity provided by PostgreSQL’s pg_trgm extension. The word_similarity() function computes how closely the search term matches field values, while the <% operator filters candidates. For each entity, the maximum similarity score across all matching fields is used as the entity’s ranking score.
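In simplified form, the fuzzy ranking corresponds to a query like the following sketch:

SELECT entity_id, MAX(word_similarity(:term, value)) AS score
FROM ai_search_index
WHERE :term <% value  -- pg_trgm word-similarity filter
GROUP BY entity_id
ORDER BY score DESC
LIMIT 10;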
Hybrid retriever
The hybrid retriever combines semantic and fuzzy retrieval using Reciprocal Rank Fusion (RRF):
- Ranks entities separately by semantic and fuzzy similarity
- Computes the RRF score score(e) = Σ 1 / (k + rank_r(e)) over the two rankings r, where rank_r(e) is the entity's rank in ranking r and k is a smoothing constant
- Detects "perfect matches" (fuzzy score ≥ 0.9) and applies a dynamic boost that exceeds the maximum possible RRF score
- Normalizes the final score to [0, 1]
The perfect match boost ensures that exact or near-exact text matches always rank above semantic-only matches, even when semantic similarity is higher. This handles cases where users search for specific identifiers (UUIDs, customer IDs, product names) that should return exact results first.
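A sketch of the fusion logic; the constant k = 60 is the conventional RRF default, and the exact boost size is illustrative:

def rrf_score(semantic_rank: int | None, fuzzy_rank: int | None, k: int = 60) -> float:
    # Each ranking contributes 1 / (k + rank); absent rankings contribute nothing.
    return sum(1.0 / (k + rank) for rank in (semantic_rank, fuzzy_rank) if rank is not None)

def fused_score(semantic_rank, fuzzy_rank, fuzzy_similarity, k: int = 60) -> float:
    score = rrf_score(semantic_rank, fuzzy_rank, k)
    if fuzzy_similarity is not None and fuzzy_similarity >= 0.9:
        score += 2.0 / (k + 1)  # exceeds the maximum attainable RRF score (both ranks = 1)
    return score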
Entity-Level Aggregation and Field-Level Matching
All retrieval strategies operate at the path-value level. Matching and scoring are performed on individual indexed fields, after which results are aggregated to the entity level and returned with highlight information indicating which field produced the match. This parent–child retrieval pattern enables search across deeply nested attributes while presenting clean, entity-level results.
Retriever Routing Strategy
The system automatically selects the appropriate retriever based on the available search inputs. Routing is implemented in the Retriever.route() class method, which inspects the query plan and selects the optimal ranking strategy:
def route(query, cursor, query_embedding):
    fuzzy_term = query.fuzzy_term

    # Hybrid: both embedding and fuzzy term available
    if query_embedding is not None and fuzzy_term is not None:
        return RrfHybridRetriever(query_embedding, fuzzy_term, cursor)

    # Semantic: only embedding available
    if query_embedding is not None:
        return SemanticRetriever(query_embedding, cursor)

    # Fuzzy: only text term available
    if fuzzy_term is not None:
        return FuzzyRetriever(fuzzy_term, cursor)

    # Structured: only filters, no search term
    return StructuredRetriever(cursor)
Decision criteria:
- RrfHybridRetriever: Both query embedding and fuzzy term available → combine semantic + fuzzy with RRF
- SemanticRetriever: Only embedding available (fuzzy term is None or empty) → pure vector similarity search
- FuzzyRetriever: Only text term available (embedding generation failed or disabled) → pure trigram matching
- StructuredRetriever: No search term at all → filter-only query with dummy score
Fallback behavior: If the query specifies a vector_query but embedding generation fails (API error, timeout, etc.), the system falls back to fuzzy search using the full query_text as the fuzzy term. This ensures queries never fail completely due to embedding issues.
The routing happens once per query during query execution planning. All retrievers share the same interface (apply(candidate_query)) and operate on the same candidate set produced by filters, making the strategies fully interchangeable.
Keyset Pagination
Traditional offset-based pagination (LIMIT 10 OFFSET 20) does not scale well for large offsets, as PostgreSQL must scan and discard all skipped rows. To avoid this, the system uses keyset pagination, where each page is defined by a cursor containing (score, entity_id, query_id):
class PageCursor(BaseModel):
    score: float    # Last item's score on previous page
    id: str         # Last item's entity_id on previous page
    query_id: UUID  # Reference to persisted query state
Pagination logic:
if cursor is not None:
    stmt = stmt.where(
        or_(
            score < cursor.score,  # Next page: lower scores
            and_(score == cursor.score, entity_id > cursor.id),  # Tie-breaking
        )
    )
Query state persistence ensures stable pagination across requests. On the first page request, the system persists the query plan (filters, search term, embedding) and assigns a unique query_id. This identifier is embedded in all subsequent page cursors, ensuring later pages use the same query plan and embedding as the initial request. This prevents pagination instability that would otherwise arise from new entities being indexed or from regenerating embeddings, both of which can change scores and invalidate cursor ordering.
For convenient use in URLs, the cursor can be encoded as a single base64 string:
cursor = PageCursor(score=0.87, id="uuid-123", query_id="uuid-456")
encoded = cursor.encode()
Retrievers implement pagination by adding the WHERE clause shown above before ordering by score DESC, entity_id ASC. This logic is shared across all retriever strategies (semantic, fuzzy, hybrid, structured).
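A possible implementation of the cursor encoding (an assumption, shown for illustration; the concrete implementation may differ):

import base64

def encode_cursor(cursor: PageCursor) -> str:
    # Serialize to JSON, then base64-encode for safe embedding in URLs
    return base64.urlsafe_b64encode(cursor.model_dump_json().encode()).decode()

def decode_cursor(encoded: str) -> PageCursor:
    return PageCursor.model_validate_json(base64.urlsafe_b64decode(encoded))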
Query Domain-Specific Language
Querying on an EAV model
In traditional relational databases, queries are written against a fixed set of columns known at compile time:
SELECT *
FROM subscriptions
WHERE status = 'active'
AND start_date > '2024-01-01'
With an EAV schema, all values are stored in generic path and value columns. The set of indexed attributes is discovered at runtime: valid paths are not known until the index is queried. This introduces several challenges:
- Unsafe query construction: String interpolation such as WHERE path = '{user_input}' opens SQL injection vulnerabilities.
- Loss of operator–type coupling: Because all values are stored in a single column, the database no longer enforces which operators are valid for a given field. Comparisons such as > on TEXT values execute but use lexicographic ordering; numeric or date comparisons require casts that can raise runtime errors without type guards.
- No static validation: Paths are data, not schema. Misspelled or non-existent paths (e.g. subscription.statuss) cannot be detected at parse time and typically fail silently by producing empty result sets unless explicitly guarded against.
- Complex nesting: Building nested AND/OR groups by hand quickly becomes error-prone over an EAV index.
Filter Predicate DSL
To address these constraints, we define a small domain-specific language (DSL) for expressing filter predicates over the EAV index. Rather than generating SQL strings directly, filter predicates are constructed as validated objects with explicit structure and semantics. This makes invalid query shapes unrepresentable and ensures that only valid predicates reach the database.
The predicate language is built around two core models.
PathFilter represents an atomic predicate over a single indexed attribute:
class PathFilter(BaseModel):
    path: str                   # ltree path, e.g., "subscription.status"
    condition: FilterCondition  # Union of filter types (Date, Numeric, String, Ltree, Equality)
    value_kind: UIType          # STRING, NUMBER, DATETIME, BOOLEAN, etc.

    def to_expression(self, value_column, value_type_column) -> SQLExpression:
        # Type guard: only match compatible field types
        allowed_types = [
            ft for ft in FieldType
            if UIType.from_field_type(ft) == self.value_kind
        ]
        type_guard = value_type_column.in_(allowed_types)
        return and_(
            type_guard,
            self.condition.to_expression(value_column, self.path),
        )
The value_kind field enforces operator–type compatibility at the predicate level.
FilterTree represents a recursive Boolean expression tree. Internal nodes group child predicates under an explicit AND/OR operator, while leaf nodes are PathFilter instances. This structure mirrors the resulting SQL: each leaf compiles to a correlated EXISTS subquery, while the tree determines how those subqueries are combined and parenthesized.
class FilterTree(BaseModel):
    MAX_DEPTH: ClassVar[int] = 5             # Prevent infinite nesting
    op: BooleanOperator                      # AND | OR
    children: list[FilterTree | PathFilter]  # Recursive Boolean expression tree
Example filter tree:
FilterTree(
    op="AND",
    children=[
        PathFilter(
            path="subscription.status",
            condition={"op": "eq", "value": "active"},
            value_kind="string",
        ),
        FilterTree(
            op="OR",
            children=[
                PathFilter(
                    path="subscription.start_date",
                    condition={"op": "gt", "value": "2024-01-01"},
                    value_kind="datetime",
                ),
                PathFilter(
                    path="subscription.customer_id",
                    condition={"op": "eq", "value": "acme-corp"},
                    value_kind="string",
                ),
            ],
        ),
    ],
)
The corresponding SQL reflects the structure of the filter tree:
WHERE EXISTS (
    SELECT 1
    FROM ai_search_index
    WHERE entity_id = parent.entity_id
      AND path = 'subscription.status'
      AND value_type IN ('STRING')
      AND value = 'active'
)
AND (
    EXISTS (
        SELECT 1
        FROM ai_search_index
        WHERE entity_id = parent.entity_id
          AND path = 'subscription.start_date'
          AND value_type IN ('DATETIME')
          AND value > '2024-01-01'
    )
    OR EXISTS (
        SELECT 1
        FROM ai_search_index
        WHERE entity_id = parent.entity_id
          AND path = 'subscription.customer_id'
          AND value_type IN ('STRING')
          AND value = 'acme-corp'
    )
)
Query DSL
Beyond filters, the query layer must support various query operations: ranked retrieval, bulk export, counting, and statistical aggregation. Each operation has distinct requirements in terms of inputs, limits, and permissible clauses. Rather than defining a single monolithic query object, the system uses mixin-based composition to construct narrowly scoped query types. This ensures that only valid combinations of capabilities (search, grouping, aggregation) can be expressed and validated.
BaseQuery defines the shared base model for all query types. It captures the parameters common across operations: the target entity type and an optional filter predicate tree. Operation-specific capabilities, such as ranking, grouping, or aggregation, are added incrementally via mixins.
class BaseQuery(BaseModel):
    MIN_LIMIT: ClassVar[int] = 1
    DEFAULT_LIMIT: ClassVar[int] = 10
    MAX_LIMIT: ClassVar[int] = 30
    DEFAULT_EXPORT_LIMIT: ClassVar[int] = 1000
    MAX_EXPORT_LIMIT: ClassVar[int] = 10000

    entity_type: EntityType     # SUBSCRIPTION, PRODUCT, WORKFLOW, PROCESS
    filters: FilterTree | None  # Optional structured filters
Three mixins add specific capabilities:
SearchMixin: Adds an optional query_text and derives two computed properties used by the retrieval router:
- vector_query: query_text unless it is empty or parses as a UUID (UUIDs are not vectorized)
- fuzzy_term: query_text only for single-word queries (multi-word input disables trigram search)

GroupingMixin: Adds grouping controls for COUNT and AGGREGATE operations: group_by, temporal_group_by, order_by, and cumulative. It also enforces cross-field constraints at validation time:
- order_by requires at least one grouping dimension
- cumulative requires exactly one temporal grouping

AggregationMixin: Adds an explicit aggregations list (min length 1) defining which statistics to compute (SUM, AVG, MIN, MAX, COUNT).
These mixins are composed into a small set of concrete query types that define the supported operations. Each type admits only the fields relevant to its operation and rejects unsupported combinations at validation time:
class SelectQuery(BaseQuery, SearchMixin):
    query_type: Literal["select"] = "select"
    limit: int = Field(default=10, ge=1, le=30)  # Strict limit for UI

class ExportQuery(BaseQuery, SearchMixin):
    query_type: Literal["export"] = "export"
    limit: int = Field(default=1000, ge=1, le=10000)  # Higher limit for bulk

class CountQuery(BaseQuery, GroupingMixin):
    query_type: Literal["count"] = "count"  # No search mixin: counting only uses filters and grouping

class AggregateQuery(BaseQuery, GroupingMixin, AggregationMixin):
    query_type: Literal["aggregate"] = "aggregate"  # Full aggregation capabilities with grouping
The query_type field defines a discriminated union over the supported queries:
Query = Annotated[
    Union[SelectQuery, ExportQuery, CountQuery, AggregateQuery],
    Discriminator("query_type"),
]
At parse time, Pydantic selects the corresponding model and enforces its field constraints. Invalid combinations such as aggregations on a select query are caught before execution.
Example SelectQuery:
{
  "query_type": "select",
  "entity_type": "SUBSCRIPTION",
  "query_text": "acme",
  "filters": {
    "op": "AND",
    "children": [
      {
        "path": "subscription.status",
        "condition": { "op": "eq", "value": "active" },
        "value_kind": "string"
      }
    ]
  },
  "limit": 20
}
Example AggregateQuery:
{
  "query_type": "aggregate",
  "entity_type": "SUBSCRIPTION",
  "group_by": ["subscription.product.name"],
  "temporal_group_by": [{ "field": "subscription.start_date", "interval": "month" }],
  "aggregations": [
    { "type": "count", "alias": "total_subscriptions" },
    { "type": "sum", "field": "subscription.product.price", "alias": "total_revenue" }
  ],
  "cumulative": true,
  "order_by": [{ "field": "start_date_month", "direction": "asc" }]
}
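As a sketch, such payloads can be validated through the discriminated union with Pydantic's TypeAdapter; the rejection of unknown fields assumes the models are configured with extra="forbid":

from pydantic import TypeAdapter, ValidationError

query_adapter = TypeAdapter(Query)

# Parses into a SelectQuery via the "query_type" discriminator
payload = {"query_type": "select", "entity_type": "SUBSCRIPTION",
           "query_text": "acme", "filters": None}
query = query_adapter.validate_python(payload)
assert isinstance(query, SelectQuery)

# Rejected before execution: SelectQuery does not define "aggregations"
try:
    query_adapter.validate_python({**payload, "aggregations": [{"type": "count"}]})
except ValidationError as exc:
    print(exc.error_count(), "validation error(s)")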
EAV Pivoting for Aggregations
Aggregations pose a distinct challenge in an EAV index: how do you group entities by attributes that are stored as rows rather than columns?
In a relational schema, aggregation operates directly on fields:
SELECT product_name, COUNT(*)
FROM subscriptions
GROUP BY product_name
In an EAV index, product_name is not a column, it's scattered across multiple rows in the index table:
| entity_id | path | value |
|---|---|---|
| uuid-1 | subscription.product.name | "Basic Plan" |
| uuid-2 | subscription.product.name | "Pro Plan" |
| uuid-1 | subscription.start_date | "2024-01-01" |
To enable grouping and aggregation, the query first pivots the EAV rows into an entity-aligned, column-oriented representation. This is implemented using a CTE that projects selected paths into columns via CASE WHEN expressions and groups by entity_id:
def _build_pivot_cte(base_query, pivot_fields):
    pivot_columns = [AiSearchIndex.entity_id.label("entity_id")]
    for field_path in pivot_fields:
        pivot_columns.append(
            func.max(
                case(
                    (AiSearchIndex.path == field_path, AiSearchIndex.value),
                    else_=None,
                )
            ).label(field_to_alias(field_path))
        )
    return (
        select(*pivot_columns)
        .where(AiSearchIndex.path.in_(pivot_fields))
        .group_by(AiSearchIndex.entity_id)
        .cte("pivoted_entities")
    )
The resulting SQL is:
WITH pivoted_entities AS (
    SELECT
        entity_id,
        MAX(CASE WHEN path = 'subscription.product.name' THEN value END) AS product_name,
        MAX(CASE WHEN path = 'subscription.start_date' THEN value END) AS start_date
    FROM ai_search_index
    WHERE path IN ('subscription.product.name', 'subscription.start_date')
    GROUP BY entity_id
)
SELECT product_name, COUNT(*) AS total
FROM pivoted_entities
GROUP BY product_name
The pivot CTE converts field-level rows into an entity-aligned representation. For clarity, the example below shows only the subscription.product.name field.
Before (EAV rows):
| entity_id | path | value |
|---|---|---|
| uuid-1 | subscription.product.name | "Basic Plan" |
| uuid-2 | subscription.product.name | "Pro Plan" |
| uuid-3 | subscription.product.name | "Basic Plan" |
After (pivoted columns):
| entity_id | product_name |
|---|---|
| uuid-1 | "Basic Plan" |
| uuid-2 | "Pro Plan" |
| uuid-3 | "Basic Plan" |
Once pivoted, standard relational aggregation operators (GROUP BY, COUNT, SUM, …) can be applied directly to the resulting columns.
Temporal grouping extends this with date_trunc():
# For temporal_group_by: [{"field": "subscription.start_date", "interval": "month"}]
date_trunc('month', CAST(pivoted.start_date AS TIMESTAMP)) AS start_date_month
Cumulative aggregations are expressed using window functions over the grouped result set:
SUM(COUNT(*)) OVER (ORDER BY start_date_month) AS cumulative_total
This enables temporal and cumulative aggregations (e.g. monthly counts with running totals) to be evaluated entirely within PostgreSQL.
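Putting the pieces together, a monthly count with a running total compiles to SQL of roughly this shape (a sketch built on the pivot CTE shown earlier):

WITH pivoted_entities AS (
    SELECT entity_id,
           MAX(CASE WHEN path = 'subscription.start_date' THEN value END) AS start_date
    FROM ai_search_index
    WHERE path IN ('subscription.start_date')
    GROUP BY entity_id
)
SELECT
    date_trunc('month', CAST(start_date AS TIMESTAMP)) AS start_date_month,
    COUNT(*) AS total_subscriptions,
    SUM(COUNT(*)) OVER (ORDER BY date_trunc('month', CAST(start_date AS TIMESTAMP)))
        AS cumulative_total
FROM pivoted_entities
GROUP BY start_date_month
ORDER BY start_date_month;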
Query State Persistence
Every executed query is persisted as a database record containing:
class SearchQueryTable(BaseModel):
    query_id: UUID                  # Unique identifier
    parameters: JSONB               # Serialized query object (SelectQuery, AggregateQuery, etc.)
    query_embedding: Vector | None  # Embedding used for semantic retrieval
    run_id: UUID | None             # Optional link to an agent conversation
    query_number: int               # Sequence number within a run
This persisted state serves three distinct purposes:
1. Pagination consistency
As covered earlier, the initial page request persists the query along with its embedding. Subsequent pages reference this state via the query_id embedded in the cursor, ensuring that pagination remains stable even if:
- New entities are indexed between page requests
- The user's original query text is lost
- Embedding generation is non-deterministic across requests
2. Export efficiency
Exports are executed from persisted query state rather than from a fresh request payload. The export handler reloads a given SelectQuery by query_id (and its embedding snapshot) from the database and derives an ExportQuery with export-appropriate defaults (e.g. higher limits), then re-executes against the index:
# Load SelectQuery from persisted query state
query_state = QueryState.load_from_id(query_id, SelectQuery)

# Convert to ExportQuery with export-appropriate limit
export_query = ExportQuery(
    entity_type=query_state.query.entity_type,
    filters=query_state.query.filters,
    query_text=query_state.query.query_text,
)

export_records = await engine.execute_export(
    export_query,
    db.session,
    query_state.query_embedding,
)
This avoids reconstructing queries or regenerating embeddings and ensures that exports are executed against the same validated query definition and embedding snapshot as the original search. This is particularly important when queries are constructed by an agent and may involve expensive reasoning or external tool calls.
3. Agent conversation tracking
When queries are constructed by an agent, each execution is persisted with a run_id that links it to the conversation. This makes query executions traceable and reusable outside the agent’s context.
In practice, persisted query state allows the system to answer questions such as:
- Which queries were executed as part of a given agent run
- What filter and query parameters were used at each step
- Which embedding snapshot was used for retrieval
Agent responses return query_id references, allowing users or downstream systems to re-run or export results later without requiring additional agent tool calls or repeated query construction.
Lifecycle:
1. Query is constructed (manually or by an agent)
2. Embedding is generated (if query_text is present)
3. Query is executed and results are returned
4. Query state is persisted with a query_id
5. The query_id is returned to the caller (e.g. in the response or cursor)
6. Future requests reload the query state via QueryState.load_from_id()
More generally, persisted query state can be reused to support higher-level features such as query reuse, search suggestions, or other tooling built on previously executed queries.
Connecting an Agent to the Query layer
The Schema Discovery Problem
In a relational database, the schema is static metadata known upfront. This is the model assumed by most database tooling, including agent-facing MCP servers built on top of relational schemas.
An ORM or schema introspection tool can query information_schema to discover tables and columns:
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'subscriptions'
This yields a fixed list of fields (e.g. status (varchar), start_date (timestamp)), which an agent can safely treat as stable schema metadata.
In an EAV index, the index exposes only generic columns (path, value, value_type), and the set of available paths and their types must be discovered by querying the indexed data itself.
In practice, the set of indexed paths can be very large and continuously evolving, making it impractical to materialize or expose the full path space upfront (e.g. to an agent context or client).
SELECT DISTINCT path, value_type
FROM ai_search_index
WHERE entity_type = 'SUBSCRIPTION'
ORDER BY path
This might return:
- subscription.customer_id (STRING)
- subscription.product.name (STRING)
- subscription.product.price (FLOAT)
- subscription.start_date (DATETIME)
Because the schema is data-driven, large, and continuously evolving, schema discovery must occur at runtime and be scoped to the agent’s intent, rather than exposed as a global schema snapshot.
async def list_paths(prefix: str, q: str, entity_type: EntityType, limit: int) -> PathsResponse:
    stmt = build_paths_query(entity_type, prefix, q)
    rows = db.session.execute(stmt).fetchall()
    # Process into leaves (terminal fields) and components (nested objects)
    leaves, components = process_path_rows(rows)
    return PathsResponse(leaves=leaves, components=components)
This endpoint:
- Uses ltree pattern matching (subscription.product*) for prefix autocomplete
- Uses pg_trgm similarity for fuzzy search ("prodct" finds "product")
- Returns leaves (terminal fields like name, price) with their types and full paths
- Returns components (nested objects like product, customer) for hierarchical browsing
The agent calls this endpoint via the discover_filter_paths tool to incrementally discover valid fields before constructing queries, rather than relying on a static schema snapshot.
Dynamic Schema Discovery
Discovery is driven by the user’s intent: the agent only asks for paths relevant to the fields mentioned in the request.
How it works:
When a user request implies a filter on a specific field (e.g. “show subscriptions where status is active”), the agent performs schema discovery before constructing a filter:
1. Extract the field name from the request (e.g. "status")
2. Call discover_filter_paths with the extracted name and entity type:
result = await discover_filter_paths(
    ["status"],
    entity_type=SUBSCRIPTION,
)
3. Receive a discovery result describing matching paths and their types:
{
  "status": {
    "status": "OK",
    "guidance": "Found 1 field(s) for 'status'",
    "leaves": [
      {
        "name": "status",
        "value_kind": ["string"],
        "paths": ["subscription.status"]
      }
    ],
    "components": []
  }
}
4. Build a PathFilter with the validated path and inferred type:
PathFilter(
    path="subscription.status",
    condition={"op": "eq", "value": "active"},
    value_kind="string",
)
Handling ambiguity:
A single field name may correspond to multiple indexed paths. For example, a request referring to “name” may match:
- subscription.customer.name (STRING)
- subscription.product.name (STRING)
- subscription.product_block.name (STRING)
In such cases, the discovery result returns all matching paths:
{
  "name": {
    "status": "OK",
    "leaves": [
      {
        "name": "name",
        "value_kind": ["string"],
        "paths": [
          "subscription.customer.name",
          "subscription.product.name",
          "subscription.product_block.name"
        ]
      }
    ]
  }
}
The agent must then resolve the ambiguity before constructing a filter. Depending on the request, it may:
- select a single path based on contextual cues in the user's query, or
- construct a FilterTree with an OR group spanning all candidate paths.
If the intent remains unclear, the agent can request clarification.
Handling missing fields:
If a requested field does not exist in the indexed data, the discovery result explicitly indicates failure:
{
  "invalid_field": {
    "status": "NOT_FOUND",
    "guidance": "No filterable paths found containing 'invalid_field'. Do not create a filter for this.",
    "leaves": [],
    "components": []
  }
}
In this case, no filter is constructed. Instead, the agent surfaces the guidance to the user, indicating that the requested field is not available for the selected entity type. This discovery-first approach ensures the agent only constructs queries using fields that actually exist, preventing validation errors and providing helpful feedback when users reference non-existent fields.
Agent Architecture
Tool Schema Integration
The agent is built on PydanticAI, which integrates natively with Pydantic models. When tools are defined using Pydantic-typed parameters, PydanticAI automatically introspects those models and exposes their JSON schemas to the LLM for structured tool calling.
Automatic model discovery from tool signatures:
@search_toolset.tool
async def set_filter_tree(
    ctx: RunContext[StateDeps[SearchState]],
    filters: FilterTree | None,  # ← Pydantic model
) -> StateSnapshotEvent:
    ...
From this signature, PydanticAI:
- Infers the full structure of the FilterTree model from type hints
- Derives a JSON schema, including all nested models and discriminated unions
- Exposes this schema to the LLM as the tool's input contract
- Validates tool calls against the same Pydantic models at runtime
As a result, the agent learns the structure of FilterTree, PathFilter, and related types directly from the Python models, without manually defined schemas or prompt-level instructions.
The query models introduced in the Query DSL chapter (SelectQuery, AggregateQuery, FilterTree) therefore serve both as the internal query DSL and as the tool-calling interface for the LLM.
State management across tool calls:
Agent execution is stateful. Rather than constructing a full query in a single step, the agent incrementally builds and executes queries across multiple tool calls using a shared SearchState object.
When used with the ag-ui integration (as in the orchestrator-core package), this stateful execution model also improves observability: each intermediate state update is emitted as a structured snapshot, allowing the UI to display progress, surface partial results, and pinpoint where query construction or validation failed.
Stateful Execution and Self-Correction
class SearchState(BaseModel):
    run_id: UUID | None        # Conversation identifier
    query_id: UUID | None      # Last executed query reference
    action: ActionType | None  # SELECT, COUNT, or AGGREGATE
    query: Query | None        # Current query being built
    results_count: int | None  # Results from last execution
State persists across tool calls, allowing the agent to incrementally construct and execute queries rather than assembling a full query in a single step:
# Tool call 1: Start search
start_new_search(entity_type=SUBSCRIPTION, action=SELECT)
# > Creates SelectQuery in state
# Tool call 2: Add filters
set_filter_tree(filters=FilterTree(...))
# > Updates state.query.filters
# Tool call 3: Execute
run_search()
# > Executes query from state and persists with query_id
When a tool invocation fails validation, the agent does not terminate. Instead, validation errors are returned to the model via ModelRetry, allowing it to revise the query, invoke tools if needed, and retry with corrected input.
This creates a self-correcting loop in which invalid assumptions (non-existent paths, incompatible operators, missing grouping fields) are resolved dynamically at runtime.
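A sketch of this pattern with PydanticAI's ModelRetry; validate_filter_paths is a hypothetical helper standing in for the system's path validation:

from pydantic_ai import ModelRetry

@search_toolset.tool
async def set_filter_tree(
    ctx: RunContext[StateDeps[SearchState]],
    filters: FilterTree | None,
) -> StateSnapshotEvent:
    unknown_paths = validate_filter_paths(filters)  # hypothetical helper
    if unknown_paths:
        # Surfaced to the LLM as a retryable error: the model can call
        # discover_filter_paths and resubmit a corrected FilterTree.
        raise ModelRetry(
            f"Unknown paths: {unknown_paths}. Use discover_filter_paths to find valid fields."
        )
    ctx.deps.state.query.filters = filters
    ...  # emit a state snapshot, as in the earlier tool definition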

The Workflow: Discover, Build and Execute
Query construction follows a consistent three-phase workflow that reflects the constraints of a schema-agnostic EAV index.
1. Discover: The agent first discovers which fields actually exist in the indexed data, along with their types.
2. Build: Using the discovered paths, the agent constructs a typed query object (filters, grouping, aggregations) that is structurally and semantically valid. Invalid paths, incompatible operators, or incomplete specifications are rejected before execution.
3. Execute: The validated query is executed against the index, persisted as query state, and returns a stable query_id that can be paginated, exported, or reused.
Discussion
Limitations
Index Growth with Deep Nesting
Field-level indexing introduces scalability challenges for deeply nested domain models. Because each indexed field becomes a separate row, entities with complex or highly compositional structures can generate hundreds of index rows per entity.
At scale, this amplifies quickly. For tenants with thousands of subscriptions, even moderate nesting can result in millions of rows in the search index, impacting:
- Indexing time: More database writes per entity change
- Storage: Larger tables and secondary indexes
- Query performance: Increased scan and pivot cost during filtering and aggregation
Existing mitigations reduce but do not eliminate this overhead:
- Maximum traversal depth to prevent unbounded recursion
- Content hashing to avoid re-indexing unchanged fields
- Automatic removal of stale paths when fields are deleted
Aggregation Performance Overhead
Aggregations over EAV-indexed data are inherently more expensive than equivalent queries on traditional columnar schemas. Grouping requires pivoting field-level rows into an entity-aligned representation before applying GROUP BY logic.
As a result, aggregation cost increases with both schema complexity and result set size. Overhead becomes particularly noticeable with:
- Multiple grouping dimensions
- Large candidate sets produced by search or filtering
- Temporal groupings that apply date_trunc over pivoted values
While this approach enables flexible, schema-agnostic aggregations, it trades execution efficiency for generality. Aggregation queries with stable, frequently executed query plans can potentially benefit from PostgreSQL optimizations such as materialized views or pre-aggregated tables, reducing repeated EAV pivoting at query time.
Agent Latency
Agent-driven queries incur higher latency than direct query execution because query construction is performed incrementally through multiple tool calls (schema discovery, validation, query assembly, execution), each involving LLM inference and database interaction.
As a result, the agent is not suited for low-latency or high-throughput workloads. The agent is instead optimized for exploratory, ad-hoc analysis where flexibility and safety are more important than raw response time. For latency-sensitive operations, the system exposes direct REST endpoints that accept pre-constructed SelectQuery objects and bypass the agent entirely, while reusing the same query DSL and execution engine.
Future Work
Graph-Based Agent Workflows
The current agent relies on free-form tool calling, where the LLM decides which tools to invoke based on the conversation context and current state. This provides flexibility, but introduces unnecessary latency and non-determinism for common query patterns.
At the same time, the agent does not operate over an unbounded action space. Query construction is constrained by a finite, typed Query DSL (SelectQuery, AggregateQuery, FilterTree, the grouping and aggregation mixins, etc.), and the set of valid state transitions is explicitly defined by these models. The LLM cannot invent new query shapes or execution paths outside this DSL.
A graph-based workflow engine (e.g. pydantic_ai[graph]) could encode these as state machines, reducing typical interactions from multiple LLM tool calls to one or two. The LLM would only make decisions at genuine branching points (e.g. “search vs. aggregation”), with deterministic transitions handling the remainder of query construction and execution.
Multi-Agent Orchestration
The current implementation uses a single search-focused agent. Future work could explore multi-agent orchestration, where specialized agents collaborate on different aspects of a user request.
For example:
- A search agent: responsible for query construction and execution
- A database assistant agent: to explain schema structure or query behavior
- An analytics agent: to suggest aggregations or interpret results
These agents would communicate via a coordinator agent that routes user requests to the appropriate specialist and synthesizes their responses.
Agent-to-Agent Communication (A2A) and MCP Integration
To support extensibility beyond the core search use case, the system could integrate emerging standards for agent interoperability.
Model Context Protocol (MCP): could allow external tools and data sources to be registered dynamically, enabling domain-specific extensions without modifying the core search or query infrastructure.
Agent-to-Agent (A2A) communication: could allow the search agent to delegate subtasks to external agents and incorporate their responses, while remaining responsible for query construction and execution.
This would allow the search layer to integrate into broader agent workflows while remaining decoupled from specific external systems or tooling.
Technical Improvements
Beyond architectural evolution, several concrete optimizations could improve performance:
Selective field indexing: Allow administrators to configure which paths to index per entity type, reducing index growth for deeply nested or rarely queried structures.
Materialized aggregation views: Cache the results of stable, frequently executed aggregation query plans using PostgreSQL materialized views, avoiding repeated EAV pivoting at query time.
Query result caching: Cache schema discovery results and common query executions to reduce both database load and LLM tool calls.
Conclusion
This article presents a schema-agnostic search system that combines EAV indexing, PostgreSQL-native hybrid search and agent-driven query construction. The system addresses the challenge of searching over dynamically evolving domain models by indexing at field-level granularity with ltree paths, enabling runtime schema discovery and type-safe filtering without database migrations.
Key contributions include:
- A PostgreSQL-native hybrid search system combining pgvector semantic similarity with pg_trgm fuzzy matching via Reciprocal Rank Fusion
- A Pydantic-based query DSL that compiles to type-safe SQL with multi-layer validation
- An agent architecture built on PydanticAI that discovers schema at runtime and self-corrects validation errors through structured tool calling
The implementation shows how PostgreSQL’s native capabilities (ltree, pgvector, pg_trgm, window functions) can be composed to support a wide variety of query patterns over both structured and unstructured data, while avoiding the complexity of introducing additional search or indexing dependencies.
Code Availability: The presented system is implemented in the orchestrator-core package. The search module is available at orchestrator/search under the Apache 2.0 license.