From 28e564aeb2eeb683d3164941b5be8975eedf1ca6 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 11 Apr 2026 19:28:03 -0400 Subject: [PATCH 1/4] save_agent_log: write flat context array matching AST order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old code wrote a JSON object with named section keys, which serde_json serialized in alphabetical order — putting conversation before system, making logs misleading. Write a single flat array in section order instead, matching what the model actually sees. Co-Authored-By: Proof of Concept --- src/agent/context.rs | 2 +- src/mind/unconscious.rs | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/agent/context.rs b/src/agent/context.rs index 2b8bf34..2e54391 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -764,7 +764,7 @@ impl ContextState { pub fn conversation(&self) -> &[AstNode] { &self.conversation } pub fn conversation_mut(&mut self) -> &mut Vec { &mut self.conversation } - fn sections(&self) -> [&Vec; 4] { + pub fn sections(&self) -> [&Vec; 4] { [&self.system, &self.identity, &self.journal, &self.conversation] } } diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index a137fe0..0f87b17 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -309,14 +309,11 @@ pub async fn save_agent_log(name: &str, agent: &std::sync::Arc = Vec::new(); + for section in ctx.sections() { + context.extend(section); + } + if let Ok(json) = serde_json::to_string_pretty(&context) { let _ = std::fs::write(&path, json); } } From 1c0967c4ec1d3df8914afe257b070ce521ab4b83 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 11 Apr 2026 19:43:24 -0400 Subject: [PATCH 2/4] Agent::new: tool definitions from caller's tool list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The system prompt was advertising all tools to every agent, but the runtime only dispatched the agent's actual subset. This caused unconscious agents to call tools that returned "Unknown tool." Agent::new now takes the tool list explicitly. Each caller passes its own tools — the prompt and runtime always match. MCP tool definitions are still appended for agents that use them. Co-Authored-By: Proof of Concept --- src/agent/mod.rs | 7 +++++-- src/agent/oneshot.rs | 1 + src/agent/tools/mod.rs | 4 ++++ src/mind/mod.rs | 1 + src/mind/unconscious.rs | 2 +- 5 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/agent/mod.rs b/src/agent/mod.rs index e371a53..349fe91 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -192,12 +192,15 @@ impl Agent { prompt_file: String, conversation_log: Option, active_tools: tools::ActiveTools, + agent_tools: Vec, ) -> Arc { let mut context = ContextState::new(); context.conversation_log = conversation_log; context.push_no_log(Section::System, AstNode::system_msg(&system_prompt)); - let tool_defs = tools::all_tool_definitions().await; + let mut tool_defs: Vec = agent_tools.iter().map(|t| t.to_json()).collect(); + tool_defs.extend(tools::all_mcp_tool_definitions().await); + if !tool_defs.is_empty() { let tools_text = format!( "# Tools\n\nYou have access to the following functions:\n\n\n{}\n\n\n\ @@ -223,7 +226,7 @@ impl Agent { session_id, context: tokio::sync::Mutex::new(context), state: tokio::sync::Mutex::new(AgentState { - tools: tools::tools(), + tools: agent_tools, mcp_tools: McpToolAccess::All, last_prompt_tokens: 0, reasoning_effort: "none".to_string(), diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index cc590bb..9a2fed9 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -138,6 +138,7 @@ impl AutoAgent { app, String::new(), None, super::tools::ActiveTools::new(), + super::tools::tools(), ).await; { let mut st = agent.state.lock().await; diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index ce42c9e..b873a11 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -195,6 +195,10 @@ pub async fn all_tool_definitions() -> Vec { defs } +pub async fn all_mcp_tool_definitions() -> Vec { + mcp_client::tool_definitions_json().await +} + /// Memory + journal tools only — for subconscious agents. pub fn memory_and_journal_tools() -> Vec { let mut all = memory::memory_tools().to_vec(); diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 010829f..38e8fdf 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -298,6 +298,7 @@ impl Mind { config.prompt_file.clone(), conversation_log, crate::agent::tools::ActiveTools::new(), + crate::agent::tools::tools(), ).await; let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 0f87b17..8b4af40 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -282,11 +282,11 @@ impl Unconscious { client, system_prompt, personality, app, String::new(), None, crate::agent::tools::ActiveTools::new(), + auto.tools.clone(), ).await; { let mut st = agent.state.lock().await; st.provenance = format!("unconscious:{}", auto.name); - st.tools = auto.tools.clone(); st.priority = Some(10); } From bc991c3521882c4edf7d9d919e4c3b726501670c Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 11 Apr 2026 19:54:18 -0400 Subject: [PATCH 3/4] unconscious: memory tools as base, agent def adds extras MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every unconscious agent gets memory_tools() as baseline. The tools field in the agent def specifies additional tools on top of that — digest agent now gets journal_tail, journal_new, journal_update. Co-Authored-By: Proof of Concept --- src/mind/unconscious.rs | 17 ++++++++--------- src/subconscious/agents/digest.agent | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 8b4af40..5678b19 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -77,20 +77,19 @@ impl Unconscious { // Scan all .agent files, exclude subconscious-* and surface-observe let mut agents: Vec = Vec::new(); - let all_tools = tools::memory::memory_tools().to_vec(); + let base_tools = tools::memory::memory_tools().to_vec(); + let extra_tools = tools::memory::journal_tools().to_vec(); for def in defs::load_defs() { if def.agent.starts_with("subconscious-") { continue; } if def.agent == "surface-observe" { continue; } let enabled = enabled_map.get(&def.agent).copied() .unwrap_or(false); - let effective_tools: Vec = if def.tools.is_empty() { - all_tools.clone() - } else { - all_tools.iter() - .filter(|t| def.tools.iter().any(|w| w == t.name)) - .cloned() - .collect() - }; + let mut effective_tools = base_tools.clone(); + for name in &def.tools { + if let Some(t) = extra_tools.iter().find(|t| t.name == name) { + effective_tools.push(t.clone()); + } + } let steps: Vec = def.steps.iter().map(|s| AutoStep { prompt: s.prompt.clone(), phase: s.phase.clone(), diff --git a/src/subconscious/agents/digest.agent b/src/subconscious/agents/digest.agent index a8e47d3..c342d25 100644 --- a/src/subconscious/agents/digest.agent +++ b/src/subconscious/agents/digest.agent @@ -1,4 +1,4 @@ -{"agent": "digest", "schedule": "daily"} +{"agent": "digest", "schedule": "daily", "tools": ["journal_tail", "journal_new", "journal_update"]} # Digest Agent — Episodic Consolidation From aad227e48740531244b87a5223eacb3b88ea8c9f Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 11 Apr 2026 20:42:58 -0400 Subject: [PATCH 4/4] query: unify PEG and engine parsers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PEG parser now handles both expression syntax (degree > 5 | sort degree) and pipeline syntax (all | type:episodic | sort:timestamp). Deleted Stage::parse() and helpers from engine.rs — it's now pure execution. All callers use parse_stages() from parser.rs as the single entry point. Co-Authored-By: Proof of Concept --- research/query-language-unification.md | 94 ++++++++++ research/sparse-kernel-napkin-math.md | 198 ++++++++++++++++++++ src/agent/tools/memory.rs | 55 +++++- src/cli/agent.rs | 2 +- src/cli/misc.rs | 17 +- src/hippocampus/query/engine.rs | 241 +++++-------------------- src/hippocampus/query/parser.rs | 206 ++++++++++++++++----- src/subconscious/defs.rs | 2 +- 8 files changed, 562 insertions(+), 253 deletions(-) create mode 100644 research/query-language-unification.md create mode 100644 research/sparse-kernel-napkin-math.md diff --git a/research/query-language-unification.md b/research/query-language-unification.md new file mode 100644 index 0000000..57ceb98 --- /dev/null +++ b/research/query-language-unification.md @@ -0,0 +1,94 @@ +# Query Language Unification Plan + +**Status: DONE** (2026-04-11) + +## Problem (was) + +Two query parsers that didn't agree on syntax: + +1. **PEG parser** (`hippocampus/query/parser.rs`) — boolean logic, general + comparisons, operator precedence, parentheses. Used by CLI and compact + format path in `query()` tool. + +2. **Pipeline parser** (`hippocampus/query/engine.rs`) — domain-specific + filters (type, age, provenance), graph algorithms (spread, spectral). + Used by full format path in `query()` tool. + +`journal_tail` generates pipeline syntax but gets routed through the PEG +parser on the compact path. Result: parse errors. + +## Approach + +Keep the PEG parser (has the harder-to-build structural foundation), +extend it with the pipeline parser's domain features. + +## Expression extensions (add to `expr` rule in parser.rs) + +- `field:value` shorthand for `field = 'value'` (colon-separated equality) +- `*` already works as `Expr::All` +- `key ~ 'glob'` already works via match operator + +## New stages (add to `stage` rule in parser.rs) + +Domain filter stages from engine.rs: +- `type:X` — filter by node type (episodic, daily, weekly, monthly, semantic) +- `age:<7d` — duration comparison on timestamp +- `key:GLOB` — glob match on key +- `provenance:X` — provenance filter +- `weight:>N` — weight comparison (may already work via general comparison) +- `content-len:>N` — content size filter + +Sort/limit syntax variants: +- `sort:field` alongside existing `sort field` +- `limit:N` alongside existing `limit N` + +Graph algorithms: +- `spread` — spreading activation +- `spectral` — spectral nearest neighbors +- `confluence` — multi-source reachability +- `geodesic` — straightest spectral paths +- `manifold` — extrapolation along seed direction + +## What changes + +1. `parser.rs` — add field:value shorthand to expr, add domain stages +2. `engine.rs` — keep run_pipeline execution logic, have PEG parser emit + compatible Stage types (or convert PEG AST to Stage at boundary) +3. `query()` tool handler (memory.rs) — one parser path for all formats +4. `journal_tail` (memory.rs) — generate unified syntax +5. CLI `poc-memory query` — uses unified parser + +## Migration path + +1. Add field:value shorthand and type/age/key stages to PEG parser +2. Route query() through PEG parser for all formats +3. Migrate journal_tail and any other pipeline-syntax callers +4. Remove the pipeline parser (or keep as internal execution layer) + +## What was done + +**Deleted from engine.rs (-153 lines):** +- `Stage::parse()` and `Stage::parse_pipeline()` — redundant with PEG +- `parse_cmp()`, `parse_duration_or_number()`, `parse_composite_sort()`, + `parse_node_type()`, `parse_sort_field()` — helper functions for deleted parser + +**Added to parser.rs (+120 lines):** +- Pipeline syntax in PEG grammar (`type:X`, `age:` +- Grammar helper functions + +**Net: +17 lines** + +**Architecture now:** +- parser.rs: PEG grammar handles ALL parsing (both syntaxes) +- engine.rs: Pure execution — types and `run_query()`, no parsing + +Result: `all | type:episodic | sort:timestamp | limit:5` works everywhere. +Mixed syntax like `degree > 5 | type:semantic | sort degree` also works. + +## What NOT to change (original note) + +The run_pipeline execution logic stays — it's correct and well-tested. +Only the parsing front-end unifies. The pipeline parser's Stage enum +becomes the internal representation that both the PEG parser and any +remaining direct callers produce. diff --git a/research/sparse-kernel-napkin-math.md b/research/sparse-kernel-napkin-math.md new file mode 100644 index 0000000..b732b55 --- /dev/null +++ b/research/sparse-kernel-napkin-math.md @@ -0,0 +1,198 @@ +# Sparse Kernel Compilation: Napkin Math for Qwen3.5-27B + +## Architecture recap + +| Parameter | Value | +|-----------|-------| +| Layers | 64 (48 linear attention, 16 full attention) | +| Hidden dim (H) | 5,120 | +| Query heads | 24 | +| KV heads | 4 (GQA) | +| Head dim | 256 | +| FFN intermediate | 17,408 | +| Full attention interval | every 4th layer | +| Total params | ~27.5B | + +**Key discovery**: Qwen3.5 already uses sparse attention — 48/64 layers use +linear attention (O(N)), only 16 use full O(N^2) attention. The attention +sparsity question is partially answered by the architecture itself. The +remaining opportunity is **weight sparsity** in the projection and FFN matrices. + +## Measured weight sparsity (from B200, all 11 shards) + +| Component | Count | Total params | <0.001 | <0.005 | <0.01 | <0.02 | +|-----------|-------|-------------|--------|--------|-------|-------| +| down_proj | 65 | 5,793,382,400 | 7.4% | 35.8% | 64.3% | 92.8% | +| gate_proj | 65 | 5,793,382,400 | 7.2% | 35.0% | 63.3% | 92.3% | +| up_proj | 65 | 5,793,382,400 | 7.3% | 35.2% | 63.5% | 92.5% | +| q_proj | 17 | 1,069,547,520 | 5.7% | 27.9% | 51.9% | 82.8% | +| k_proj | 17 | 89,128,960 | 6.0% | 29.4% | 54.1% | 84.1% | +| v_proj | 17 | 89,128,960 | 4.8% | 23.7% | 44.7% | 74.2% | +| o_proj | 17 | 534,773,760 | 5.3% | 25.9% | 48.7% | 79.6% | +| other | 605 | 6,070,652,272 | 5.4% | 26.4% | 49.6% | 80.9% | + +**Key findings**: +- FFN layers (gate/up/down) are remarkably sparse: ~64% of weights below 0.01 +- At threshold 0.02, FFN sparsity exceeds 92% +- Attention projections are less sparse but still significant: 45-52% below 0.01 +- v_proj is the least sparse component (44.7% below 0.01) + +## Per-layer parameter breakdown + +- **QKV projection**: H × (Q_dim + K_dim + V_dim) ≈ 5120 × 13312 ≈ 68M +- **Output projection**: ~26M +- **FFN (gate + up + down)**: 5120 × 17408 × 3 ≈ 267M +- **Total per layer**: ~361M +- **64 layers**: ~23.1B (rest is embeddings, norms, etc.) + +## Dense baseline: FLOPs per token + +Each weight parameter contributes 2 FLOPs per token (multiply + accumulate). + +- Per layer: ~722M FLOPs/token +- 64 layers: **~46.2B FLOPs/token** + +On a B200 (theoretical ~4.5 PFLOPS FP8, ~1.1 PFLOPS BF16): +- BF16 throughput: 46.2B / 1.1e15 ≈ 0.042ms per token (compute-bound limit) +- But inference is usually **memory-bandwidth-bound** for small batch sizes + +B200 HBM bandwidth: ~8 TB/s. At BF16 (2 bytes/param): +- Loading all weights once: 27.5B × 2 = 55GB → 55/8000 ≈ **6.9ms per token** (batch=1) +- This is the real bottleneck. Compute is cheap; loading weights is expensive. + +## What sparsity buys you + +At **X% sparsity** (X% of weights are zero), you need to load (1-X)% of the weights: + +| Sparsity | Params loaded | Time (batch=1) | Speedup | +|----------|--------------|-----------------|---------| +| 0% (dense) | 27.5B | 6.9ms | 1.0x | +| 50% | 13.8B | 3.4ms | 2.0x | +| 75% | 6.9B | 1.7ms | 4.0x | +| 90% | 2.8B | 0.7ms | 10x | + +**This is the key insight**: inference at small batch sizes is memory-bandwidth-bound. +Sparse weights = fewer bytes to load = directly proportional speedup, +**IF** the sparse kernel can avoid the gather/scatter overhead. + +## The compilation problem + +Dense GEMM is fast because: +1. Weights are contiguous in memory → sequential reads +2. Tiling fits perfectly in SRAM → high reuse +3. Hardware tensor cores expect dense blocks + +Naive sparse matmul kills this: +- Irregular memory access → low bandwidth utilization +- Poor SRAM tiling → cache thrashing +- Tensor cores can't help + +### The FlashAttention analogy + +FlashAttention's insight: the N×N attention matrix doesn't fit in SRAM, +but you can tile it so each tile does. You recompute instead of materializing. + +**Sparse kernel compilation insight**: the sparsity pattern is **known at compile time**. +A compiler can: + +1. **Analyze the sparsity graph** of each weight matrix +2. **Find blocks** of non-zero weights that are close in memory +3. **Generate a tiling schedule** that loads these blocks into SRAM efficiently +4. **Emit fused kernels** where the memory access pattern is baked in as constants + +The resulting kernel looks like a dense kernel to the hardware — +sequential reads, high SRAM reuse, maybe even tensor core compatible +(if the compiler finds dense sub-blocks within the sparse matrix). + +## Block sparsity vs unstructured sparsity + +**Block sparse** (e.g., 4×4 or 16×16 blocks zeroed out): +- GPU-friendly: blocks map to tensor core operations +- Less flexible: coarser pruning granularity → less achievable sparsity +- NVIDIA's 2:4 structured sparsity gets ~50% sparsity with tensor core support +- Real-world: typically 50-70% sparsity achievable without quality loss + +**Unstructured sparse** (individual weights zeroed): +- Maximally flexible: fine-grained pruning → higher achievable sparsity +- GPU-hostile: the gather/scatter problem +- Real-world: 80-95% sparsity achievable in many layers without quality loss + +**The compiled kernel approach bridges this**: take unstructured sparsity +(maximally flexible, high compression) and compile it into a kernel that +runs as efficiently as block-sparse. Best of both worlds. + +## Recurrent depth composability + +From our April 10 discussion: middle transformer layers are doing +open-coded simulated annealing — similar weights, similar computation. + +If layers 8-24 have cosine similarity > 0.95: +- Replace 16 layers with 1 layer × 16 iterations +- **Parameter reduction**: 16 × 361M = 5.8B → 361M (16x reduction for those layers) +- **Memory bandwidth**: load one layer's weights, iterate in SRAM +- Combined with 50% sparsity on the remaining unique layers: + - Unique layers (48): 48 × 361M × 0.5 = 8.7B params + - Recurrent layer: 361M × 0.5 = 180M params (but iterated 16x in SRAM) + - Total loaded per token: ~8.9B × 2 bytes = 17.8GB + - Time: 17.8/8000 ≈ **2.2ms per token** (vs 6.9ms dense) — **3.1x speedup** + +With higher sparsity (75%) + recurrence: + - Unique layers: 48 × 361M × 0.25 = 4.3B + - Recurrent: 90M + - Total: ~4.4B × 2 = 8.8GB → **1.1ms per token** — **6.3x speedup** + +## What needs to happen + +### Phase 1: Measure (can do now with B200 access) +1. Extract all weight matrices from Qwen3.5-27B +2. For each matrix, compute: + - Magnitude distribution (what % of weights are near-zero?) + - Achievable sparsity at various thresholds (L1 magnitude pruning) + - Dense sub-block statistics (how many 4×4, 16×16 blocks are all-zero?) +3. Layer similarity: pairwise cosine similarity of weight matrices across layers + - Which layers are nearly identical? (recurrence candidates) +4. Validate quality: run perplexity eval at various sparsity levels + +### Phase 2: Compile (research project) +1. For a single sparse weight matrix, generate an optimized Triton kernel +2. Benchmark vs dense GEMM and vs NVIDIA's 2:4 sparse +3. Iterate on the tiling strategy + +### Phase 3: End-to-end +1. Full model with compiled sparse kernels +2. Perplexity + latency benchmarks +3. Compare: dense, 2:4 structured, compiled unstructured + +## Related work to read + +- **SparseGPT** (Frantar & Alistarh, 2023): one-shot pruning to 50-60% unstructured sparsity + with minimal quality loss. Key result: large models are more prunable than small ones. +- **Wanda** (Sun et al., 2023): pruning by weight magnitude × input activation. + Simpler than SparseGPT, comparable results. +- **NVIDIA 2:4 sparsity**: hardware-supported structured sparsity on Ampere+. + 50% sparsity, ~2x speedup on tensor cores. The existence proof that sparse can be fast. +- **Triton** (Tillet et al.): Python DSL for GPU kernel generation. + The right compilation target — can express arbitrary tiling strategies. +- **TACO** (Kjolstad et al.): tensor algebra compiler. Generates kernels for + specific sparse tensor formats. Academic but the ideas are right. +- **FlashAttention** (Dao et al.): the tiling strategy to learn from. +- **DejaVu** (Liu et al., 2023): contextual sparsity — predicting which neurons + to activate per input. Dynamic sparsity, complementary to weight sparsity. + +## The bigger picture + +Current state of the art: dense models with FlashAttention for the N×N attention part. +Weight sparsity is known to work (SparseGPT, Wanda) but isn't deployed because +the GPU kernels don't exist to exploit it efficiently. + +The gap: nobody has built a compiler that takes a specific sparse weight matrix +and emits a kernel optimized for that exact pattern. FlashAttention proved +that custom kernels for specific computational patterns beat general-purpose ones. +The same should hold for sparse weight patterns. + +**The bet**: a compiled sparse kernel for Qwen3.5-27B's actual sparsity pattern +would be within 80% of the theoretical bandwidth-bound speedup. If true, +50% sparsity → 1.6x real speedup, 75% → 3.2x, composing with recurrent depth +for potentially 5-6x total. + +That would make 27B inference as fast as a 5B dense model, with 27B quality. diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index 76b7934..f526d44 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -260,7 +260,7 @@ async fn query(args: &serde_json::Value) -> Result { let store = arc.lock().await; let graph = store.build_graph(); - let stages = crate::search::Stage::parse_pipeline(query_str) + let stages = crate::query_parser::parse_stages(query_str) .map_err(|e| anyhow::anyhow!("{}", e))?; let results = crate::search::run_query(&stages, vec![], &graph, &store, false, 100); let keys: Vec = results.into_iter().map(|(k, _)| k).collect(); @@ -272,12 +272,61 @@ async fn query(args: &serde_json::Value) -> Result { Ok(crate::subconscious::prompts::format_nodes_section(&store, &items, &graph)) } _ => { - crate::query_parser::query_to_string(&store, &graph, query_str) - .map_err(|e| anyhow::anyhow!("{}", e)) + // Compact output: check for count/select stages, else just list keys + use crate::search::{Stage, Transform}; + let has_count = stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))); + if has_count { + return Ok(keys.len().to_string()); + } + if keys.is_empty() { + return Ok("no results".to_string()); + } + let select_fields: Option<&Vec> = stages.iter().find_map(|s| match s { + Stage::Transform(Transform::Select(f)) => Some(f), + _ => None, + }); + if let Some(fields) = select_fields { + let mut out = String::from("key\t"); + out.push_str(&fields.join("\t")); + out.push('\n'); + for key in &keys { + out.push_str(key); + for f in fields { + out.push('\t'); + out.push_str(&resolve_field_str(&store, &graph, key, f)); + } + out.push('\n'); + } + Ok(out) + } else { + Ok(keys.join("\n")) + } } } } +fn resolve_field_str(store: &crate::store::Store, graph: &crate::graph::Graph, key: &str, field: &str) -> String { + let node = match store.nodes.get(key) { + Some(n) => n, + None => return "-".to_string(), + }; + match field { + "key" => key.to_string(), + "weight" => format!("{:.3}", node.weight), + "node_type" => format!("{:?}", node.node_type), + "provenance" => node.provenance.clone(), + "emotion" => format!("{}", node.emotion), + "retrievals" => format!("{}", node.retrievals), + "uses" => format!("{}", node.uses), + "wrongs" => format!("{}", node.wrongs), + "created" => format!("{}", node.created_at), + "timestamp" => format!("{}", node.timestamp), + "degree" => format!("{}", graph.degree(key)), + "content_len" => format!("{}", node.content.len()), + _ => "-".to_string(), + } +} + // ── Journal tools ────────────────────────────────────────────── async fn journal_tail(args: &serde_json::Value) -> Result { diff --git a/src/cli/agent.rs b/src/cli/agent.rs index c6ae426..d2d05c8 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -25,7 +25,7 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option target.to_vec() } else if let Some(q) = query { let graph = store.build_graph(); - let stages = crate::search::Stage::parse_pipeline(q)?; + let stages = crate::query_parser::parse_stages(q)?; let results = crate::search::run_query(&stages, vec![], &graph, &store, false, count); if results.is_empty() { return Err(format!("query returned no results: {}", q)); diff --git a/src/cli/misc.rs b/src/cli/misc.rs index 5e49b56..890d8ab 100644 --- a/src/cli/misc.rs +++ b/src/cli/misc.rs @@ -3,25 +3,26 @@ pub fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bool, debug: bool, fuzzy: bool, content: bool) -> Result<(), String> { use std::collections::BTreeMap; + use crate::search::{Stage, Algorithm, AlgoStage}; // When running inside an agent session, exclude already-surfaced nodes let seen = crate::session::HookSession::from_env() .map(|s| s.seen()) .unwrap_or_default(); - // Parse pipeline stages (unified: algorithms, filters, transforms, generators) - let stages: Vec = if pipeline_args.is_empty() { - vec![crate::search::Stage::Algorithm(crate::search::AlgoStage::parse("spread").unwrap())] + // Build pipeline: if args provided, parse them; otherwise default to spread + let stages: Vec = if pipeline_args.is_empty() { + vec![Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() })] } else { - pipeline_args.iter() - .map(|a| crate::search::Stage::parse(a)) - .collect::, _>>()? + // Join args with | and parse as unified query + let pipeline_str = format!("all | {}", pipeline_args.join(" | ")); + crate::query_parser::parse_stages(&pipeline_str)? }; // Check if pipeline needs full Store (has filters/transforms/generators) - let needs_store = stages.iter().any(|s| !matches!(s, crate::search::Stage::Algorithm(_))); + let needs_store = stages.iter().any(|s| !matches!(s, Stage::Algorithm(_))); // Check if pipeline starts with a generator (doesn't need seed terms) - let has_generator = stages.first().map(|s| matches!(s, crate::search::Stage::Generator(_))).unwrap_or(false); + let has_generator = stages.first().map(|s| matches!(s, Stage::Generator(_))).unwrap_or(false); if terms.is_empty() && !has_generator { return Err("search requires terms or a generator stage (e.g. 'all')".into()); diff --git a/src/hippocampus/query/engine.rs b/src/hippocampus/query/engine.rs index 915ec14..28ca344 100644 --- a/src/hippocampus/query/engine.rs +++ b/src/hippocampus/query/engine.rs @@ -157,6 +157,9 @@ pub enum Filter { pub enum Transform { Sort(SortField), Limit(usize), + Select(Vec), + Count, + Connectivity, DominatingSet, } @@ -168,6 +171,8 @@ pub enum SortField { Degree, Weight, Isolation, + Key, + Named(String, bool), // (field_name, ascending) Composite(Vec<(ScoreField, f64)>), } @@ -206,79 +211,6 @@ impl Cmp { } } -/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds). -fn parse_cmp(s: &str) -> Result { - let (op_len, ctor): (usize, fn(f64) -> Cmp) = if s.starts_with(">=") { - (2, Cmp::Gte) - } else if s.starts_with("<=") { - (2, Cmp::Lte) - } else if s.starts_with('>') { - (1, Cmp::Gt) - } else if s.starts_with('<') { - (1, Cmp::Lt) - } else if s.starts_with('=') { - (1, Cmp::Eq) - } else { - return Err(format!("expected comparison operator in '{}'", s)); - }; - - let val_str = &s[op_len..]; - let val = parse_duration_or_number(val_str)?; - Ok(ctor(val)) -} - -/// Parse "7d", "24h", "30m" as seconds, or plain numbers. -fn parse_duration_or_number(s: &str) -> Result { - if let Some(n) = s.strip_suffix('d') { - let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; - Ok(v * 86400.0) - } else if let Some(n) = s.strip_suffix('h') { - let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; - Ok(v * 3600.0) - } else if let Some(n) = s.strip_suffix('m') { - let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; - Ok(v * 60.0) - } else { - s.parse().map_err(|_| format!("bad number: {}", s)) - } -} - -/// Parse composite sort: "isolation*0.7+recency(linker)*0.3" -/// Each term is field or field(arg), optionally *weight (default 1.0). -fn parse_composite_sort(s: &str) -> Result, String> { - let mut terms = Vec::new(); - for term in s.split('+') { - let term = term.trim(); - let (field_part, weight) = if let Some((f, w)) = term.rsplit_once('*') { - (f, w.parse::().map_err(|_| format!("bad weight: {}", w))?) - } else { - (term, 1.0) - }; - - // Parse field, possibly with (arg) - let field = if let Some((name, arg)) = field_part.split_once('(') { - let arg = arg.strip_suffix(')').ok_or("missing ) in sort field")?; - match name { - "recency" => ScoreField::Recency(arg.to_string()), - _ => return Err(format!("unknown parameterized sort field: {}", name)), - } - } else { - match field_part { - "isolation" => ScoreField::Isolation, - "degree" => ScoreField::Degree, - "weight" => ScoreField::Weight, - "content-len" => ScoreField::ContentLen, - "priority" => ScoreField::Priority, - _ => return Err(format!("unknown sort field: {}", field_part)), - } - }; - terms.push((field, weight)); - } - if terms.is_empty() { - return Err("empty composite sort".into()); - } - Ok(terms) -} /// Compute a 0-1 score for a node on a single dimension. fn score_field( @@ -348,129 +280,6 @@ impl CompositeCache { } } -/// Parse a NodeType from a label. -fn parse_node_type(s: &str) -> Result { - match s { - "episodic" | "session" => Ok(NodeType::EpisodicSession), - "daily" => Ok(NodeType::EpisodicDaily), - "weekly" => Ok(NodeType::EpisodicWeekly), - "monthly" => Ok(NodeType::EpisodicMonthly), - "semantic" => Ok(NodeType::Semantic), - _ => Err(format!("unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)", s)), - } -} - -impl Stage { - /// Parse a single stage from a string. - /// - /// Algorithm names are tried first (bare words), then predicate syntax - /// (contains ':'). No ambiguity since algorithms are bare words. - pub fn parse(s: &str) -> Result { - let s = s.trim(); - let (negated, s) = if let Some(rest) = s.strip_prefix('!') { - (true, rest) - } else { - (false, s) - }; - - // Generator: "all" - if s == "all" { - return Ok(Stage::Generator(Generator::All)); - } - - // Transform: "dominating-set" - if s == "dominating-set" { - return Ok(Stage::Transform(Transform::DominatingSet)); - } - - // Try algorithm parse first (bare words, no colon) - if !s.contains(':') - && let Ok(algo) = AlgoStage::parse(s) { - return Ok(Stage::Algorithm(algo)); - } - - // Algorithm with params: "spread,max_hops=4" (contains comma but no colon) - if s.contains(',') && !s.contains(':') { - return AlgoStage::parse(s).map(Stage::Algorithm); - } - - // Predicate/transform syntax: "key:value" - let (prefix, value) = s.split_once(':') - .ok_or_else(|| format!("unknown stage: {}", s))?; - - let filter_or_transform = match prefix { - "type" => Stage::Filter(Filter::Type(parse_node_type(value)?)), - "key" => Stage::Filter(Filter::KeyGlob(value.to_string())), - "weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)), - "age" => Stage::Filter(Filter::Age(parse_cmp(value)?)), - "content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)), - "provenance" => { - Stage::Filter(Filter::Provenance(value.to_string())) - } - "not-visited" => { - let (agent, dur) = value.split_once(',') - .ok_or("not-visited:AGENT,DURATION")?; - let secs = parse_duration_or_number(dur)?; - Stage::Filter(Filter::NotVisited { - agent: agent.to_string(), - duration: secs as i64, - }) - } - "visited" => Stage::Filter(Filter::Visited { - agent: value.to_string(), - }), - "sort" => { - // Check for composite sort: field*weight+field*weight+... - let field = if value.contains('+') || value.contains('*') { - SortField::Composite(parse_composite_sort(value)?) - } else { - match value { - "priority" => SortField::Priority, - "timestamp" => SortField::Timestamp, - "content-len" => SortField::ContentLen, - "degree" => SortField::Degree, - "weight" => SortField::Weight, - "isolation" => SortField::Isolation, - _ => return Err(format!("unknown sort field: {}", value)), - } - }; - Stage::Transform(Transform::Sort(field)) - } - "limit" => { - let n: usize = value.parse() - .map_err(|_| format!("bad limit: {}", value))?; - Stage::Transform(Transform::Limit(n)) - } - "match" => { - let terms: Vec = value.split(',') - .map(|t| t.to_string()) - .collect(); - Stage::Generator(Generator::Match(terms)) - } - // Algorithm with colon in params? Try fallback. - _ => return AlgoStage::parse(s).map(Stage::Algorithm) - .map_err(|_| format!("unknown stage: {}", s)), - }; - - // Apply negation to filters - if negated { - match filter_or_transform { - Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))), - _ => Err("! prefix only works on filter stages".to_string()), - } - } else { - Ok(filter_or_transform) - } - } - - /// Parse a pipe-separated pipeline string. - pub fn parse_pipeline(s: &str) -> Result, String> { - s.split('|') - .map(|part| Stage::parse(part.trim())) - .collect() - } -} - impl fmt::Display for Stage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -479,6 +288,9 @@ impl fmt::Display for Stage { Stage::Filter(filt) => write!(f, "{}", filt), Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field), Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n), + Stage::Transform(Transform::Select(fields)) => write!(f, "select:{}", fields.join(",")), + Stage::Transform(Transform::Count) => write!(f, "count"), + Stage::Transform(Transform::Connectivity) => write!(f, "connectivity"), Stage::Transform(Transform::DominatingSet) => write!(f, "dominating-set"), Stage::Algorithm(a) => write!(f, "{}", a.algo), } @@ -613,7 +425,7 @@ fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> { } } -fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { +pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { let node = match store.nodes.get(key) { Some(n) => n, None => return false, @@ -686,6 +498,39 @@ pub fn run_transform( sb.total_cmp(&sa) // most isolated first }); } + SortField::Key => { + items.sort_by(|a, b| a.0.cmp(&b.0)); + } + SortField::Named(field, asc) => { + // Resolve field from node properties + let resolve = |key: &str| -> Option { + let node = store.nodes.get(key)?; + match field.as_str() { + "weight" => Some(node.weight as f64), + "emotion" => Some(node.emotion as f64), + "retrievals" => Some(node.retrievals as f64), + "uses" => Some(node.uses as f64), + "wrongs" => Some(node.wrongs as f64), + "created" => Some(node.created_at as f64), + "timestamp" => Some(node.timestamp as f64), + "degree" => Some(graph.degree(key) as f64), + "content_len" => Some(node.content.len() as f64), + _ => None, + } + }; + let asc = *asc; + items.sort_by(|a, b| { + let va = resolve(&a.0); + let vb = resolve(&b.0); + let ord = match (va, vb) { + (Some(a), Some(b)) => a.total_cmp(&b), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => a.0.cmp(&b.0), + }; + if asc { ord } else { ord.reverse() } + }); + } SortField::Priority => { // Pre-compute priorities to avoid O(n log n) calls // inside the sort comparator. @@ -725,6 +570,8 @@ pub fn run_transform( items.truncate(*n); items } + // Output mode directives - don't modify result set, handled at output layer + Transform::Select(_) | Transform::Count | Transform::Connectivity => items, Transform::DominatingSet => { // Greedy 3-covering dominating set: pick the node that covers // the most under-covered neighbors, repeat until every node diff --git a/src/hippocampus/query/parser.rs b/src/hippocampus/query/parser.rs index 64ffc00..e755367 100644 --- a/src/hippocampus/query/parser.rs +++ b/src/hippocampus/query/parser.rs @@ -26,6 +26,12 @@ use crate::graph::Graph; use regex::Regex; use std::collections::BTreeMap; +// Re-export engine types used by Query +pub use super::engine::{ + Stage, Filter, Transform, Generator, SortField, + Algorithm, AlgoStage, Cmp, +}; + // -- AST types -- #[derive(Debug, Clone)] @@ -57,16 +63,6 @@ pub enum CmpOp { Gt, Lt, Ge, Le, Eq, Ne, Match, } -#[derive(Debug, Clone)] -pub enum Stage { - Sort { field: String, ascending: bool }, - Limit(usize), - Select(Vec), - Count, - Connectivity, - DominatingSet, -} - #[derive(Debug, Clone)] pub struct Query { pub expr: Expr, @@ -86,18 +82,54 @@ peg::parser! { = s:(_ "|" _ s:stage() { s })* { s } rule stage() -> Stage - = "sort" _ f:field() _ a:asc_desc() { Stage::Sort { field: f, ascending: a } } - / "limit" _ n:integer() { Stage::Limit(n) } - / "select" _ f:field_list() { Stage::Select(f) } - / "count" { Stage::Count } - / "connectivity" { Stage::Connectivity } - / "dominating-set" { Stage::DominatingSet } + // Original PEG syntax (space-separated) + = "sort" _ f:field() _ a:asc_desc() { + Stage::Transform(Transform::Sort(make_sort_field(&f, a))) + } + / "limit" _ n:integer() { Stage::Transform(Transform::Limit(n)) } + / "select" _ f:field_list() { Stage::Transform(Transform::Select(f)) } + / "count" { Stage::Transform(Transform::Count) } + / "connectivity" { Stage::Transform(Transform::Connectivity) } + / "dominating-set" { Stage::Transform(Transform::DominatingSet) } + // Pipeline syntax (colon-separated) + / "sort:" f:field() { Stage::Transform(Transform::Sort(make_sort_field(&f, false))) } + / "limit:" n:integer() { Stage::Transform(Transform::Limit(n)) } + / "select:" f:field_list_colon() { Stage::Transform(Transform::Select(f)) } + / "type:" t:ident() { make_type_filter(&t) } + / "age:" c:cmp_duration() { Stage::Filter(Filter::Age(c)) } + / "key:" g:ident() { Stage::Filter(Filter::KeyGlob(g)) } + / "provenance:" p:ident() { Stage::Filter(Filter::Provenance(p)) } + / "all" { Stage::Generator(Generator::All) } + // Graph algorithms + / "spread" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() }) } + / "spectral" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spectral, params: std::collections::HashMap::new() }) } rule asc_desc() -> bool = "asc" { true } / "desc" { false } / { false } // default: descending + rule field_list_colon() -> Vec + = f:field() fs:("," f:field() { f })* { + let mut v = vec![f]; + v.extend(fs); + v + } + + rule cmp_duration() -> Cmp + = ">=" n:duration() { Cmp::Gte(n) } + / "<=" n:duration() { Cmp::Lte(n) } + / ">" n:duration() { Cmp::Gt(n) } + / "<" n:duration() { Cmp::Lt(n) } + / "=" n:duration() { Cmp::Eq(n) } + + rule duration() -> f64 + = n:number() "d" { n * 86400.0 } + / n:number() "h" { n * 3600.0 } + / n:number() "m" { n * 60.0 } + / n:number() "s" { n } + / n:number() { n } + rule field_list() -> Vec = f:field() fs:(_ "," _ f:field() { f })* { let mut v = vec![f]; @@ -122,6 +154,7 @@ peg::parser! { Expr::Comparison { field: f, op, value: v } } "*" { Expr::All } + "all" { Expr::All } "(" _ e:expr() _ ")" { e } } @@ -167,6 +200,55 @@ peg::parser! { } } +// -- Helper functions for PEG grammar -- + +fn make_sort_field(field: &str, ascending: bool) -> SortField { + match field { + "priority" => SortField::Priority, + "timestamp" => SortField::Timestamp, + "content-len" | "content_len" => SortField::ContentLen, + "degree" => SortField::Degree, + "weight" => SortField::Weight, + "isolation" => SortField::Isolation, + "key" => SortField::Key, + _ => SortField::Named(field.to_string(), ascending), + } +} + +fn make_type_filter(type_name: &str) -> Stage { + let node_type = match type_name { + "episodic" | "session" => NodeType::EpisodicSession, + "daily" => NodeType::EpisodicDaily, + "weekly" => NodeType::EpisodicWeekly, + "monthly" => NodeType::EpisodicMonthly, + "semantic" => NodeType::Semantic, + _ => NodeType::Semantic, // fallback + }; + Stage::Filter(Filter::Type(node_type)) +} + +/// Parse a query string into Vec for pipeline execution. +/// This is the unified entry point — replaces engine::Stage::parse_pipeline. +pub fn parse_stages(s: &str) -> Result, String> { + let q = query_parser::query(s) + .map_err(|e| format!("Parse error: {}", e))?; + + let mut stages = Vec::new(); + + // Convert Expr to a Generator stage + match &q.expr { + Expr::All => stages.push(Stage::Generator(Generator::All)), + _ => { + // For complex expressions, we need the Query-based path + // This shouldn't happen for pipeline queries + return Err("Complex expressions not supported in pipeline mode; use CLI query".into()); + } + } + + stages.extend(q.stages); + Ok(stages) +} + // -- Field resolution -- /// Resolve a field value from a node + graph context, returning a comparable Value. @@ -377,12 +459,12 @@ fn execute_parsed( let mut set = Vec::new(); for stage in &q.stages { match stage { - Stage::Select(fields) => { + Stage::Transform(Transform::Select(fields)) => { for f in fields { if !set.contains(f) { set.push(f.clone()); } } } - Stage::Sort { field, .. } => { + Stage::Transform(Transform::Sort(SortField::Named(field, _))) => { if !set.contains(field) { set.push(field.clone()); } } _ => {} @@ -404,37 +486,75 @@ fn execute_parsed( let mut has_sort = false; for stage in &q.stages { match stage { - Stage::Sort { field, ascending } => { + Stage::Transform(Transform::Sort(sort_field)) => { has_sort = true; - let asc = *ascending; - results.sort_by(|a, b| { - let va = a.fields.get(field).and_then(as_num); - let vb = b.fields.get(field).and_then(as_num); - let ord = match (va, vb) { - (Some(a), Some(b)) => a.total_cmp(&b), - _ => { - let sa = a.fields.get(field).map(as_str).unwrap_or_default(); - let sb = b.fields.get(field).map(as_str).unwrap_or_default(); - sa.cmp(&sb) - } - }; - if asc { ord } else { ord.reverse() } - }); + match sort_field { + SortField::Named(field, asc) => { + let asc = *asc; + let field = field.clone(); + results.sort_by(|a, b| { + let va = a.fields.get(&field).and_then(as_num); + let vb = b.fields.get(&field).and_then(as_num); + let ord = match (va, vb) { + (Some(a), Some(b)) => a.total_cmp(&b), + _ => { + let sa = a.fields.get(&field).map(as_str).unwrap_or_default(); + let sb = b.fields.get(&field).map(as_str).unwrap_or_default(); + sa.cmp(&sb) + } + }; + if asc { ord } else { ord.reverse() } + }); + } + SortField::Key => { + results.sort_by(|a, b| a.key.cmp(&b.key)); + } + SortField::Degree => { + results.sort_by(|a, b| { + let da = graph.degree(&a.key); + let db = graph.degree(&b.key); + db.cmp(&da) + }); + } + SortField::Weight => { + results.sort_by(|a, b| { + let wa = store.nodes.get(&a.key).map(|n| n.weight).unwrap_or(0.0); + let wb = store.nodes.get(&b.key).map(|n| n.weight).unwrap_or(0.0); + wb.total_cmp(&wa) + }); + } + SortField::Timestamp => { + results.sort_by(|a, b| { + let ta = store.nodes.get(&a.key).map(|n| n.timestamp).unwrap_or(0); + let tb = store.nodes.get(&b.key).map(|n| n.timestamp).unwrap_or(0); + tb.cmp(&ta) + }); + } + _ => {} // other sort fields handled by default degree sort + } } - Stage::Limit(n) => { + Stage::Transform(Transform::Limit(n)) => { results.truncate(*n); } - Stage::Connectivity => {} // handled in output - Stage::Select(_) | Stage::Count => {} // handled in output - Stage::DominatingSet => { + Stage::Transform(Transform::Connectivity) => {} // handled in output + Stage::Transform(Transform::Select(_) | Transform::Count) => {} // handled in output + Stage::Transform(Transform::DominatingSet) => { let mut items: Vec<(String, f64)> = results.iter() .map(|r| (r.key.clone(), graph.degree(&r.key) as f64)) .collect(); - let xform = super::engine::Transform::DominatingSet; + let xform = Transform::DominatingSet; items = super::engine::run_transform(&xform, items, store, graph); let keep: std::collections::HashSet = items.into_iter().map(|(k, _)| k).collect(); results.retain(|r| keep.contains(&r.key)); } + Stage::Filter(filt) => { + // Apply filter to narrow results + let now = crate::store::now_epoch(); + results.retain(|r| super::engine::eval_filter(filt, &r.key, store, now)); + } + Stage::Generator(_) | Stage::Algorithm(_) => { + // Generators are handled by Expr, algorithms not applicable here + } } } @@ -474,7 +594,7 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St let results = execute_parsed(store, graph, &q)?; // Count stage - if q.stages.iter().any(|s| matches!(s, Stage::Count)) { + if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) { println!("{}", results.len()); return Ok(()); } @@ -485,14 +605,14 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St } // Connectivity stage - if q.stages.iter().any(|s| matches!(s, Stage::Connectivity)) { + if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Connectivity))) { print_connectivity(&results, graph); return Ok(()); } // Select stage let fields: Option<&Vec> = q.stages.iter().find_map(|s| match s { - Stage::Select(f) => Some(f), + Stage::Transform(Transform::Select(f)) => Some(f), _ => None, }); @@ -527,7 +647,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result< let results = execute_parsed(store, graph, &q)?; - if q.stages.iter().any(|s| matches!(s, Stage::Count)) { + if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) { return Ok(results.len().to_string()); } if results.is_empty() { @@ -535,7 +655,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result< } let fields: Option<&Vec> = q.stages.iter().find_map(|s| match s { - Stage::Select(f) => Some(f), + Stage::Transform(Transform::Select(f)) => Some(f), _ => None, }); diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index 1fe8377..682b4fa 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -801,7 +801,7 @@ pub fn run_agent( // Run the query if present let keys = if !def.query.is_empty() { - let mut stages = search::Stage::parse_pipeline(&def.query)?; + let mut stages = crate::query_parser::parse_stages(&def.query)?; let has_limit = stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))); if !has_limit {