diff --git a/research/query-language-unification.md b/research/query-language-unification.md new file mode 100644 index 0000000..57ceb98 --- /dev/null +++ b/research/query-language-unification.md @@ -0,0 +1,94 @@ +# Query Language Unification Plan + +**Status: DONE** (2026-04-11) + +## Problem (was) + +Two query parsers that didn't agree on syntax: + +1. **PEG parser** (`hippocampus/query/parser.rs`) — boolean logic, general + comparisons, operator precedence, parentheses. Used by CLI and compact + format path in `query()` tool. + +2. **Pipeline parser** (`hippocampus/query/engine.rs`) — domain-specific + filters (type, age, provenance), graph algorithms (spread, spectral). + Used by full format path in `query()` tool. + +`journal_tail` generates pipeline syntax but gets routed through the PEG +parser on the compact path. Result: parse errors. + +## Approach + +Keep the PEG parser (has the harder-to-build structural foundation), +extend it with the pipeline parser's domain features. + +## Expression extensions (add to `expr` rule in parser.rs) + +- `field:value` shorthand for `field = 'value'` (colon-separated equality) +- `*` already works as `Expr::All` +- `key ~ 'glob'` already works via match operator + +## New stages (add to `stage` rule in parser.rs) + +Domain filter stages from engine.rs: +- `type:X` — filter by node type (episodic, daily, weekly, monthly, semantic) +- `age:<7d` — duration comparison on timestamp +- `key:GLOB` — glob match on key +- `provenance:X` — provenance filter +- `weight:>N` — weight comparison (may already work via general comparison) +- `content-len:>N` — content size filter + +Sort/limit syntax variants: +- `sort:field` alongside existing `sort field` +- `limit:N` alongside existing `limit N` + +Graph algorithms: +- `spread` — spreading activation +- `spectral` — spectral nearest neighbors +- `confluence` — multi-source reachability +- `geodesic` — straightest spectral paths +- `manifold` — extrapolation along seed direction + +## 
What changes
+
+1. `parser.rs` — add field:value shorthand to expr, add domain stages
+2. `engine.rs` — keep run_pipeline execution logic, have PEG parser emit
+   compatible Stage types (or convert PEG AST to Stage at boundary)
+3. `query()` tool handler (memory.rs) — one parser path for all formats
+4. `journal_tail` (memory.rs) — generate unified syntax
+5. CLI `poc-memory query` — uses the unified parser
+
+## Migration path
+
+1. Add field:value shorthand and type/age/key stages to PEG parser
+2. Route query() through the PEG parser for all formats
+3. Migrate journal_tail and any other pipeline-syntax callers
+4. Remove the pipeline parser (or keep it as an internal execution layer)
+
+## What was done
+
+**Deleted from engine.rs (-153 lines):**
+- `Stage::parse()` and `Stage::parse_pipeline()` — redundant with PEG
+- `parse_cmp()`, `parse_duration_or_number()`, `parse_composite_sort()`,
+  `parse_node_type()`, `parse_sort_field()` — helper functions for the deleted parser
+
+**Added to parser.rs (+120 lines):**
+- Pipeline syntax in the PEG grammar (`type:X`, `age:<7d`, `key:GLOB`,
+  `provenance:X`, `sort:field`, `limit:N`, `select:fields`)
+- Grammar helper functions
+
+**Net: +17 lines**
+
+**Architecture now:**
+- parser.rs: PEG grammar handles ALL parsing (both syntaxes)
+- engine.rs: pure execution — types and `run_query()`, no parsing
+
+Result: `all | type:episodic | sort:timestamp | limit:5` works everywhere.
+Mixed syntax like `degree > 5 | type:semantic | sort degree` also works.
+
+## What NOT to change (original note)
+
+The run_pipeline execution logic stays — it's correct and well-tested.
+Only the parsing front-end unifies. The pipeline parser's Stage enum
+becomes the internal representation that both the PEG parser and any
+remaining direct callers produce.
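The colon-stage syntax can be miniaturized in a few lines. This is an illustrative sketch only — `MiniStage`, `parse_colon_stage`, and `parse_mini_pipeline` are hypothetical names, not the real `Stage` enum or the PEG grammar in parser.rs:

```rust
// Miniature of the unified colon-stage syntax (hypothetical types; the real
// implementation lives in the PEG grammar in parser.rs).
#[derive(Debug, PartialEq)]
enum MiniStage {
    All,                // generator: "all"
    TypeFilter(String), // "type:episodic"
    SortBy(String),     // "sort:timestamp"
    Limit(usize),       // "limit:5"
}

fn parse_colon_stage(tok: &str) -> Result<MiniStage, String> {
    if tok == "all" {
        return Ok(MiniStage::All);
    }
    let (prefix, value) = tok
        .split_once(':')
        .ok_or_else(|| format!("expected key:value, got '{}'", tok))?;
    match prefix {
        "type" => Ok(MiniStage::TypeFilter(value.to_string())),
        "sort" => Ok(MiniStage::SortBy(value.to_string())),
        "limit" => value
            .parse()
            .map(MiniStage::Limit)
            .map_err(|_| format!("bad limit: {}", value)),
        other => Err(format!("unknown stage: {}", other)),
    }
}

// Pipe-separated pipeline: each segment is one stage.
fn parse_mini_pipeline(s: &str) -> Result<Vec<MiniStage>, String> {
    s.split('|').map(|t| parse_colon_stage(t.trim())).collect()
}

fn main() {
    let stages = parse_mini_pipeline("all | type:episodic | sort:timestamp | limit:5").unwrap();
    println!("{:?}", stages);
}
```

The real grammar additionally accepts the original space-separated forms (`sort field`, `limit N`), which is why both syntaxes can be mixed in one query.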
diff --git a/research/sparse-kernel-napkin-math.md b/research/sparse-kernel-napkin-math.md new file mode 100644 index 0000000..b732b55 --- /dev/null +++ b/research/sparse-kernel-napkin-math.md @@ -0,0 +1,198 @@ +# Sparse Kernel Compilation: Napkin Math for Qwen3.5-27B + +## Architecture recap + +| Parameter | Value | +|-----------|-------| +| Layers | 64 (48 linear attention, 16 full attention) | +| Hidden dim (H) | 5,120 | +| Query heads | 24 | +| KV heads | 4 (GQA) | +| Head dim | 256 | +| FFN intermediate | 17,408 | +| Full attention interval | every 4th layer | +| Total params | ~27.5B | + +**Key discovery**: Qwen3.5 already uses sparse attention — 48/64 layers use +linear attention (O(N)), only 16 use full O(N^2) attention. The attention +sparsity question is partially answered by the architecture itself. The +remaining opportunity is **weight sparsity** in the projection and FFN matrices. + +## Measured weight sparsity (from B200, all 11 shards) + +| Component | Count | Total params | <0.001 | <0.005 | <0.01 | <0.02 | +|-----------|-------|-------------|--------|--------|-------|-------| +| down_proj | 65 | 5,793,382,400 | 7.4% | 35.8% | 64.3% | 92.8% | +| gate_proj | 65 | 5,793,382,400 | 7.2% | 35.0% | 63.3% | 92.3% | +| up_proj | 65 | 5,793,382,400 | 7.3% | 35.2% | 63.5% | 92.5% | +| q_proj | 17 | 1,069,547,520 | 5.7% | 27.9% | 51.9% | 82.8% | +| k_proj | 17 | 89,128,960 | 6.0% | 29.4% | 54.1% | 84.1% | +| v_proj | 17 | 89,128,960 | 4.8% | 23.7% | 44.7% | 74.2% | +| o_proj | 17 | 534,773,760 | 5.3% | 25.9% | 48.7% | 79.6% | +| other | 605 | 6,070,652,272 | 5.4% | 26.4% | 49.6% | 80.9% | + +**Key findings**: +- FFN layers (gate/up/down) are remarkably sparse: ~64% of weights below 0.01 +- At threshold 0.02, FFN sparsity exceeds 92% +- Attention projections are less sparse but still significant: 45-52% below 0.01 +- v_proj is the least sparse component (44.7% below 0.01) + +## Per-layer parameter breakdown + +- **QKV projection**: H × (Q_dim + K_dim + V_dim) 
≈ 5120 × 13312 ≈ 68M +- **Output projection**: ~26M +- **FFN (gate + up + down)**: 5120 × 17408 × 3 ≈ 267M +- **Total per layer**: ~361M +- **64 layers**: ~23.1B (rest is embeddings, norms, etc.) + +## Dense baseline: FLOPs per token + +Each weight parameter contributes 2 FLOPs per token (multiply + accumulate). + +- Per layer: ~722M FLOPs/token +- 64 layers: **~46.2B FLOPs/token** + +On a B200 (theoretical ~4.5 PFLOPS FP8, ~1.1 PFLOPS BF16): +- BF16 throughput: 46.2B / 1.1e15 ≈ 0.042ms per token (compute-bound limit) +- But inference is usually **memory-bandwidth-bound** for small batch sizes + +B200 HBM bandwidth: ~8 TB/s. At BF16 (2 bytes/param): +- Loading all weights once: 27.5B × 2 = 55GB → 55/8000 ≈ **6.9ms per token** (batch=1) +- This is the real bottleneck. Compute is cheap; loading weights is expensive. + +## What sparsity buys you + +At **X% sparsity** (X% of weights are zero), you need to load (1-X)% of the weights: + +| Sparsity | Params loaded | Time (batch=1) | Speedup | +|----------|--------------|-----------------|---------| +| 0% (dense) | 27.5B | 6.9ms | 1.0x | +| 50% | 13.8B | 3.4ms | 2.0x | +| 75% | 6.9B | 1.7ms | 4.0x | +| 90% | 2.8B | 0.7ms | 10x | + +**This is the key insight**: inference at small batch sizes is memory-bandwidth-bound. +Sparse weights = fewer bytes to load = directly proportional speedup, +**IF** the sparse kernel can avoid the gather/scatter overhead. + +## The compilation problem + +Dense GEMM is fast because: +1. Weights are contiguous in memory → sequential reads +2. Tiling fits perfectly in SRAM → high reuse +3. Hardware tensor cores expect dense blocks + +Naive sparse matmul kills this: +- Irregular memory access → low bandwidth utilization +- Poor SRAM tiling → cache thrashing +- Tensor cores can't help + +### The FlashAttention analogy + +FlashAttention's insight: the N×N attention matrix doesn't fit in SRAM, +but you can tile it so each tile does. You recompute instead of materializing. 
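The bandwidth-bound arithmetic in the sections above reduces to one formula; a throwaway sanity check, assuming only the figures already stated (27.5B params, BF16 at 2 bytes/param, ~8 TB/s HBM):

```rust
// Napkin-math check: ms/token when inference is memory-bandwidth-bound.
// bytes to load = params * (1 - sparsity) * bytes_per_param; time = bytes / BW.
fn ms_per_token(params: f64, sparsity: f64) -> f64 {
    let bytes = params * (1.0 - sparsity) * 2.0; // BF16: 2 bytes/param
    let bandwidth = 8.0e12;                      // ~8 TB/s HBM
    bytes / bandwidth * 1e3
}

fn main() {
    println!("dense: {:.2} ms", ms_per_token(27.5e9, 0.0));  // ≈ 6.9 ms
    println!("75%:   {:.2} ms", ms_per_token(27.5e9, 0.75)); // ≈ 1.7 ms
    println!("90%:   {:.2} ms", ms_per_token(27.5e9, 0.90)); // ≈ 0.7 ms
}
```

This reproduces the table above: speedup is simply 1/(1-sparsity) as long as loading weights, not compute, is the bottleneck.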
+ +**Sparse kernel compilation insight**: the sparsity pattern is **known at compile time**. +A compiler can: + +1. **Analyze the sparsity graph** of each weight matrix +2. **Find blocks** of non-zero weights that are close in memory +3. **Generate a tiling schedule** that loads these blocks into SRAM efficiently +4. **Emit fused kernels** where the memory access pattern is baked in as constants + +The resulting kernel looks like a dense kernel to the hardware — +sequential reads, high SRAM reuse, maybe even tensor core compatible +(if the compiler finds dense sub-blocks within the sparse matrix). + +## Block sparsity vs unstructured sparsity + +**Block sparse** (e.g., 4×4 or 16×16 blocks zeroed out): +- GPU-friendly: blocks map to tensor core operations +- Less flexible: coarser pruning granularity → less achievable sparsity +- NVIDIA's 2:4 structured sparsity gets ~50% sparsity with tensor core support +- Real-world: typically 50-70% sparsity achievable without quality loss + +**Unstructured sparse** (individual weights zeroed): +- Maximally flexible: fine-grained pruning → higher achievable sparsity +- GPU-hostile: the gather/scatter problem +- Real-world: 80-95% sparsity achievable in many layers without quality loss + +**The compiled kernel approach bridges this**: take unstructured sparsity +(maximally flexible, high compression) and compile it into a kernel that +runs as efficiently as block-sparse. Best of both worlds. + +## Recurrent depth composability + +From our April 10 discussion: middle transformer layers are doing +open-coded simulated annealing — similar weights, similar computation. 
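A minimal sketch of the similarity measurement this depends on — plain cosine similarity over flattened weight matrices; the vectors here are made-up toy data, not model weights:

```rust
// Cosine similarity between two flattened weight matrices.
// Layers with similarity near 1.0 are candidates for weight sharing.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
}

fn main() {
    let layer_a = [0.5f32, -0.2, 0.1, 0.9];
    let layer_b = [0.5f32, -0.2, 0.1, 0.9];
    let layer_c = [-0.9f32, 0.4, 0.0, -0.1];
    println!("a~b: {:.3}", cosine(&layer_a, &layer_b)); // identical layers -> 1.000
    println!("a~c: {:.3}", cosine(&layer_a, &layer_c)); // dissimilar layers
}
```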
+ +If layers 8-24 have cosine similarity > 0.95: +- Replace 16 layers with 1 layer × 16 iterations +- **Parameter reduction**: 16 × 361M = 5.8B → 361M (16x reduction for those layers) +- **Memory bandwidth**: load one layer's weights, iterate in SRAM +- Combined with 50% sparsity on the remaining unique layers: + - Unique layers (48): 48 × 361M × 0.5 = 8.7B params + - Recurrent layer: 361M × 0.5 = 180M params (but iterated 16x in SRAM) + - Total loaded per token: ~8.9B × 2 bytes = 17.8GB + - Time: 17.8/8000 ≈ **2.2ms per token** (vs 6.9ms dense) — **3.1x speedup** + +With higher sparsity (75%) + recurrence: + - Unique layers: 48 × 361M × 0.25 = 4.3B + - Recurrent: 90M + - Total: ~4.4B × 2 = 8.8GB → **1.1ms per token** — **6.3x speedup** + +## What needs to happen + +### Phase 1: Measure (can do now with B200 access) +1. Extract all weight matrices from Qwen3.5-27B +2. For each matrix, compute: + - Magnitude distribution (what % of weights are near-zero?) + - Achievable sparsity at various thresholds (L1 magnitude pruning) + - Dense sub-block statistics (how many 4×4, 16×16 blocks are all-zero?) +3. Layer similarity: pairwise cosine similarity of weight matrices across layers + - Which layers are nearly identical? (recurrence candidates) +4. Validate quality: run perplexity eval at various sparsity levels + +### Phase 2: Compile (research project) +1. For a single sparse weight matrix, generate an optimized Triton kernel +2. Benchmark vs dense GEMM and vs NVIDIA's 2:4 sparse +3. Iterate on the tiling strategy + +### Phase 3: End-to-end +1. Full model with compiled sparse kernels +2. Perplexity + latency benchmarks +3. Compare: dense, 2:4 structured, compiled unstructured + +## Related work to read + +- **SparseGPT** (Frantar & Alistarh, 2023): one-shot pruning to 50-60% unstructured sparsity + with minimal quality loss. Key result: large models are more prunable than small ones. +- **Wanda** (Sun et al., 2023): pruning by weight magnitude × input activation. 
+ Simpler than SparseGPT, comparable results. +- **NVIDIA 2:4 sparsity**: hardware-supported structured sparsity on Ampere+. + 50% sparsity, ~2x speedup on tensor cores. The existence proof that sparse can be fast. +- **Triton** (Tillet et al.): Python DSL for GPU kernel generation. + The right compilation target — can express arbitrary tiling strategies. +- **TACO** (Kjolstad et al.): tensor algebra compiler. Generates kernels for + specific sparse tensor formats. Academic but the ideas are right. +- **FlashAttention** (Dao et al.): the tiling strategy to learn from. +- **DejaVu** (Liu et al., 2023): contextual sparsity — predicting which neurons + to activate per input. Dynamic sparsity, complementary to weight sparsity. + +## The bigger picture + +Current state of the art: dense models with FlashAttention for the N×N attention part. +Weight sparsity is known to work (SparseGPT, Wanda) but isn't deployed because +the GPU kernels don't exist to exploit it efficiently. + +The gap: nobody has built a compiler that takes a specific sparse weight matrix +and emits a kernel optimized for that exact pattern. FlashAttention proved +that custom kernels for specific computational patterns beat general-purpose ones. +The same should hold for sparse weight patterns. + +**The bet**: a compiled sparse kernel for Qwen3.5-27B's actual sparsity pattern +would be within 80% of the theoretical bandwidth-bound speedup. If true, +50% sparsity → 1.6x real speedup, 75% → 3.2x, composing with recurrent depth +for potentially 5-6x total. + +That would make 27B inference as fast as a 5B dense model, with 27B quality. 
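The Phase 1 measurements above reduce to two small kernels; a self-contained sketch (function names and the toy matrix are hypothetical, not the actual measurement harness):

```rust
// Fraction of weights whose magnitude is below `thresh`.
fn sparsity_at(w: &[f32], thresh: f32) -> f64 {
    w.iter().filter(|x| x.abs() < thresh).count() as f64 / w.len() as f64
}

// Fraction of bs x bs blocks of a rows x cols row-major matrix whose entries
// are all below `thresh` (assumes dims divisible by bs). This is the
// block-sparse feasibility signal: all-small blocks can be zeroed wholesale.
fn zero_block_fraction(w: &[f32], rows: usize, cols: usize, bs: usize, thresh: f32) -> f64 {
    let (mut zero, mut total) = (0usize, 0usize);
    for br in (0..rows).step_by(bs) {
        for bc in (0..cols).step_by(bs) {
            total += 1;
            let all_small = (0..bs)
                .all(|r| (0..bs).all(|c| w[(br + r) * cols + (bc + c)].abs() < thresh));
            if all_small {
                zero += 1;
            }
        }
    }
    zero as f64 / total as f64
}

fn main() {
    // 4x4 toy matrix: one all-small 2x2 block in the top-left corner.
    let w = [
        0.001f32, 0.002, 0.5, 0.6,
        0.003, 0.004, 0.7, 0.8,
        0.9, 1.0, 1.1, 1.2,
        1.3, 1.4, 1.5, 1.6,
    ];
    println!("sparsity@0.01:      {}", sparsity_at(&w, 0.01));                  // 0.25
    println!("zero 2x2 blocks:    {}", zero_block_fraction(&w, 4, 4, 2, 0.01)); // 0.25
}
```

Running both per weight matrix gives the unstructured-vs-block sparsity gap directly: if `sparsity_at` is high but `zero_block_fraction` is low, the matrix needs the compiled-unstructured approach rather than block pruning.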
diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs
index 76b7934..f526d44 100644
--- a/src/agent/tools/memory.rs
+++ b/src/agent/tools/memory.rs
@@ -260,7 +260,7 @@ async fn query(args: &serde_json::Value) -> Result<String> {
     let store = arc.lock().await;
     let graph = store.build_graph();
-    let stages = crate::search::Stage::parse_pipeline(query_str)
+    let stages = crate::query_parser::parse_stages(query_str)
         .map_err(|e| anyhow::anyhow!("{}", e))?;
     let results = crate::search::run_query(&stages, vec![], &graph, &store, false, 100);
     let keys: Vec<String> = results.into_iter().map(|(k, _)| k).collect();
@@ -272,12 +272,61 @@ async fn query(args: &serde_json::Value) -> Result<String> {
             Ok(crate::subconscious::prompts::format_nodes_section(&store, &items, &graph))
         }
         _ => {
-            crate::query_parser::query_to_string(&store, &graph, query_str)
-                .map_err(|e| anyhow::anyhow!("{}", e))
+            // Compact output: check for count/select stages, else just list keys
+            use crate::search::{Stage, Transform};
+            let has_count = stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count)));
+            if has_count {
+                return Ok(keys.len().to_string());
+            }
+            if keys.is_empty() {
+                return Ok("no results".to_string());
+            }
+            let select_fields: Option<&Vec<String>> = stages.iter().find_map(|s| match s {
+                Stage::Transform(Transform::Select(f)) => Some(f),
+                _ => None,
+            });
+            if let Some(fields) = select_fields {
+                let mut out = String::from("key\t");
+                out.push_str(&fields.join("\t"));
+                out.push('\n');
+                for key in &keys {
+                    out.push_str(key);
+                    for f in fields {
+                        out.push('\t');
+                        out.push_str(&resolve_field_str(&store, &graph, key, f));
+                    }
+                    out.push('\n');
+                }
+                Ok(out)
+            } else {
+                Ok(keys.join("\n"))
+            }
         }
     }
 }
+fn resolve_field_str(store: &crate::store::Store, graph: &crate::graph::Graph, key: &str, field: &str) -> String {
+    let node = match store.nodes.get(key) {
+        Some(n) => n,
+        None => return "-".to_string(),
+    };
+    match field {
+        "key" => key.to_string(),
+        "weight" => format!("{:.3}",
node.weight), + "node_type" => format!("{:?}", node.node_type), + "provenance" => node.provenance.clone(), + "emotion" => format!("{}", node.emotion), + "retrievals" => format!("{}", node.retrievals), + "uses" => format!("{}", node.uses), + "wrongs" => format!("{}", node.wrongs), + "created" => format!("{}", node.created_at), + "timestamp" => format!("{}", node.timestamp), + "degree" => format!("{}", graph.degree(key)), + "content_len" => format!("{}", node.content.len()), + _ => "-".to_string(), + } +} + // ── Journal tools ────────────────────────────────────────────── async fn journal_tail(args: &serde_json::Value) -> Result { diff --git a/src/cli/agent.rs b/src/cli/agent.rs index c6ae426..d2d05c8 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -25,7 +25,7 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option target.to_vec() } else if let Some(q) = query { let graph = store.build_graph(); - let stages = crate::search::Stage::parse_pipeline(q)?; + let stages = crate::query_parser::parse_stages(q)?; let results = crate::search::run_query(&stages, vec![], &graph, &store, false, count); if results.is_empty() { return Err(format!("query returned no results: {}", q)); diff --git a/src/cli/misc.rs b/src/cli/misc.rs index 5e49b56..890d8ab 100644 --- a/src/cli/misc.rs +++ b/src/cli/misc.rs @@ -3,25 +3,26 @@ pub fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bool, debug: bool, fuzzy: bool, content: bool) -> Result<(), String> { use std::collections::BTreeMap; + use crate::search::{Stage, Algorithm, AlgoStage}; // When running inside an agent session, exclude already-surfaced nodes let seen = crate::session::HookSession::from_env() .map(|s| s.seen()) .unwrap_or_default(); - // Parse pipeline stages (unified: algorithms, filters, transforms, generators) - let stages: Vec = if pipeline_args.is_empty() { - vec![crate::search::Stage::Algorithm(crate::search::AlgoStage::parse("spread").unwrap())] + // Build 
pipeline: if args provided, parse them; otherwise default to spread
+    let stages: Vec<Stage> = if pipeline_args.is_empty() {
+        vec![Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() })]
     } else {
-        pipeline_args.iter()
-            .map(|a| crate::search::Stage::parse(a))
-            .collect::<Result<Vec<_>, _>>()?
+        // Join args with | and parse as unified query
+        let pipeline_str = format!("all | {}", pipeline_args.join(" | "));
+        crate::query_parser::parse_stages(&pipeline_str)?
     };

     // Check if pipeline needs full Store (has filters/transforms/generators)
-    let needs_store = stages.iter().any(|s| !matches!(s, crate::search::Stage::Algorithm(_)));
+    let needs_store = stages.iter().any(|s| !matches!(s, Stage::Algorithm(_)));

     // Check if pipeline starts with a generator (doesn't need seed terms)
-    let has_generator = stages.first().map(|s| matches!(s, crate::search::Stage::Generator(_))).unwrap_or(false);
+    let has_generator = stages.first().map(|s| matches!(s, Stage::Generator(_))).unwrap_or(false);

     if terms.is_empty() && !has_generator {
         return Err("search requires terms or a generator stage (e.g. 'all')".into());
diff --git a/src/hippocampus/query/engine.rs b/src/hippocampus/query/engine.rs
index 915ec14..28ca344 100644
--- a/src/hippocampus/query/engine.rs
+++ b/src/hippocampus/query/engine.rs
@@ -157,6 +157,9 @@ pub enum Filter {
 pub enum Transform {
     Sort(SortField),
     Limit(usize),
+    Select(Vec<String>),
+    Count,
+    Connectivity,
     DominatingSet,
 }

@@ -168,6 +171,8 @@ pub enum SortField {
     Degree,
     Weight,
     Isolation,
+    Key,
+    Named(String, bool), // (field_name, ascending)
     Composite(Vec<(ScoreField, f64)>),
 }

@@ -206,79 +211,6 @@ impl Cmp {
     }
 }

-/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds).
-fn parse_cmp(s: &str) -> Result { - let (op_len, ctor): (usize, fn(f64) -> Cmp) = if s.starts_with(">=") { - (2, Cmp::Gte) - } else if s.starts_with("<=") { - (2, Cmp::Lte) - } else if s.starts_with('>') { - (1, Cmp::Gt) - } else if s.starts_with('<') { - (1, Cmp::Lt) - } else if s.starts_with('=') { - (1, Cmp::Eq) - } else { - return Err(format!("expected comparison operator in '{}'", s)); - }; - - let val_str = &s[op_len..]; - let val = parse_duration_or_number(val_str)?; - Ok(ctor(val)) -} - -/// Parse "7d", "24h", "30m" as seconds, or plain numbers. -fn parse_duration_or_number(s: &str) -> Result { - if let Some(n) = s.strip_suffix('d') { - let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; - Ok(v * 86400.0) - } else if let Some(n) = s.strip_suffix('h') { - let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; - Ok(v * 3600.0) - } else if let Some(n) = s.strip_suffix('m') { - let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; - Ok(v * 60.0) - } else { - s.parse().map_err(|_| format!("bad number: {}", s)) - } -} - -/// Parse composite sort: "isolation*0.7+recency(linker)*0.3" -/// Each term is field or field(arg), optionally *weight (default 1.0). -fn parse_composite_sort(s: &str) -> Result, String> { - let mut terms = Vec::new(); - for term in s.split('+') { - let term = term.trim(); - let (field_part, weight) = if let Some((f, w)) = term.rsplit_once('*') { - (f, w.parse::().map_err(|_| format!("bad weight: {}", w))?) 
- } else { - (term, 1.0) - }; - - // Parse field, possibly with (arg) - let field = if let Some((name, arg)) = field_part.split_once('(') { - let arg = arg.strip_suffix(')').ok_or("missing ) in sort field")?; - match name { - "recency" => ScoreField::Recency(arg.to_string()), - _ => return Err(format!("unknown parameterized sort field: {}", name)), - } - } else { - match field_part { - "isolation" => ScoreField::Isolation, - "degree" => ScoreField::Degree, - "weight" => ScoreField::Weight, - "content-len" => ScoreField::ContentLen, - "priority" => ScoreField::Priority, - _ => return Err(format!("unknown sort field: {}", field_part)), - } - }; - terms.push((field, weight)); - } - if terms.is_empty() { - return Err("empty composite sort".into()); - } - Ok(terms) -} /// Compute a 0-1 score for a node on a single dimension. fn score_field( @@ -348,129 +280,6 @@ impl CompositeCache { } } -/// Parse a NodeType from a label. -fn parse_node_type(s: &str) -> Result { - match s { - "episodic" | "session" => Ok(NodeType::EpisodicSession), - "daily" => Ok(NodeType::EpisodicDaily), - "weekly" => Ok(NodeType::EpisodicWeekly), - "monthly" => Ok(NodeType::EpisodicMonthly), - "semantic" => Ok(NodeType::Semantic), - _ => Err(format!("unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)", s)), - } -} - -impl Stage { - /// Parse a single stage from a string. - /// - /// Algorithm names are tried first (bare words), then predicate syntax - /// (contains ':'). No ambiguity since algorithms are bare words. 
- pub fn parse(s: &str) -> Result { - let s = s.trim(); - let (negated, s) = if let Some(rest) = s.strip_prefix('!') { - (true, rest) - } else { - (false, s) - }; - - // Generator: "all" - if s == "all" { - return Ok(Stage::Generator(Generator::All)); - } - - // Transform: "dominating-set" - if s == "dominating-set" { - return Ok(Stage::Transform(Transform::DominatingSet)); - } - - // Try algorithm parse first (bare words, no colon) - if !s.contains(':') - && let Ok(algo) = AlgoStage::parse(s) { - return Ok(Stage::Algorithm(algo)); - } - - // Algorithm with params: "spread,max_hops=4" (contains comma but no colon) - if s.contains(',') && !s.contains(':') { - return AlgoStage::parse(s).map(Stage::Algorithm); - } - - // Predicate/transform syntax: "key:value" - let (prefix, value) = s.split_once(':') - .ok_or_else(|| format!("unknown stage: {}", s))?; - - let filter_or_transform = match prefix { - "type" => Stage::Filter(Filter::Type(parse_node_type(value)?)), - "key" => Stage::Filter(Filter::KeyGlob(value.to_string())), - "weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)), - "age" => Stage::Filter(Filter::Age(parse_cmp(value)?)), - "content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)), - "provenance" => { - Stage::Filter(Filter::Provenance(value.to_string())) - } - "not-visited" => { - let (agent, dur) = value.split_once(',') - .ok_or("not-visited:AGENT,DURATION")?; - let secs = parse_duration_or_number(dur)?; - Stage::Filter(Filter::NotVisited { - agent: agent.to_string(), - duration: secs as i64, - }) - } - "visited" => Stage::Filter(Filter::Visited { - agent: value.to_string(), - }), - "sort" => { - // Check for composite sort: field*weight+field*weight+... - let field = if value.contains('+') || value.contains('*') { - SortField::Composite(parse_composite_sort(value)?) 
- } else { - match value { - "priority" => SortField::Priority, - "timestamp" => SortField::Timestamp, - "content-len" => SortField::ContentLen, - "degree" => SortField::Degree, - "weight" => SortField::Weight, - "isolation" => SortField::Isolation, - _ => return Err(format!("unknown sort field: {}", value)), - } - }; - Stage::Transform(Transform::Sort(field)) - } - "limit" => { - let n: usize = value.parse() - .map_err(|_| format!("bad limit: {}", value))?; - Stage::Transform(Transform::Limit(n)) - } - "match" => { - let terms: Vec = value.split(',') - .map(|t| t.to_string()) - .collect(); - Stage::Generator(Generator::Match(terms)) - } - // Algorithm with colon in params? Try fallback. - _ => return AlgoStage::parse(s).map(Stage::Algorithm) - .map_err(|_| format!("unknown stage: {}", s)), - }; - - // Apply negation to filters - if negated { - match filter_or_transform { - Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))), - _ => Err("! prefix only works on filter stages".to_string()), - } - } else { - Ok(filter_or_transform) - } - } - - /// Parse a pipe-separated pipeline string. 
- pub fn parse_pipeline(s: &str) -> Result, String> { - s.split('|') - .map(|part| Stage::parse(part.trim())) - .collect() - } -} - impl fmt::Display for Stage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -479,6 +288,9 @@ impl fmt::Display for Stage { Stage::Filter(filt) => write!(f, "{}", filt), Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field), Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n), + Stage::Transform(Transform::Select(fields)) => write!(f, "select:{}", fields.join(",")), + Stage::Transform(Transform::Count) => write!(f, "count"), + Stage::Transform(Transform::Connectivity) => write!(f, "connectivity"), Stage::Transform(Transform::DominatingSet) => write!(f, "dominating-set"), Stage::Algorithm(a) => write!(f, "{}", a.algo), } @@ -613,7 +425,7 @@ fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> { } } -fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { +pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { let node = match store.nodes.get(key) { Some(n) => n, None => return false, @@ -686,6 +498,39 @@ pub fn run_transform( sb.total_cmp(&sa) // most isolated first }); } + SortField::Key => { + items.sort_by(|a, b| a.0.cmp(&b.0)); + } + SortField::Named(field, asc) => { + // Resolve field from node properties + let resolve = |key: &str| -> Option { + let node = store.nodes.get(key)?; + match field.as_str() { + "weight" => Some(node.weight as f64), + "emotion" => Some(node.emotion as f64), + "retrievals" => Some(node.retrievals as f64), + "uses" => Some(node.uses as f64), + "wrongs" => Some(node.wrongs as f64), + "created" => Some(node.created_at as f64), + "timestamp" => Some(node.timestamp as f64), + "degree" => Some(graph.degree(key) as f64), + "content_len" => Some(node.content.len() as f64), + _ => None, + } + }; + let asc = *asc; + items.sort_by(|a, b| { + let va = resolve(&a.0); + let vb = resolve(&b.0); + let 
ord = match (va, vb) { + (Some(a), Some(b)) => a.total_cmp(&b), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => a.0.cmp(&b.0), + }; + if asc { ord } else { ord.reverse() } + }); + } SortField::Priority => { // Pre-compute priorities to avoid O(n log n) calls // inside the sort comparator. @@ -725,6 +570,8 @@ pub fn run_transform( items.truncate(*n); items } + // Output mode directives - don't modify result set, handled at output layer + Transform::Select(_) | Transform::Count | Transform::Connectivity => items, Transform::DominatingSet => { // Greedy 3-covering dominating set: pick the node that covers // the most under-covered neighbors, repeat until every node diff --git a/src/hippocampus/query/parser.rs b/src/hippocampus/query/parser.rs index 64ffc00..e755367 100644 --- a/src/hippocampus/query/parser.rs +++ b/src/hippocampus/query/parser.rs @@ -26,6 +26,12 @@ use crate::graph::Graph; use regex::Regex; use std::collections::BTreeMap; +// Re-export engine types used by Query +pub use super::engine::{ + Stage, Filter, Transform, Generator, SortField, + Algorithm, AlgoStage, Cmp, +}; + // -- AST types -- #[derive(Debug, Clone)] @@ -57,16 +63,6 @@ pub enum CmpOp { Gt, Lt, Ge, Le, Eq, Ne, Match, } -#[derive(Debug, Clone)] -pub enum Stage { - Sort { field: String, ascending: bool }, - Limit(usize), - Select(Vec), - Count, - Connectivity, - DominatingSet, -} - #[derive(Debug, Clone)] pub struct Query { pub expr: Expr, @@ -86,18 +82,54 @@ peg::parser! 
{ = s:(_ "|" _ s:stage() { s })* { s } rule stage() -> Stage - = "sort" _ f:field() _ a:asc_desc() { Stage::Sort { field: f, ascending: a } } - / "limit" _ n:integer() { Stage::Limit(n) } - / "select" _ f:field_list() { Stage::Select(f) } - / "count" { Stage::Count } - / "connectivity" { Stage::Connectivity } - / "dominating-set" { Stage::DominatingSet } + // Original PEG syntax (space-separated) + = "sort" _ f:field() _ a:asc_desc() { + Stage::Transform(Transform::Sort(make_sort_field(&f, a))) + } + / "limit" _ n:integer() { Stage::Transform(Transform::Limit(n)) } + / "select" _ f:field_list() { Stage::Transform(Transform::Select(f)) } + / "count" { Stage::Transform(Transform::Count) } + / "connectivity" { Stage::Transform(Transform::Connectivity) } + / "dominating-set" { Stage::Transform(Transform::DominatingSet) } + // Pipeline syntax (colon-separated) + / "sort:" f:field() { Stage::Transform(Transform::Sort(make_sort_field(&f, false))) } + / "limit:" n:integer() { Stage::Transform(Transform::Limit(n)) } + / "select:" f:field_list_colon() { Stage::Transform(Transform::Select(f)) } + / "type:" t:ident() { make_type_filter(&t) } + / "age:" c:cmp_duration() { Stage::Filter(Filter::Age(c)) } + / "key:" g:ident() { Stage::Filter(Filter::KeyGlob(g)) } + / "provenance:" p:ident() { Stage::Filter(Filter::Provenance(p)) } + / "all" { Stage::Generator(Generator::All) } + // Graph algorithms + / "spread" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() }) } + / "spectral" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spectral, params: std::collections::HashMap::new() }) } rule asc_desc() -> bool = "asc" { true } / "desc" { false } / { false } // default: descending + rule field_list_colon() -> Vec + = f:field() fs:("," f:field() { f })* { + let mut v = vec![f]; + v.extend(fs); + v + } + + rule cmp_duration() -> Cmp + = ">=" n:duration() { Cmp::Gte(n) } + / "<=" n:duration() { Cmp::Lte(n) } + / ">" n:duration() { Cmp::Gt(n) 
} + / "<" n:duration() { Cmp::Lt(n) } + / "=" n:duration() { Cmp::Eq(n) } + + rule duration() -> f64 + = n:number() "d" { n * 86400.0 } + / n:number() "h" { n * 3600.0 } + / n:number() "m" { n * 60.0 } + / n:number() "s" { n } + / n:number() { n } + rule field_list() -> Vec = f:field() fs:(_ "," _ f:field() { f })* { let mut v = vec![f]; @@ -122,6 +154,7 @@ peg::parser! { Expr::Comparison { field: f, op, value: v } } "*" { Expr::All } + "all" { Expr::All } "(" _ e:expr() _ ")" { e } } @@ -167,6 +200,55 @@ peg::parser! { } } +// -- Helper functions for PEG grammar -- + +fn make_sort_field(field: &str, ascending: bool) -> SortField { + match field { + "priority" => SortField::Priority, + "timestamp" => SortField::Timestamp, + "content-len" | "content_len" => SortField::ContentLen, + "degree" => SortField::Degree, + "weight" => SortField::Weight, + "isolation" => SortField::Isolation, + "key" => SortField::Key, + _ => SortField::Named(field.to_string(), ascending), + } +} + +fn make_type_filter(type_name: &str) -> Stage { + let node_type = match type_name { + "episodic" | "session" => NodeType::EpisodicSession, + "daily" => NodeType::EpisodicDaily, + "weekly" => NodeType::EpisodicWeekly, + "monthly" => NodeType::EpisodicMonthly, + "semantic" => NodeType::Semantic, + _ => NodeType::Semantic, // fallback + }; + Stage::Filter(Filter::Type(node_type)) +} + +/// Parse a query string into Vec for pipeline execution. +/// This is the unified entry point — replaces engine::Stage::parse_pipeline. 
+pub fn parse_stages(s: &str) -> Result<Vec<Stage>, String> {
+    let q = query_parser::query(s)
+        .map_err(|e| format!("Parse error: {}", e))?;
+
+    let mut stages = Vec::new();
+
+    // Convert Expr to a Generator stage
+    match &q.expr {
+        Expr::All => stages.push(Stage::Generator(Generator::All)),
+        _ => {
+            // For complex expressions, we need the Query-based path
+            // This shouldn't happen for pipeline queries
+            return Err("Complex expressions not supported in pipeline mode; use CLI query".into());
+        }
+    }
+
+    stages.extend(q.stages);
+    Ok(stages)
+}
+
 // -- Field resolution --

 /// Resolve a field value from a node + graph context, returning a comparable Value.
@@ -377,12 +459,12 @@ fn execute_parsed(
     let mut set = Vec::new();
     for stage in &q.stages {
         match stage {
-            Stage::Select(fields) => {
+            Stage::Transform(Transform::Select(fields)) => {
                 for f in fields { if !set.contains(f) { set.push(f.clone()); } }
             }
-            Stage::Sort { field, .. } => {
+            Stage::Transform(Transform::Sort(SortField::Named(field, _))) => {
                 if !set.contains(field) { set.push(field.clone()); }
             }
             _ => {}
@@ -404,37 +486,75 @@ fn execute_parsed(
     let mut has_sort = false;
     for stage in &q.stages {
         match stage {
-            Stage::Sort { field, ascending } => {
+            Stage::Transform(Transform::Sort(sort_field)) => {
                 has_sort = true;
-                let asc = *ascending;
-                results.sort_by(|a, b| {
-                    let va = a.fields.get(field).and_then(as_num);
-                    let vb = b.fields.get(field).and_then(as_num);
-                    let ord = match (va, vb) {
-                        (Some(a), Some(b)) => a.total_cmp(&b),
-                        _ => {
-                            let sa = a.fields.get(field).map(as_str).unwrap_or_default();
-                            let sb = b.fields.get(field).map(as_str).unwrap_or_default();
-                            sa.cmp(&sb)
-                        }
-                    };
-                    if asc { ord } else { ord.reverse() }
-                });
+                match sort_field {
+                    SortField::Named(field, asc) => {
+                        let asc = *asc;
+                        let field = field.clone();
+                        results.sort_by(|a, b| {
+                            let va = a.fields.get(&field).and_then(as_num);
+                            let vb = b.fields.get(&field).and_then(as_num);
+                            let ord = match (va, vb) {
+                                (Some(a), Some(b))
=> a.total_cmp(&b), + _ => { + let sa = a.fields.get(&field).map(as_str).unwrap_or_default(); + let sb = b.fields.get(&field).map(as_str).unwrap_or_default(); + sa.cmp(&sb) + } + }; + if asc { ord } else { ord.reverse() } + }); + } + SortField::Key => { + results.sort_by(|a, b| a.key.cmp(&b.key)); + } + SortField::Degree => { + results.sort_by(|a, b| { + let da = graph.degree(&a.key); + let db = graph.degree(&b.key); + db.cmp(&da) + }); + } + SortField::Weight => { + results.sort_by(|a, b| { + let wa = store.nodes.get(&a.key).map(|n| n.weight).unwrap_or(0.0); + let wb = store.nodes.get(&b.key).map(|n| n.weight).unwrap_or(0.0); + wb.total_cmp(&wa) + }); + } + SortField::Timestamp => { + results.sort_by(|a, b| { + let ta = store.nodes.get(&a.key).map(|n| n.timestamp).unwrap_or(0); + let tb = store.nodes.get(&b.key).map(|n| n.timestamp).unwrap_or(0); + tb.cmp(&ta) + }); + } + _ => {} // other sort fields handled by default degree sort + } } - Stage::Limit(n) => { + Stage::Transform(Transform::Limit(n)) => { results.truncate(*n); } - Stage::Connectivity => {} // handled in output - Stage::Select(_) | Stage::Count => {} // handled in output - Stage::DominatingSet => { + Stage::Transform(Transform::Connectivity) => {} // handled in output + Stage::Transform(Transform::Select(_) | Transform::Count) => {} // handled in output + Stage::Transform(Transform::DominatingSet) => { let mut items: Vec<(String, f64)> = results.iter() .map(|r| (r.key.clone(), graph.degree(&r.key) as f64)) .collect(); - let xform = super::engine::Transform::DominatingSet; + let xform = Transform::DominatingSet; items = super::engine::run_transform(&xform, items, store, graph); let keep: std::collections::HashSet = items.into_iter().map(|(k, _)| k).collect(); results.retain(|r| keep.contains(&r.key)); } + Stage::Filter(filt) => { + // Apply filter to narrow results + let now = crate::store::now_epoch(); + results.retain(|r| super::engine::eval_filter(filt, &r.key, store, now)); + } + 
Stage::Generator(_) | Stage::Algorithm(_) => { + // Generators are handled by Expr, algorithms not applicable here + } } } @@ -474,7 +594,7 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St let results = execute_parsed(store, graph, &q)?; // Count stage - if q.stages.iter().any(|s| matches!(s, Stage::Count)) { + if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) { println!("{}", results.len()); return Ok(()); } @@ -485,14 +605,14 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St } // Connectivity stage - if q.stages.iter().any(|s| matches!(s, Stage::Connectivity)) { + if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Connectivity))) { print_connectivity(&results, graph); return Ok(()); } // Select stage let fields: Option<&Vec> = q.stages.iter().find_map(|s| match s { - Stage::Select(f) => Some(f), + Stage::Transform(Transform::Select(f)) => Some(f), _ => None, }); @@ -527,7 +647,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result< let results = execute_parsed(store, graph, &q)?; - if q.stages.iter().any(|s| matches!(s, Stage::Count)) { + if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) { return Ok(results.len().to_string()); } if results.is_empty() { @@ -535,7 +655,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result< } let fields: Option<&Vec> = q.stages.iter().find_map(|s| match s { - Stage::Select(f) => Some(f), + Stage::Transform(Transform::Select(f)) => Some(f), _ => None, }); diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index 1fe8377..682b4fa 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -801,7 +801,7 @@ pub fn run_agent( // Run the query if present let keys = if !def.query.is_empty() { - let mut stages = search::Stage::parse_pipeline(&def.query)?; + let mut stages = crate::query_parser::parse_stages(&def.query)?; 
         let has_limit = stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_))));
         if !has_limit {
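Note (not part of the patch): the new `duration()` grammar rule above maps suffixed numbers to seconds for `age:` comparisons. A minimal standalone sketch of the same mapping, under the assumption that `d`/`h`/`m`/`s` suffixes scale to seconds and a bare number passes through unchanged; the `parse_duration` helper name is hypothetical:

```rust
// Hypothetical standalone equivalent of the `duration()` peg rule:
// "7d" -> 604800.0 s, "3h" -> 10800.0 s, bare "90" -> 90.0 s.
fn parse_duration(s: &str) -> Option<f64> {
    let (num, mult) = if let Some(n) = s.strip_suffix('d') {
        (n, 86400.0)
    } else if let Some(n) = s.strip_suffix('h') {
        (n, 3600.0)
    } else if let Some(n) = s.strip_suffix('m') {
        (n, 60.0)
    } else if let Some(n) = s.strip_suffix('s') {
        (n, 1.0)
    } else {
        (s, 1.0)
    };
    num.parse::<f64>().ok().map(|n| n * mult)
}

fn main() {
    // `age:<7d` would compare timestamps against 7 * 86400 seconds
    assert_eq!(parse_duration("7d"), Some(604800.0));
    assert_eq!(parse_duration("90"), Some(90.0));
    assert_eq!(parse_duration("abc"), None);
}
```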