From e736471d99320a8c3d83292e67ff623d6344b942 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Tue, 10 Mar 2026 15:22:12 -0400 Subject: [PATCH] search: unified query pipeline with filters, transforms, generators Extend the pipeline with four stage types composing left-to-right: Generators: all, match:TERM Filters: type:, key:, weight:, age:, content-len:, provenance:, not-visited:, visited: (plus ! negation) Transforms: sort:(priority|timestamp|content-len|degree|weight), limit:N Algorithms: spread, spectral, confluence, geodesic, manifold (unchanged) Duration syntax (7d, 24h, 30m) and glob matching on keys. CLI auto-detects filter/transform stages and loads full Store; algorithm-only pipelines keep the fast MmapView path. Co-Authored-By: ProofOfConcept --- poc-memory/src/main.rs | 168 ++++++++----- poc-memory/src/search.rs | 493 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 591 insertions(+), 70 deletions(-) diff --git a/poc-memory/src/main.rs b/poc-memory/src/main.rs index 0494b46..986f21e 100644 --- a/poc-memory/src/main.rs +++ b/poc-memory/src/main.rs @@ -598,81 +598,129 @@ fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bo use store::StoreView; use std::collections::BTreeMap; - if terms.is_empty() { - return Err("search requires at least one term".into()); + // Parse pipeline stages (unified: algorithms, filters, transforms, generators) + let stages: Vec = if pipeline_args.is_empty() { + vec![search::Stage::Algorithm(search::AlgoStage::parse("spread").unwrap())] + } else { + pipeline_args.iter() + .map(|a| search::Stage::parse(a)) + .collect::, _>>()? + }; + + // Check if pipeline needs full Store (has filters/transforms/generators) + let needs_store = stages.iter().any(|s| !matches!(s, search::Stage::Algorithm(_))); + // Check if pipeline starts with a generator (doesn't need seed terms) + let has_generator = stages.first().map(|s| matches!(s, search::Stage::Generator(_))).unwrap_or(false); + + if terms.is_empty() && !has_generator { + return Err("search requires terms or a generator stage (e.g. 'all')".into()); } let query: String = terms.join(" "); - // Parse pipeline (default: spread) - let pipeline: Vec = if pipeline_args.is_empty() { - vec![search::AlgoStage::parse("spread").unwrap()] - } else { - pipeline_args.iter() - .map(|a| search::AlgoStage::parse(a)) - .collect::, _>>()? - }; - if debug { - let names: Vec = pipeline.iter().map(|s| format!("{}", s.algo)).collect(); + let names: Vec = stages.iter().map(|s| format!("{}", s)).collect(); println!("[search] pipeline: {}", names.join(" → ")); } - let view = store::AnyView::load()?; - let graph = graph::build_graph_fast(&view); - - // Build equal-weight terms from query - let terms: BTreeMap = query.split_whitespace() - .map(|t| (t.to_lowercase(), 1.0)) - .collect(); - - let (seeds, direct_hits) = search::match_seeds(&terms, &view); - - if seeds.is_empty() { - eprintln!("No results for '{}'", query); - return Ok(()); - } - - if debug { - println!("[search] {} seeds from query '{}'", seeds.len(), query); - for (key, score) in &seeds { - println!(" {:.4} {}", score, key); - } - } - let max_results = if expand { 15 } else { 5 }; - let raw = search::run_pipeline(&pipeline, seeds, &graph, &view, debug, max_results); - let results: Vec = raw.into_iter() - .map(|(key, activation)| { - let is_direct = direct_hits.contains(&key); - search::SearchResult { key, activation, is_direct, snippet: None } - }) - .collect(); + if needs_store { + // Full Store path — needed for filter/transform/generator stages + let store = store::Store::load()?; + let graph = store.build_graph(); - if results.is_empty() { - eprintln!("No results for '{}'", query); - return Ok(()); - } + let seeds = if has_generator { + vec![] // generator will produce its own result set + } else { + let terms_map: BTreeMap = query.split_whitespace() + .map(|t| (t.to_lowercase(), 1.0)) + .collect(); + let (seeds, _) = search::match_seeds(&terms_map, &store); + seeds + }; - // Log retrieval - store::Store::log_retrieval_static(&query, - &results.iter().map(|r| r.key.clone()).collect::>()); + let raw = search::run_query(&stages, seeds, &graph, &store, debug, max_results); - let bump_keys: Vec<&str> = results.iter().take(max_results).map(|r| r.key.as_str()).collect(); - let _ = lookups::bump_many(&bump_keys); + if raw.is_empty() { + eprintln!("No results"); + return Ok(()); + } - for (i, r) in results.iter().enumerate().take(max_results) { - let marker = if r.is_direct { "→" } else { " " }; - let weight = view.node_weight(&r.key); - println!("{}{:2}. [{:.2}/{:.2}] {}", marker, i + 1, r.activation, weight, r.key); - if full { - if let Some(content) = view.node_content(&r.key) { - println!(); - for line in content.lines() { - println!(" {}", line); + for (i, (key, score)) in raw.iter().enumerate().take(max_results) { + let weight = store.nodes.get(key).map(|n| n.weight).unwrap_or(0.0); + println!("{:2}. [{:.2}/{:.2}] {}", i + 1, score, weight, key); + if full { + if let Some(node) = store.nodes.get(key) { + println!(); + for line in node.content.lines() { + println!(" {}", line); + } + println!(); + } + } + } + } else { + // Fast MmapView path — algorithm-only pipeline + let view = store::AnyView::load()?; + let graph = graph::build_graph_fast(&view); + + let terms_map: BTreeMap = query.split_whitespace() + .map(|t| (t.to_lowercase(), 1.0)) + .collect(); + let (seeds, direct_hits) = search::match_seeds(&terms_map, &view); + + if seeds.is_empty() { + eprintln!("No results for '{}'", query); + return Ok(()); + } + + if debug { + println!("[search] {} seeds from query '{}'", seeds.len(), query); + } + + // Extract AlgoStages from the unified stages + let algo_stages: Vec<&search::AlgoStage> = stages.iter() + .filter_map(|s| match s { + search::Stage::Algorithm(a) => Some(a), + _ => None, + }) + .collect(); + let algo_owned: Vec = algo_stages.into_iter().cloned().collect(); + + let raw = search::run_pipeline(&algo_owned, seeds, &graph, &view, debug, max_results); + + let results: Vec = raw.into_iter() + .map(|(key, activation)| { + let is_direct = direct_hits.contains(&key); + search::SearchResult { key, activation, is_direct, snippet: None } + }) + .collect(); + + if results.is_empty() { + eprintln!("No results for '{}'", query); + return Ok(()); + } + + // Log retrieval + store::Store::log_retrieval_static(&query, + &results.iter().map(|r| r.key.clone()).collect::>()); + + let bump_keys: Vec<&str> = results.iter().take(max_results).map(|r| r.key.as_str()).collect(); + let _ = lookups::bump_many(&bump_keys); + + for (i, r) in results.iter().enumerate().take(max_results) { + let marker = if r.is_direct { "→" } else { " " }; + let weight = view.node_weight(&r.key); + println!("{}{:2}. [{:.2}/{:.2}] {}", marker, i + 1, r.activation, weight, r.key); + if full { + if let Some(content) = view.node_content(&r.key) { + println!(); + for line in content.lines() { + println!(" {}", line); + } + println!(); } - println!(); } } } diff --git a/poc-memory/src/search.rs b/poc-memory/src/search.rs index a02fcd1..8ab5953 100644 --- a/poc-memory/src/search.rs +++ b/poc-memory/src/search.rs @@ -1,17 +1,44 @@ -// Memory search: composable algorithm pipeline. +// Memory search: composable query pipeline. // -// Each algorithm is a stage: takes seeds Vec<(String, f64)>, returns -// new/modified seeds. Stages compose left-to-right in a pipeline. +// The pipeline has four kinds of stages, all composing left-to-right: // -// Available algorithms: -// spread — spreading activation through graph edges -// spectral — nearest neighbors in spectral embedding space -// manifold — extrapolation along direction defined by seeds (TODO) +// Generators — produce a result set from nothing: +// all every non-deleted node +// match:TERM text match (current seed extraction) // -// Seed extraction (matching query terms to node keys) is shared -// infrastructure, not an algorithm stage. +// Filters — narrow an existing result set on node metadata: +// type:episodic node_type == EpisodicSession +// type:semantic node_type == Semantic +// type:daily node_type == EpisodicDaily +// type:weekly node_type == EpisodicWeekly +// type:monthly node_type == EpisodicMonthly +// key:GLOB glob match on key +// weight:>0.5 numeric comparison on weight +// age:<7d created/modified within duration +// content-len:>1000 content size filter +// provenance:manual provenance match +// not-visited:AGENT,DUR not seen by agent in duration +// visited:AGENT has been seen by agent +// +// Transforms — reorder or reshape: +// sort:priority consolidation priority scoring +// sort:timestamp by timestamp (desc) +// sort:content-len by content size +// sort:degree by graph degree +// sort:weight by weight +// limit:N truncate to N results +// +// Algorithms — graph exploration (existing): +// spread spreading activation +// spectral,k=20 spectral nearest neighbors +// confluence multi-source reachability +// geodesic straightest spectral paths +// manifold extrapolation along seed direction +// +// Stages are parsed from strings and composed via the -p flag or +// pipe-separated in agent definitions. -use crate::store::StoreView; +use crate::store::{Store, StoreView, NodeType, Provenance}; use crate::graph::Graph; use crate::spectral; @@ -96,6 +123,452 @@ impl AlgoStage { } } +// ── Unified query pipeline ────────────────────────────────────────── + +/// A pipeline stage: generator, filter, transform, or graph algorithm. +#[derive(Clone, Debug)] +pub enum Stage { + Generator(Generator), + Filter(Filter), + Transform(Transform), + Algorithm(AlgoStage), +} + +#[derive(Clone, Debug)] +pub enum Generator { + All, // every non-deleted node + Match(Vec), // text match seeds +} + +#[derive(Clone, Debug)] +pub enum Filter { + Type(NodeType), + KeyGlob(String), + Weight(Cmp), + Age(Cmp), // vs now - timestamp (seconds) + ContentLen(Cmp), + Provenance(Provenance), + NotVisited { agent: String, duration: i64 }, // seconds + Visited { agent: String }, + Negated(Box), +} + +#[derive(Clone, Debug)] +pub enum Transform { + Sort(SortField), + Limit(usize), +} + +#[derive(Clone, Debug)] +pub enum SortField { + Priority, + Timestamp, + ContentLen, + Degree, + Weight, +} + +/// Numeric comparison operator. +#[derive(Clone, Debug)] +pub enum Cmp { + Gt(f64), + Gte(f64), + Lt(f64), + Lte(f64), + Eq(f64), +} + +impl Cmp { + fn matches(&self, val: f64) -> bool { + match self { + Cmp::Gt(x) => val > *x, + Cmp::Gte(x) => val >= *x, + Cmp::Lt(x) => val < *x, + Cmp::Lte(x) => val <= *x, + Cmp::Eq(x) => (val - x).abs() < f64::EPSILON, + } + } +} + +/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds). +fn parse_cmp(s: &str) -> Result { + let (op_len, ctor): (usize, fn(f64) -> Cmp) = if s.starts_with(">=") { + (2, Cmp::Gte) + } else if s.starts_with("<=") { + (2, Cmp::Lte) + } else if s.starts_with('>') { + (1, Cmp::Gt) + } else if s.starts_with('<') { + (1, Cmp::Lt) + } else if s.starts_with('=') { + (1, Cmp::Eq) + } else { + return Err(format!("expected comparison operator in '{}'", s)); + }; + + let val_str = &s[op_len..]; + let val = parse_duration_or_number(val_str)?; + Ok(ctor(val)) +} + +/// Parse "7d", "24h", "30m" as seconds, or plain numbers. +fn parse_duration_or_number(s: &str) -> Result { + if let Some(n) = s.strip_suffix('d') { + let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; + Ok(v * 86400.0) + } else if let Some(n) = s.strip_suffix('h') { + let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; + Ok(v * 3600.0) + } else if let Some(n) = s.strip_suffix('m') { + let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?; + Ok(v * 60.0) + } else { + s.parse().map_err(|_| format!("bad number: {}", s)) + } +} + +/// Parse a NodeType from a label. +fn parse_node_type(s: &str) -> Result { + match s { + "episodic" | "session" => Ok(NodeType::EpisodicSession), + "daily" => Ok(NodeType::EpisodicDaily), + "weekly" => Ok(NodeType::EpisodicWeekly), + "monthly" => Ok(NodeType::EpisodicMonthly), + "semantic" => Ok(NodeType::Semantic), + _ => Err(format!("unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)", s)), + } +} + +impl Stage { + /// Parse a single stage from a string. + /// + /// Algorithm names are tried first (bare words), then predicate syntax + /// (contains ':'). No ambiguity since algorithms are bare words. + pub fn parse(s: &str) -> Result { + let s = s.trim(); + let (negated, s) = if let Some(rest) = s.strip_prefix('!') { + (true, rest) + } else { + (false, s) + }; + + // Generator: "all" + if s == "all" { + return Ok(Stage::Generator(Generator::All)); + } + + // Try algorithm parse first (bare words, no colon) + if !s.contains(':') { + if let Ok(algo) = AlgoStage::parse(s) { + return Ok(Stage::Algorithm(algo)); + } + } + + // Algorithm with params: "spread,max_hops=4" (contains comma but no colon) + if s.contains(',') && !s.contains(':') { + return AlgoStage::parse(s).map(Stage::Algorithm); + } + + // Predicate/transform syntax: "key:value" + let (prefix, value) = s.split_once(':') + .ok_or_else(|| format!("unknown stage: {}", s))?; + + let filter_or_transform = match prefix { + "type" => Stage::Filter(Filter::Type(parse_node_type(value)?)), + "key" => Stage::Filter(Filter::KeyGlob(value.to_string())), + "weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)), + "age" => Stage::Filter(Filter::Age(parse_cmp(value)?)), + "content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)), + "provenance" => { + let prov = Provenance::from_label(value) + .ok_or_else(|| format!("unknown provenance: {}", value))?; + Stage::Filter(Filter::Provenance(prov)) + } + "not-visited" => { + let (agent, dur) = value.split_once(',') + .ok_or("not-visited:AGENT,DURATION")?; + let secs = parse_duration_or_number(dur)?; + Stage::Filter(Filter::NotVisited { + agent: agent.to_string(), + duration: secs as i64, + }) + } + "visited" => Stage::Filter(Filter::Visited { + agent: value.to_string(), + }), + "sort" => { + let field = match value { + "priority" => SortField::Priority, + "timestamp" => SortField::Timestamp, + "content-len" => SortField::ContentLen, + "degree" => SortField::Degree, + "weight" => SortField::Weight, + _ => return Err(format!("unknown sort field: {}", value)), + }; + Stage::Transform(Transform::Sort(field)) + } + "limit" => { + let n: usize = value.parse() + .map_err(|_| format!("bad limit: {}", value))?; + Stage::Transform(Transform::Limit(n)) + } + "match" => { + let terms: Vec = value.split(',') + .map(|t| t.to_string()) + .collect(); + Stage::Generator(Generator::Match(terms)) + } + // Algorithm with colon in params? Try fallback. + _ => return AlgoStage::parse(s).map(Stage::Algorithm) + .map_err(|_| format!("unknown stage: {}", s)), + }; + + // Apply negation to filters + if negated { + match filter_or_transform { + Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))), + _ => Err("! prefix only works on filter stages".to_string()), + } + } else { + Ok(filter_or_transform) + } + } + + /// Parse a pipe-separated pipeline string. + pub fn parse_pipeline(s: &str) -> Result, String> { + s.split('|') + .map(|part| Stage::parse(part.trim())) + .collect() + } +} + +impl fmt::Display for Stage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Stage::Generator(Generator::All) => write!(f, "all"), + Stage::Generator(Generator::Match(terms)) => write!(f, "match:{}", terms.join(",")), + Stage::Filter(filt) => write!(f, "{}", filt), + Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field), + Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n), + Stage::Algorithm(a) => write!(f, "{}", a.algo), + } + } +} + +impl fmt::Display for Filter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Filter::Type(t) => write!(f, "type:{:?}", t), + Filter::KeyGlob(g) => write!(f, "key:{}", g), + Filter::Weight(c) => write!(f, "weight:{}", c), + Filter::Age(c) => write!(f, "age:{}", c), + Filter::ContentLen(c) => write!(f, "content-len:{}", c), + Filter::Provenance(p) => write!(f, "provenance:{}", p.label()), + Filter::NotVisited { agent, duration } => write!(f, "not-visited:{},{}s", agent, duration), + Filter::Visited { agent } => write!(f, "visited:{}", agent), + Filter::Negated(inner) => write!(f, "!{}", inner), + } + } +} + +impl fmt::Display for Cmp { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Cmp::Gt(v) => write!(f, ">{}", v), + Cmp::Gte(v) => write!(f, ">={}", v), + Cmp::Lt(v) => write!(f, "<{}", v), + Cmp::Lte(v) => write!(f, "<={}", v), + Cmp::Eq(v) => write!(f, "={}", v), + } + } +} + +/// Simple glob matching (supports * and ?). +fn glob_matches(pattern: &str, text: &str) -> bool { + fn inner(pat: &[char], txt: &[char]) -> bool { + if pat.is_empty() { return txt.is_empty(); } + if pat[0] == '*' { + // Try matching * against 0..n characters + for skip in 0..=txt.len() { + if inner(&pat[1..], &txt[skip..]) { return true; } + } + return false; + } + if txt.is_empty() { return false; } + if pat[0] == '?' || pat[0] == txt[0] { + return inner(&pat[1..], &txt[1..]); + } + false + } + + let pat: Vec = pattern.chars().collect(); + let txt: Vec = text.chars().collect(); + inner(&pat, &txt) +} + +/// Run a unified query pipeline. Requires &Store for filter/transform stages. +/// +/// If the pipeline starts with no generator, the input `seeds` are used. +/// Generators produce a fresh result set (ignoring seeds). Filters narrow +/// the current set. Transforms reorder/truncate. Algorithms do graph +/// exploration. +pub fn run_query( + stages: &[Stage], + seeds: Vec<(String, f64)>, + graph: &Graph, + store: &Store, + debug: bool, + max_results: usize, +) -> Vec<(String, f64)> { + let now = crate::store::now_epoch(); + let mut current = seeds; + + for stage in stages { + if debug { + println!("\n[query] === {} ({} items in) ===", stage, current.len()); + } + + current = match stage { + Stage::Generator(gen) => run_generator(gen, store), + + Stage::Filter(filt) => { + current.into_iter() + .filter(|(key, _)| eval_filter(filt, key, store, now)) + .collect() + } + + Stage::Transform(xform) => run_transform(xform, current, store, graph), + + Stage::Algorithm(algo_stage) => { + match algo_stage.algo { + Algorithm::Spread => run_spread(¤t, graph, store, algo_stage, debug), + Algorithm::Spectral => run_spectral(¤t, graph, algo_stage, debug), + Algorithm::Manifold => run_manifold(¤t, graph, algo_stage, debug), + Algorithm::Confluence => run_confluence(¤t, graph, store, algo_stage, debug), + Algorithm::Geodesic => run_geodesic(¤t, graph, algo_stage, debug), + } + } + }; + + if debug { + println!("[query] → {} results", current.len()); + for (key, score) in current.iter().take(10) { + println!(" [{:.4}] {}", score, key); + } + if current.len() > 10 { + println!(" ... ({} more)", current.len() - 10); + } + } + } + + current.truncate(max_results); + current +} + +fn run_generator(gen: &Generator, store: &Store) -> Vec<(String, f64)> { + match gen { + Generator::All => { + store.nodes.iter() + .filter(|(_, n)| !n.deleted) + .map(|(key, n)| (key.clone(), n.weight as f64)) + .collect() + } + Generator::Match(terms) => { + let weighted: BTreeMap = terms.iter() + .map(|t| (t.to_lowercase(), 1.0)) + .collect(); + let (seeds, _) = match_seeds(&weighted, store); + seeds + } + } +} + +fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool { + let node = match store.nodes.get(key) { + Some(n) => n, + None => return false, + }; + + match filt { + Filter::Type(t) => node.node_type == *t, + Filter::KeyGlob(pattern) => glob_matches(pattern, key), + Filter::Weight(cmp) => cmp.matches(node.weight as f64), + Filter::Age(cmp) => { + let age_secs = (now - node.timestamp) as f64; + cmp.matches(age_secs) + } + Filter::ContentLen(cmp) => cmp.matches(node.content.len() as f64), + Filter::Provenance(p) => node.provenance == *p, + Filter::NotVisited { agent, duration } => { + let last = store.last_visited(key, agent); + last == 0 || (now - last) > *duration + } + Filter::Visited { agent } => { + store.last_visited(key, agent) > 0 + } + Filter::Negated(inner) => !eval_filter(inner, key, store, now), + } +} + +fn run_transform( + xform: &Transform, + mut items: Vec<(String, f64)>, + store: &Store, + graph: &Graph, +) -> Vec<(String, f64)> { + match xform { + Transform::Sort(field) => { + match field { + SortField::Weight => { + items.sort_by(|a, b| b.1.total_cmp(&a.1)); + } + SortField::Timestamp => { + items.sort_by(|a, b| { + let ta = store.nodes.get(&a.0).map(|n| n.timestamp).unwrap_or(0); + let tb = store.nodes.get(&b.0).map(|n| n.timestamp).unwrap_or(0); + tb.cmp(&ta) // desc + }); + } + SortField::ContentLen => { + items.sort_by(|a, b| { + let la = store.nodes.get(&a.0).map(|n| n.content.len()).unwrap_or(0); + let lb = store.nodes.get(&b.0).map(|n| n.content.len()).unwrap_or(0); + lb.cmp(&la) // desc + }); + } + SortField::Degree => { + items.sort_by(|a, b| { + let da = graph.degree(&a.0); + let db = graph.degree(&b.0); + db.cmp(&da) // desc + }); + } + SortField::Priority => { + // Pre-compute priorities to avoid O(n log n) calls + // inside the sort comparator. + let priorities: HashMap = items.iter() + .map(|(key, _)| { + let p = crate::neuro::consolidation_priority( + store, key, graph, None); + (key.clone(), p) + }) + .collect(); + items.sort_by(|a, b| { + let pa = priorities.get(&a.0).copied().unwrap_or(0.0); + let pb = priorities.get(&b.0).copied().unwrap_or(0.0); + pb.total_cmp(&pa) // desc + }); + } + } + items + } + Transform::Limit(n) => { + items.truncate(*n); + items + } + } +} + /// Extract seeds from weighted terms by matching against node keys and content. /// /// Three matching strategies, in priority order: