diff --git a/research/query-language-unification.md b/research/query-language-unification.md deleted file mode 100644 index 57ceb98..0000000 --- a/research/query-language-unification.md +++ /dev/null @@ -1,94 +0,0 @@ -# Query Language Unification Plan - -**Status: DONE** (2026-04-11) - -## Problem (was) - -Two query parsers that didn't agree on syntax: - -1. **PEG parser** (`hippocampus/query/parser.rs`) — boolean logic, general - comparisons, operator precedence, parentheses. Used by CLI and compact - format path in `query()` tool. - -2. **Pipeline parser** (`hippocampus/query/engine.rs`) — domain-specific - filters (type, age, provenance), graph algorithms (spread, spectral). - Used by full format path in `query()` tool. - -`journal_tail` generates pipeline syntax but gets routed through the PEG -parser on the compact path. Result: parse errors. - -## Approach - -Keep the PEG parser (has the harder-to-build structural foundation), -extend it with the pipeline parser's domain features. - -## Expression extensions (add to `expr` rule in parser.rs) - -- `field:value` shorthand for `field = 'value'` (colon-separated equality) -- `*` already works as `Expr::All` -- `key ~ 'glob'` already works via match operator - -## New stages (add to `stage` rule in parser.rs) - -Domain filter stages from engine.rs: -- `type:X` — filter by node type (episodic, daily, weekly, monthly, semantic) -- `age:<7d` — duration comparison on timestamp -- `key:GLOB` — glob match on key -- `provenance:X` — provenance filter -- `weight:>N` — weight comparison (may already work via general comparison) -- `content-len:>N` — content size filter - -Sort/limit syntax variants: -- `sort:field` alongside existing `sort field` -- `limit:N` alongside existing `limit N` - -Graph algorithms: -- `spread` — spreading activation -- `spectral` — spectral nearest neighbors -- `confluence` — multi-source reachability -- `geodesic` — straightest spectral paths -- `manifold` — extrapolation along seed direction - -## What changes - -1. `parser.rs` — add field:value shorthand to expr, add domain stages -2. `engine.rs` — keep run_pipeline execution logic, have PEG parser emit - compatible Stage types (or convert PEG AST to Stage at boundary) -3. `query()` tool handler (memory.rs) — one parser path for all formats -4. `journal_tail` (memory.rs) — generate unified syntax -5. CLI `poc-memory query` — uses unified parser - -## Migration path - -1. Add field:value shorthand and type/age/key stages to PEG parser -2. Route query() through PEG parser for all formats -3. Migrate journal_tail and any other pipeline-syntax callers -4. Remove the pipeline parser (or keep as internal execution layer) - -## What was done - -**Deleted from engine.rs (-153 lines):** -- `Stage::parse()` and `Stage::parse_pipeline()` — redundant with PEG -- `parse_cmp()`, `parse_duration_or_number()`, `parse_composite_sort()`, - `parse_node_type()`, `parse_sort_field()` — helper functions for deleted parser - -**Added to parser.rs (+120 lines):** -- Pipeline syntax in PEG grammar (`type:X`, `age:` -- Grammar helper functions - -**Net: +17 lines** - -**Architecture now:** -- parser.rs: PEG grammar handles ALL parsing (both syntaxes) -- engine.rs: Pure execution — types and `run_query()`, no parsing - -Result: `all | type:episodic | sort:timestamp | limit:5` works everywhere. -Mixed syntax like `degree > 5 | type:semantic | sort degree` also works. - -## What NOT to change (original note) - -The run_pipeline execution logic stays — it's correct and well-tested. -Only the parsing front-end unifies. The pipeline parser's Stage enum -becomes the internal representation that both the PEG parser and any -remaining direct callers produce. diff --git a/research/sparse-kernel-napkin-math.md b/research/sparse-kernel-napkin-math.md deleted file mode 100644 index b732b55..0000000 --- a/research/sparse-kernel-napkin-math.md +++ /dev/null @@ -1,198 +0,0 @@ -# Sparse Kernel Compilation: Napkin Math for Qwen3.5-27B - -## Architecture recap - -| Parameter | Value | -|-----------|-------| -| Layers | 64 (48 linear attention, 16 full attention) | -| Hidden dim (H) | 5,120 | -| Query heads | 24 | -| KV heads | 4 (GQA) | -| Head dim | 256 | -| FFN intermediate | 17,408 | -| Full attention interval | every 4th layer | -| Total params | ~27.5B | - -**Key discovery**: Qwen3.5 already uses sparse attention — 48/64 layers use -linear attention (O(N)), only 16 use full O(N^2) attention. The attention -sparsity question is partially answered by the architecture itself. The -remaining opportunity is **weight sparsity** in the projection and FFN matrices. - -## Measured weight sparsity (from B200, all 11 shards) - -| Component | Count | Total params | <0.001 | <0.005 | <0.01 | <0.02 | -|-----------|-------|-------------|--------|--------|-------|-------| -| down_proj | 65 | 5,793,382,400 | 7.4% | 35.8% | 64.3% | 92.8% | -| gate_proj | 65 | 5,793,382,400 | 7.2% | 35.0% | 63.3% | 92.3% | -| up_proj | 65 | 5,793,382,400 | 7.3% | 35.2% | 63.5% | 92.5% | -| q_proj | 17 | 1,069,547,520 | 5.7% | 27.9% | 51.9% | 82.8% | -| k_proj | 17 | 89,128,960 | 6.0% | 29.4% | 54.1% | 84.1% | -| v_proj | 17 | 89,128,960 | 4.8% | 23.7% | 44.7% | 74.2% | -| o_proj | 17 | 534,773,760 | 5.3% | 25.9% | 48.7% | 79.6% | -| other | 605 | 6,070,652,272 | 5.4% | 26.4% | 49.6% | 80.9% | - -**Key findings**: -- FFN layers (gate/up/down) are remarkably sparse: ~64% of weights below 0.01 -- At threshold 0.02, FFN sparsity exceeds 92% -- Attention projections are less sparse but still significant: 45-52% below 0.01 -- v_proj is the least sparse component (44.7% below 0.01) - -## Per-layer parameter breakdown - -- **QKV projection**: H × (Q_dim + K_dim + V_dim) ≈ 5120 × 13312 ≈ 68M -- **Output projection**: ~26M -- **FFN (gate + up + down)**: 5120 × 17408 × 3 ≈ 267M -- **Total per layer**: ~361M -- **64 layers**: ~23.1B (rest is embeddings, norms, etc.) - -## Dense baseline: FLOPs per token - -Each weight parameter contributes 2 FLOPs per token (multiply + accumulate). - -- Per layer: ~722M FLOPs/token -- 64 layers: **~46.2B FLOPs/token** - -On a B200 (theoretical ~4.5 PFLOPS FP8, ~1.1 PFLOPS BF16): -- BF16 throughput: 46.2B / 1.1e15 ≈ 0.042ms per token (compute-bound limit) -- But inference is usually **memory-bandwidth-bound** for small batch sizes - -B200 HBM bandwidth: ~8 TB/s. At BF16 (2 bytes/param): -- Loading all weights once: 27.5B × 2 = 55GB → 55/8000 ≈ **6.9ms per token** (batch=1) -- This is the real bottleneck. Compute is cheap; loading weights is expensive. - -## What sparsity buys you - -At **X% sparsity** (X% of weights are zero), you need to load (1-X)% of the weights: - -| Sparsity | Params loaded | Time (batch=1) | Speedup | -|----------|--------------|-----------------|---------| -| 0% (dense) | 27.5B | 6.9ms | 1.0x | -| 50% | 13.8B | 3.4ms | 2.0x | -| 75% | 6.9B | 1.7ms | 4.0x | -| 90% | 2.8B | 0.7ms | 10x | - -**This is the key insight**: inference at small batch sizes is memory-bandwidth-bound. -Sparse weights = fewer bytes to load = directly proportional speedup, -**IF** the sparse kernel can avoid the gather/scatter overhead. - -## The compilation problem - -Dense GEMM is fast because: -1. Weights are contiguous in memory → sequential reads -2. Tiling fits perfectly in SRAM → high reuse -3. Hardware tensor cores expect dense blocks - -Naive sparse matmul kills this: -- Irregular memory access → low bandwidth utilization -- Poor SRAM tiling → cache thrashing -- Tensor cores can't help - -### The FlashAttention analogy - -FlashAttention's insight: the N×N attention matrix doesn't fit in SRAM, -but you can tile it so each tile does. You recompute instead of materializing. - -**Sparse kernel compilation insight**: the sparsity pattern is **known at compile time**. -A compiler can: - -1. **Analyze the sparsity graph** of each weight matrix -2. **Find blocks** of non-zero weights that are close in memory -3. **Generate a tiling schedule** that loads these blocks into SRAM efficiently -4. **Emit fused kernels** where the memory access pattern is baked in as constants - -The resulting kernel looks like a dense kernel to the hardware — -sequential reads, high SRAM reuse, maybe even tensor core compatible -(if the compiler finds dense sub-blocks within the sparse matrix). - -## Block sparsity vs unstructured sparsity - -**Block sparse** (e.g., 4×4 or 16×16 blocks zeroed out): -- GPU-friendly: blocks map to tensor core operations -- Less flexible: coarser pruning granularity → less achievable sparsity -- NVIDIA's 2:4 structured sparsity gets ~50% sparsity with tensor core support -- Real-world: typically 50-70% sparsity achievable without quality loss - -**Unstructured sparse** (individual weights zeroed): -- Maximally flexible: fine-grained pruning → higher achievable sparsity -- GPU-hostile: the gather/scatter problem -- Real-world: 80-95% sparsity achievable in many layers without quality loss - -**The compiled kernel approach bridges this**: take unstructured sparsity -(maximally flexible, high compression) and compile it into a kernel that -runs as efficiently as block-sparse. Best of both worlds. - -## Recurrent depth composability - -From our April 10 discussion: middle transformer layers are doing -open-coded simulated annealing — similar weights, similar computation. - -If layers 8-24 have cosine similarity > 0.95: -- Replace 16 layers with 1 layer × 16 iterations -- **Parameter reduction**: 16 × 361M = 5.8B → 361M (16x reduction for those layers) -- **Memory bandwidth**: load one layer's weights, iterate in SRAM -- Combined with 50% sparsity on the remaining unique layers: - - Unique layers (48): 48 × 361M × 0.5 = 8.7B params - - Recurrent layer: 361M × 0.5 = 180M params (but iterated 16x in SRAM) - - Total loaded per token: ~8.9B × 2 bytes = 17.8GB - - Time: 17.8/8000 ≈ **2.2ms per token** (vs 6.9ms dense) — **3.1x speedup** - -With higher sparsity (75%) + recurrence: - - Unique layers: 48 × 361M × 0.25 = 4.3B - - Recurrent: 90M - - Total: ~4.4B × 2 = 8.8GB → **1.1ms per token** — **6.3x speedup** - -## What needs to happen - -### Phase 1: Measure (can do now with B200 access) -1. Extract all weight matrices from Qwen3.5-27B -2. For each matrix, compute: - - Magnitude distribution (what % of weights are near-zero?) - - Achievable sparsity at various thresholds (L1 magnitude pruning) - - Dense sub-block statistics (how many 4×4, 16×16 blocks are all-zero?) -3. Layer similarity: pairwise cosine similarity of weight matrices across layers - - Which layers are nearly identical? (recurrence candidates) -4. Validate quality: run perplexity eval at various sparsity levels - -### Phase 2: Compile (research project) -1. For a single sparse weight matrix, generate an optimized Triton kernel -2. Benchmark vs dense GEMM and vs NVIDIA's 2:4 sparse -3. Iterate on the tiling strategy - -### Phase 3: End-to-end -1. Full model with compiled sparse kernels -2. Perplexity + latency benchmarks -3. Compare: dense, 2:4 structured, compiled unstructured - -## Related work to read - -- **SparseGPT** (Frantar & Alistarh, 2023): one-shot pruning to 50-60% unstructured sparsity - with minimal quality loss. Key result: large models are more prunable than small ones. -- **Wanda** (Sun et al., 2023): pruning by weight magnitude × input activation. - Simpler than SparseGPT, comparable results. -- **NVIDIA 2:4 sparsity**: hardware-supported structured sparsity on Ampere+. - 50% sparsity, ~2x speedup on tensor cores. The existence proof that sparse can be fast. -- **Triton** (Tillet et al.): Python DSL for GPU kernel generation. - The right compilation target — can express arbitrary tiling strategies. -- **TACO** (Kjolstad et al.): tensor algebra compiler. Generates kernels for - specific sparse tensor formats. Academic but the ideas are right. -- **FlashAttention** (Dao et al.): the tiling strategy to learn from. -- **DejaVu** (Liu et al., 2023): contextual sparsity — predicting which neurons - to activate per input. Dynamic sparsity, complementary to weight sparsity. - -## The bigger picture - -Current state of the art: dense models with FlashAttention for the N×N attention part. -Weight sparsity is known to work (SparseGPT, Wanda) but isn't deployed because -the GPU kernels don't exist to exploit it efficiently. - -The gap: nobody has built a compiler that takes a specific sparse weight matrix -and emits a kernel optimized for that exact pattern. FlashAttention proved -that custom kernels for specific computational patterns beat general-purpose ones. -The same should hold for sparse weight patterns. - -**The bet**: a compiled sparse kernel for Qwen3.5-27B's actual sparsity pattern -would be within 80% of the theoretical bandwidth-bound speedup. If true, -50% sparsity → 1.6x real speedup, 75% → 3.2x, composing with recurrent depth -for potentially 5-6x total. - -That would make 27B inference as fast as a 5B dense model, with 27B quality. diff --git a/src/agent/context.rs b/src/agent/context.rs index 2e54391..2b8bf34 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -764,7 +764,7 @@ impl ContextState { pub fn conversation(&self) -> &[AstNode] { &self.conversation } pub fn conversation_mut(&mut self) -> &mut Vec { &mut self.conversation } - pub fn sections(&self) -> [&Vec; 4] { + fn sections(&self) -> [&Vec; 4] { [&self.system, &self.identity, &self.journal, &self.conversation] } } diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 349fe91..e371a53 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -192,15 +192,12 @@ impl Agent { prompt_file: String, conversation_log: Option, active_tools: tools::ActiveTools, - agent_tools: Vec, ) -> Arc { let mut context = ContextState::new(); context.conversation_log = conversation_log; context.push_no_log(Section::System, AstNode::system_msg(&system_prompt)); - let mut tool_defs: Vec = agent_tools.iter().map(|t| t.to_json()).collect(); - tool_defs.extend(tools::all_mcp_tool_definitions().await); - + let tool_defs = tools::all_tool_definitions().await; if !tool_defs.is_empty() { let tools_text = format!( "# Tools\n\nYou have access to the following functions:\n\n\n{}\n\n\n\ @@ -226,7 +223,7 @@ impl Agent { session_id, context: tokio::sync::Mutex::new(context), state: tokio::sync::Mutex::new(AgentState { - tools: agent_tools, + tools: tools::tools(), mcp_tools: McpToolAccess::All, last_prompt_tokens: 0, reasoning_effort: "none".to_string(), diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index 9a2fed9..cc590bb 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -138,7 +138,6 @@ impl AutoAgent { app, String::new(), None, super::tools::ActiveTools::new(), - super::tools::tools(), ).await; { let mut st = agent.state.lock().await; diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index f526d44..76b7934 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -260,7 +260,7 @@ async fn query(args: &serde_json::Value) -> Result { let store = arc.lock().await; let graph = store.build_graph(); - let stages = crate::query_parser::parse_stages(query_str) + let stages = crate::search::Stage::parse_pipeline(query_str) .map_err(|e| anyhow::anyhow!("{}", e))?; let results = crate::search::run_query(&stages, vec![], &graph, &store, false, 100); let keys: Vec = results.into_iter().map(|(k, _)| k).collect(); @@ -272,61 +272,12 @@ async fn query(args: &serde_json::Value) -> Result { Ok(crate::subconscious::prompts::format_nodes_section(&store, &items, &graph)) } _ => { - // Compact output: check for count/select stages, else just list keys - use crate::search::{Stage, Transform}; - let has_count = stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))); - if has_count { - return Ok(keys.len().to_string()); - } - if keys.is_empty() { - return Ok("no results".to_string()); - } - let select_fields: Option<&Vec> = stages.iter().find_map(|s| match s { - Stage::Transform(Transform::Select(f)) => Some(f), - _ => None, - }); - if let Some(fields) = select_fields { - let mut out = String::from("key\t"); - out.push_str(&fields.join("\t")); - out.push('\n'); - for key in &keys { - out.push_str(key); - for f in fields { - out.push('\t'); - out.push_str(&resolve_field_str(&store, &graph, key, f)); - } - out.push('\n'); - } - Ok(out) - } else { - Ok(keys.join("\n")) - } + crate::query_parser::query_to_string(&store, &graph, query_str) + .map_err(|e| anyhow::anyhow!("{}", e)) } } } -fn resolve_field_str(store: &crate::store::Store, graph: &crate::graph::Graph, key: &str, field: &str) -> String { - let node = match store.nodes.get(key) { - Some(n) => n, - None => return "-".to_string(), - }; - match field { - "key" => key.to_string(), - "weight" => format!("{:.3}", node.weight), - "node_type" => format!("{:?}", node.node_type), - "provenance" => node.provenance.clone(), - "emotion" => format!("{}", node.emotion), - "retrievals" => format!("{}", node.retrievals), - "uses" => format!("{}", node.uses), - "wrongs" => format!("{}", node.wrongs), - "created" => format!("{}", node.created_at), - "timestamp" => format!("{}", node.timestamp), - "degree" => format!("{}", graph.degree(key)), - "content_len" => format!("{}", node.content.len()), - _ => "-".to_string(), - } -} - // ── Journal tools ────────────────────────────────────────────── async fn journal_tail(args: &serde_json::Value) -> Result { diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index b873a11..ce42c9e 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -195,10 +195,6 @@ pub async fn all_tool_definitions() -> Vec { defs } -pub async fn all_mcp_tool_definitions() -> Vec { - mcp_client::tool_definitions_json().await -} - /// Memory + journal tools only — for subconscious agents. pub fn memory_and_journal_tools() -> Vec { let mut all = memory::memory_tools().to_vec(); diff --git a/src/cli/agent.rs b/src/cli/agent.rs index d2d05c8..c6ae426 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -25,7 +25,7 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option target.to_vec() } else if let Some(q) = query { let graph = store.build_graph(); - let stages = crate::query_parser::parse_stages(q)?; + let stages = crate::search::Stage::parse_pipeline(q)?; let results = crate::search::run_query(&stages, vec![], &graph, &store, false, count); if results.is_empty() { return Err(format!("query returned no results: {}", q)); diff --git a/src/cli/misc.rs b/src/cli/misc.rs index 890d8ab..5e49b56 100644 --- a/src/cli/misc.rs +++ b/src/cli/misc.rs @@ -3,26 +3,25 @@ pub fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bool, debug: bool, fuzzy: bool, content: bool) -> Result<(), String> { use std::collections::BTreeMap; - use crate::search::{Stage, Algorithm, AlgoStage}; // When running inside an agent session, exclude already-surfaced nodes let seen = crate::session::HookSession::from_env() .map(|s| s.seen()) .unwrap_or_default(); - // Build pipeline: if args provided, parse them; otherwise default to spread - let stages: Vec = if pipeline_args.is_empty() { - vec![Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() })] + // Parse pipeline stages (unified: algorithms, filters, transforms, generators) + let stages: Vec = if pipeline_args.is_empty() { + vec![crate::search::Stage::Algorithm(crate::search::AlgoStage::parse("spread").unwrap())] } else { - // Join args with | and parse as unified query - let pipeline_str = format!("all | {}", pipeline_args.join(" | ")); - crate::query_parser::parse_stages(&pipeline_str)? + pipeline_args.iter() + .map(|a| crate::search::Stage::parse(a)) + .collect::, _>>()? }; // Check if pipeline needs full Store (has filters/transforms/generators) - let needs_store = stages.iter().any(|s| !matches!(s, Stage::Algorithm(_))); + let needs_store = stages.iter().any(|s| !matches!(s, crate::search::Stage::Algorithm(_))); // Check if pipeline starts with a generator (doesn't need seed terms) - let has_generator = stages.first().map(|s| matches!(s, Stage::Generator(_))).unwrap_or(false); + let has_generator = stages.first().map(|s| matches!(s, crate::search::Stage::Generator(_))).unwrap_or(false); if terms.is_empty() && !has_generator { return Err("search requires terms or a generator stage (e.g. 'all')".into()); diff --git a/src/hippocampus/query/engine.rs b/src/hippocampus/query/engine.rs index 28ca344..915ec14 100644 --- a/src/hippocampus/query/engine.rs +++ b/src/hippocampus/query/engine.rs @@ -157,9 +157,6 @@ pub enum Filter { pub enum Transform { Sort(SortField), Limit(usize), - Select(Vec), - Count, - Connectivity, DominatingSet, } @@ -171,8 +168,6 @@ pub enum SortField { Degree, Weight, Isolation, - Key, - Named(String, bool), // (field_name, ascending) Composite(Vec<(ScoreField, f64)>), } @@ -211,6 +206,79 @@ impl Cmp { } } +/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds). +fn parse_cmp(s: &str) -> Result { + let (op_len, ctor): (usize, fn(f64) -> Cmp) = if s.starts_with(">=") { + (2, Cmp::Gte) + } else if s.starts_with("<=") { + (2, Cmp::Lte) + } else if s.starts_with('>') { + (1, Cmp::Gt) + } else if s.starts_with('<') { + (1, Cmp::Lt) + } else if s.starts_with('=') { + (1, Cmp::Eq) + } else { + return Err(format!("expected comparison operator in '{}'", s)); + }; + + let val_str = &s[op_len..]; + let val = parse_duration_or_number(val_str)?; + Ok(ctor(val)) +} + +/// Parse "7d", "24h", "30m" as seconds, or plain numbers. +fn parse_duration_or_number(s: &str) -> Result { + if let Some(n) = s.strip_suffix('d') { + let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; + Ok(v * 86400.0) + } else if let Some(n) = s.strip_suffix('h') { + let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; + Ok(v * 3600.0) + } else if let Some(n) = s.strip_suffix('m') { + let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; + Ok(v * 60.0) + } else { + s.parse().map_err(|_| format!("bad number: {}", s)) + } +} + +/// Parse composite sort: "isolation*0.7+recency(linker)*0.3" +/// Each term is field or field(arg), optionally *weight (default 1.0). +fn parse_composite_sort(s: &str) -> Result, String> { + let mut terms = Vec::new(); + for term in s.split('+') { + let term = term.trim(); + let (field_part, weight) = if let Some((f, w)) = term.rsplit_once('*') { + (f, w.parse::().map_err(|_| format!("bad weight: {}", w))?) + } else { + (term, 1.0) + }; + + // Parse field, possibly with (arg) + let field = if let Some((name, arg)) = field_part.split_once('(') { + let arg = arg.strip_suffix(')').ok_or("missing ) in sort field")?; + match name { + "recency" => ScoreField::Recency(arg.to_string()), + _ => return Err(format!("unknown parameterized sort field: {}", name)), + } + } else { + match field_part { + "isolation" => ScoreField::Isolation, + "degree" => ScoreField::Degree, + "weight" => ScoreField::Weight, + "content-len" => ScoreField::ContentLen, + "priority" => ScoreField::Priority, + _ => return Err(format!("unknown sort field: {}", field_part)), + } + }; + terms.push((field, weight)); + } + if terms.is_empty() { + return Err("empty composite sort".into()); + } + Ok(terms) +} /// Compute a 0-1 score for a node on a single dimension. fn score_field( @@ -280,6 +348,129 @@ impl CompositeCache { } } +/// Parse a NodeType from a label. +fn parse_node_type(s: &str) -> Result { + match s { + "episodic" | "session" => Ok(NodeType::EpisodicSession), + "daily" => Ok(NodeType::EpisodicDaily), + "weekly" => Ok(NodeType::EpisodicWeekly), + "monthly" => Ok(NodeType::EpisodicMonthly), + "semantic" => Ok(NodeType::Semantic), + _ => Err(format!("unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)", s)), + } +} + +impl Stage { + /// Parse a single stage from a string. + /// + /// Algorithm names are tried first (bare words), then predicate syntax + /// (contains ':'). No ambiguity since algorithms are bare words. + pub fn parse(s: &str) -> Result { + let s = s.trim(); + let (negated, s) = if let Some(rest) = s.strip_prefix('!') { + (true, rest) + } else { + (false, s) + }; + + // Generator: "all" + if s == "all" { + return Ok(Stage::Generator(Generator::All)); + } + + // Transform: "dominating-set" + if s == "dominating-set" { + return Ok(Stage::Transform(Transform::DominatingSet)); + } + + // Try algorithm parse first (bare words, no colon) + if !s.contains(':') + && let Ok(algo) = AlgoStage::parse(s) { + return Ok(Stage::Algorithm(algo)); + } + + // Algorithm with params: "spread,max_hops=4" (contains comma but no colon) + if s.contains(',') && !s.contains(':') { + return AlgoStage::parse(s).map(Stage::Algorithm); + } + + // Predicate/transform syntax: "key:value" + let (prefix, value) = s.split_once(':') + .ok_or_else(|| format!("unknown stage: {}", s))?; + + let filter_or_transform = match prefix { + "type" => Stage::Filter(Filter::Type(parse_node_type(value)?)), + "key" => Stage::Filter(Filter::KeyGlob(value.to_string())), + "weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)), + "age" => Stage::Filter(Filter::Age(parse_cmp(value)?)), + "content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)), + "provenance" => { + Stage::Filter(Filter::Provenance(value.to_string())) + } + "not-visited" => { + let (agent, dur) = value.split_once(',') + .ok_or("not-visited:AGENT,DURATION")?; + let secs = parse_duration_or_number(dur)?; + Stage::Filter(Filter::NotVisited { + agent: agent.to_string(), + duration: secs as i64, + }) + } + "visited" => Stage::Filter(Filter::Visited { + agent: value.to_string(), + }), + "sort" => { + // Check for composite sort: field*weight+field*weight+... + let field = if value.contains('+') || value.contains('*') { + SortField::Composite(parse_composite_sort(value)?) + } else { + match value { + "priority" => SortField::Priority, + "timestamp" => SortField::Timestamp, + "content-len" => SortField::ContentLen, + "degree" => SortField::Degree, + "weight" => SortField::Weight, + "isolation" => SortField::Isolation, + _ => return Err(format!("unknown sort field: {}", value)), + } + }; + Stage::Transform(Transform::Sort(field)) + } + "limit" => { + let n: usize = value.parse() + .map_err(|_| format!("bad limit: {}", value))?; + Stage::Transform(Transform::Limit(n)) + } + "match" => { + let terms: Vec = value.split(',') + .map(|t| t.to_string()) + .collect(); + Stage::Generator(Generator::Match(terms)) + } + // Algorithm with colon in params? Try fallback. + _ => return AlgoStage::parse(s).map(Stage::Algorithm) + .map_err(|_| format!("unknown stage: {}", s)), + }; + + // Apply negation to filters + if negated { + match filter_or_transform { + Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))), + _ => Err("! prefix only works on filter stages".to_string()), + } + } else { + Ok(filter_or_transform) + } + } + + /// Parse a pipe-separated pipeline string. + pub fn parse_pipeline(s: &str) -> Result, String> { + s.split('|') + .map(|part| Stage::parse(part.trim())) + .collect() + } +} + impl fmt::Display for Stage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -288,9 +479,6 @@ impl fmt::Display for Stage { Stage::Filter(filt) => write!(f, "{}", filt), Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field), Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n), - Stage::Transform(Transform::Select(fields)) => write!(f, "select:{}", fields.join(",")), - Stage::Transform(Transform::Count) => write!(f, "count"), - Stage::Transform(Transform::Connectivity) => write!(f, "connectivity"), Stage::Transform(Transform::DominatingSet) => write!(f, "dominating-set"), Stage::Algorithm(a) => write!(f, "{}", a.algo), } @@ -425,7 +613,7 @@ fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> { } } -pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { +fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { let node = match store.nodes.get(key) { Some(n) => n, None => return false, @@ -498,39 +686,6 @@ pub fn run_transform( sb.total_cmp(&sa) // most isolated first }); } - SortField::Key => { - items.sort_by(|a, b| a.0.cmp(&b.0)); - } - SortField::Named(field, asc) => { - // Resolve field from node properties - let resolve = |key: &str| -> Option { - let node = store.nodes.get(key)?; - match field.as_str() { - "weight" => Some(node.weight as f64), - "emotion" => Some(node.emotion as f64), - "retrievals" => Some(node.retrievals as f64), - "uses" => Some(node.uses as f64), - "wrongs" => Some(node.wrongs as f64), - "created" => Some(node.created_at as f64), - "timestamp" => Some(node.timestamp as f64), - "degree" => Some(graph.degree(key) as f64), - "content_len" => Some(node.content.len() as f64), - _ => None, - } - }; - let asc = *asc; - items.sort_by(|a, b| { - let va = resolve(&a.0); - let vb = resolve(&b.0); - let ord = match (va, vb) { - (Some(a), Some(b)) => a.total_cmp(&b), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - (None, None) => a.0.cmp(&b.0), - }; - if asc { ord } else { ord.reverse() } - }); - } SortField::Priority => { // Pre-compute priorities to avoid O(n log n) calls // inside the sort comparator. @@ -570,8 +725,6 @@ pub fn run_transform( items.truncate(*n); items } - // Output mode directives - don't modify result set, handled at output layer - Transform::Select(_) | Transform::Count | Transform::Connectivity => items, Transform::DominatingSet => { // Greedy 3-covering dominating set: pick the node that covers // the most under-covered neighbors, repeat until every node diff --git a/src/hippocampus/query/parser.rs b/src/hippocampus/query/parser.rs index e755367..64ffc00 100644 --- a/src/hippocampus/query/parser.rs +++ b/src/hippocampus/query/parser.rs @@ -26,12 +26,6 @@ use crate::graph::Graph; use regex::Regex; use std::collections::BTreeMap; -// Re-export engine types used by Query -pub use super::engine::{ - Stage, Filter, Transform, Generator, SortField, - Algorithm, AlgoStage, Cmp, -}; - // -- AST types -- #[derive(Debug, Clone)] @@ -63,6 +57,16 @@ pub enum CmpOp { Gt, Lt, Ge, Le, Eq, Ne, Match, } +#[derive(Debug, Clone)] +pub enum Stage { + Sort { field: String, ascending: bool }, + Limit(usize), + Select(Vec), + Count, + Connectivity, + DominatingSet, +} + #[derive(Debug, Clone)] pub struct Query { pub expr: Expr, @@ -82,54 +86,18 @@ peg::parser! { = s:(_ "|" _ s:stage() { s })* { s } rule stage() -> Stage - // Original PEG syntax (space-separated) - = "sort" _ f:field() _ a:asc_desc() { - Stage::Transform(Transform::Sort(make_sort_field(&f, a))) - } - / "limit" _ n:integer() { Stage::Transform(Transform::Limit(n)) } - / "select" _ f:field_list() { Stage::Transform(Transform::Select(f)) } - / "count" { Stage::Transform(Transform::Count) } - / "connectivity" { Stage::Transform(Transform::Connectivity) } - / "dominating-set" { Stage::Transform(Transform::DominatingSet) } - // Pipeline syntax (colon-separated) - / "sort:" f:field() { Stage::Transform(Transform::Sort(make_sort_field(&f, false))) } - / "limit:" n:integer() { Stage::Transform(Transform::Limit(n)) } - / "select:" f:field_list_colon() { Stage::Transform(Transform::Select(f)) } - / "type:" t:ident() { make_type_filter(&t) } - / "age:" c:cmp_duration() { Stage::Filter(Filter::Age(c)) } - / "key:" g:ident() { Stage::Filter(Filter::KeyGlob(g)) } - / "provenance:" p:ident() { Stage::Filter(Filter::Provenance(p)) } - / "all" { Stage::Generator(Generator::All) } - // Graph algorithms - / "spread" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() }) } - / "spectral" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spectral, params: std::collections::HashMap::new() }) } + = "sort" _ f:field() _ a:asc_desc() { Stage::Sort { field: f, ascending: a } } + / "limit" _ n:integer() { Stage::Limit(n) } + / "select" _ f:field_list() { Stage::Select(f) } + / "count" { Stage::Count } + / "connectivity" { Stage::Connectivity } + / "dominating-set" { Stage::DominatingSet } rule asc_desc() -> bool = "asc" { true } / "desc" { false } / { false } // default: descending - rule field_list_colon() -> Vec - = f:field() fs:("," f:field() { f })* { - let mut v = vec![f]; - v.extend(fs); - v - } - - rule cmp_duration() -> Cmp - = ">=" n:duration() { Cmp::Gte(n) } - / "<=" n:duration() { Cmp::Lte(n) } - / ">" n:duration() { Cmp::Gt(n) } - / "<" n:duration() { Cmp::Lt(n) } - / "=" n:duration() { Cmp::Eq(n) } - - rule duration() -> f64 - = n:number() "d" { n * 86400.0 } - / n:number() "h" { n * 3600.0 } - / n:number() "m" { n * 60.0 } - / n:number() "s" { n } - / n:number() { n } - rule field_list() -> Vec = f:field() fs:(_ "," _ f:field() { f })* { let mut v = vec![f]; @@ -154,7 +122,6 @@ peg::parser! { Expr::Comparison { field: f, op, value: v } } "*" { Expr::All } - "all" { Expr::All } "(" _ e:expr() _ ")" { e } } @@ -200,55 +167,6 @@ peg::parser! { } } -// -- Helper functions for PEG grammar -- - -fn make_sort_field(field: &str, ascending: bool) -> SortField { - match field { - "priority" => SortField::Priority, - "timestamp" => SortField::Timestamp, - "content-len" | "content_len" => SortField::ContentLen, - "degree" => SortField::Degree, - "weight" => SortField::Weight, - "isolation" => SortField::Isolation, - "key" => SortField::Key, - _ => SortField::Named(field.to_string(), ascending), - } -} - -fn make_type_filter(type_name: &str) -> Stage { - let node_type = match type_name { - "episodic" | "session" => NodeType::EpisodicSession, - "daily" => NodeType::EpisodicDaily, - "weekly" => NodeType::EpisodicWeekly, - "monthly" => NodeType::EpisodicMonthly, - "semantic" => NodeType::Semantic, - _ => NodeType::Semantic, // fallback - }; - Stage::Filter(Filter::Type(node_type)) -} - -/// Parse a query string into Vec for pipeline execution. -/// This is the unified entry point — replaces engine::Stage::parse_pipeline. -pub fn parse_stages(s: &str) -> Result, String> { - let q = query_parser::query(s) - .map_err(|e| format!("Parse error: {}", e))?; - - let mut stages = Vec::new(); - - // Convert Expr to a Generator stage - match &q.expr { - Expr::All => stages.push(Stage::Generator(Generator::All)), - _ => { - // For complex expressions, we need the Query-based path - // This shouldn't happen for pipeline queries - return Err("Complex expressions not supported in pipeline mode; use CLI query".into()); - } - } - - stages.extend(q.stages); - Ok(stages) -} - // -- Field resolution -- /// Resolve a field value from a node + graph context, returning a comparable Value. @@ -459,12 +377,12 @@ fn execute_parsed( let mut set = Vec::new(); for stage in &q.stages { match stage { - Stage::Transform(Transform::Select(fields)) => { + Stage::Select(fields) => { for f in fields { if !set.contains(f) { set.push(f.clone()); } } } - Stage::Transform(Transform::Sort(SortField::Named(field, _))) => { + Stage::Sort { field, .. } => { if !set.contains(field) { set.push(field.clone()); } } _ => {} @@ -486,75 +404,37 @@ fn execute_parsed( let mut has_sort = false; for stage in &q.stages { match stage { - Stage::Transform(Transform::Sort(sort_field)) => { + Stage::Sort { field, ascending } => { has_sort = true; - match sort_field { - SortField::Named(field, asc) => { - let asc = *asc; - let field = field.clone(); - results.sort_by(|a, b| { - let va = a.fields.get(&field).and_then(as_num); - let vb = b.fields.get(&field).and_then(as_num); - let ord = match (va, vb) { - (Some(a), Some(b)) => a.total_cmp(&b), - _ => { - let sa = a.fields.get(&field).map(as_str).unwrap_or_default(); - let sb = b.fields.get(&field).map(as_str).unwrap_or_default(); - sa.cmp(&sb) - } - }; - if asc { ord } else { ord.reverse() } - }); - } - SortField::Key => { - results.sort_by(|a, b| a.key.cmp(&b.key)); - } - SortField::Degree => { - results.sort_by(|a, b| { - let da = graph.degree(&a.key); - let db = graph.degree(&b.key); - db.cmp(&da) - }); - } - SortField::Weight => { - results.sort_by(|a, b| { - let wa = store.nodes.get(&a.key).map(|n| n.weight).unwrap_or(0.0); - let wb = store.nodes.get(&b.key).map(|n| n.weight).unwrap_or(0.0); - wb.total_cmp(&wa) - }); - } - SortField::Timestamp => { - results.sort_by(|a, b| { - let ta = store.nodes.get(&a.key).map(|n| n.timestamp).unwrap_or(0); - let tb = store.nodes.get(&b.key).map(|n| n.timestamp).unwrap_or(0); - tb.cmp(&ta) - }); - } - _ => {} // other sort fields handled by default degree sort - } + let asc = *ascending; + results.sort_by(|a, b| { + let va = a.fields.get(field).and_then(as_num); + let vb = b.fields.get(field).and_then(as_num); + let ord = match (va, vb) { + (Some(a), Some(b)) => a.total_cmp(&b), + _ => { + let sa = a.fields.get(field).map(as_str).unwrap_or_default(); + let sb = b.fields.get(field).map(as_str).unwrap_or_default(); + sa.cmp(&sb) + } + }; + if asc { ord } else { ord.reverse() } + }); } - Stage::Transform(Transform::Limit(n)) => { + Stage::Limit(n) => { results.truncate(*n); } - Stage::Transform(Transform::Connectivity) => {} // handled in output - Stage::Transform(Transform::Select(_) | Transform::Count) => {} // handled in output - Stage::Transform(Transform::DominatingSet) => { + Stage::Connectivity => {} // handled in output + Stage::Select(_) | Stage::Count => {} // handled in output + Stage::DominatingSet => { let mut items: Vec<(String, f64)> = results.iter() .map(|r| (r.key.clone(), graph.degree(&r.key) as f64)) .collect(); - let xform = Transform::DominatingSet; + let xform = super::engine::Transform::DominatingSet; items = super::engine::run_transform(&xform, items, store, graph); let keep: std::collections::HashSet = items.into_iter().map(|(k, _)| k).collect(); results.retain(|r| keep.contains(&r.key)); } - Stage::Filter(filt) => { - // Apply filter to narrow results - let now = crate::store::now_epoch(); - results.retain(|r| super::engine::eval_filter(filt, &r.key, store, now)); - } - Stage::Generator(_) | Stage::Algorithm(_) => { - // Generators are handled by Expr, algorithms not applicable here - } } } @@ -594,7 +474,7 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St let results = execute_parsed(store, graph, &q)?; // Count stage - if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) { + if q.stages.iter().any(|s| matches!(s, Stage::Count)) { println!("{}", results.len()); return Ok(()); } @@ -605,14 +485,14 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St } // Connectivity stage - if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Connectivity))) { + if q.stages.iter().any(|s| matches!(s, Stage::Connectivity)) { print_connectivity(&results, graph); return Ok(()); } // Select stage let fields: Option<&Vec> = q.stages.iter().find_map(|s| match s { - Stage::Transform(Transform::Select(f)) => Some(f), + Stage::Select(f) => Some(f), _ => None, }); @@ -647,7 +527,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result< let results = execute_parsed(store, graph, &q)?; - if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) { + if q.stages.iter().any(|s| matches!(s, Stage::Count)) { return Ok(results.len().to_string()); } if results.is_empty() { @@ -655,7 +535,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result< } let fields: Option<&Vec> = q.stages.iter().find_map(|s| match s { - Stage::Transform(Transform::Select(f)) => Some(f), + Stage::Select(f) => Some(f), _ => None, }); diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 38e8fdf..010829f 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -298,7 +298,6 @@ impl Mind { config.prompt_file.clone(), conversation_log, crate::agent::tools::ActiveTools::new(), - crate::agent::tools::tools(), ).await; let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns))); diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 5678b19..a137fe0 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -77,19 +77,20 @@ impl Unconscious { // Scan all .agent files, exclude subconscious-* and surface-observe let mut agents: Vec = Vec::new(); - let base_tools = tools::memory::memory_tools().to_vec(); - let extra_tools = tools::memory::journal_tools().to_vec(); + let all_tools = tools::memory::memory_tools().to_vec(); for def in defs::load_defs() { if def.agent.starts_with("subconscious-") { continue; } if def.agent == "surface-observe" { continue; } let enabled = enabled_map.get(&def.agent).copied() .unwrap_or(false); - let mut effective_tools = base_tools.clone(); - for name in &def.tools { - if let Some(t) = extra_tools.iter().find(|t| t.name == name) { - effective_tools.push(t.clone()); - } - } + let effective_tools: Vec = if def.tools.is_empty() { + all_tools.clone() + } else { + all_tools.iter() + .filter(|t| def.tools.iter().any(|w| w == t.name)) + .cloned() + .collect() + }; let steps: Vec = def.steps.iter().map(|s| AutoStep { prompt: s.prompt.clone(), phase: s.phase.clone(), @@ -281,11 +282,11 @@ impl Unconscious { client, system_prompt, personality, app, String::new(), None, crate::agent::tools::ActiveTools::new(), - auto.tools.clone(), ).await; { let mut st = agent.state.lock().await; st.provenance = format!("unconscious:{}", auto.name); + st.tools = auto.tools.clone(); st.priority = Some(10); } @@ -308,11 +309,14 @@ pub async fn save_agent_log(name: &str, agent: &std::sync::Arc = Vec::new(); - for section in ctx.sections() { - context.extend(section); - } - if let Ok(json) = serde_json::to_string_pretty(&context) { + let sections = serde_json::json!({ + "system": ctx.system(), + "identity": ctx.identity(), + "journal": ctx.journal(), + "conversation": ctx.conversation(), + "stats": stats, + }); + if let Ok(json) = serde_json::to_string_pretty(§ions) { let _ = std::fs::write(&path, json); } } diff --git a/src/subconscious/agents/digest.agent b/src/subconscious/agents/digest.agent index c342d25..a8e47d3 100644 --- a/src/subconscious/agents/digest.agent +++ b/src/subconscious/agents/digest.agent @@ -1,4 +1,4 @@ -{"agent": "digest", "schedule": "daily", "tools": ["journal_tail", "journal_new", "journal_update"]} +{"agent": "digest", "schedule": "daily"} # Digest Agent — Episodic Consolidation diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index 682b4fa..1fe8377 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -801,7 +801,7 @@ pub fn run_agent( // Run the query if present let keys = if !def.query.is_empty() { - let mut stages = crate::query_parser::parse_stages(&def.query)?; + let mut stages = search::Stage::parse_pipeline(&def.query)?; let has_limit = stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))); if !has_limit {