search: unified query pipeline with filters, transforms, generators

Extend the pipeline with four stage types composing left-to-right:
  Generators: all, match:TERM
  Filters: type:, key:, weight:, age:, content-len:, provenance:,
           not-visited:, visited: (plus ! negation)
  Transforms: sort:(priority|timestamp|content-len|degree|weight), limit:N
  Algorithms: spread, spectral, confluence, geodesic, manifold (unchanged)

Duration syntax (7d, 24h, 30m) and glob matching on keys.
CLI auto-detects filter/transform stages and loads full Store;
algorithm-only pipelines keep the fast MmapView path.

Co-Authored-By: ProofOfConcept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-03-10 15:22:12 -04:00
parent c6bb7c3910
commit e736471d99
2 changed files with 591 additions and 70 deletions

View file

@ -598,35 +598,77 @@ fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bo
use store::StoreView;
use std::collections::BTreeMap;
if terms.is_empty() {
return Err("search requires at least one term".into());
// Parse pipeline stages (unified: algorithms, filters, transforms, generators)
let stages: Vec<search::Stage> = if pipeline_args.is_empty() {
vec![search::Stage::Algorithm(search::AlgoStage::parse("spread").unwrap())]
} else {
pipeline_args.iter()
.map(|a| search::Stage::parse(a))
.collect::<Result<Vec<_>, _>>()?
};
// Check if pipeline needs full Store (has filters/transforms/generators)
let needs_store = stages.iter().any(|s| !matches!(s, search::Stage::Algorithm(_)));
// Check if pipeline starts with a generator (doesn't need seed terms)
let has_generator = stages.first().map(|s| matches!(s, search::Stage::Generator(_))).unwrap_or(false);
if terms.is_empty() && !has_generator {
return Err("search requires terms or a generator stage (e.g. 'all')".into());
}
let query: String = terms.join(" ");
// Parse pipeline (default: spread)
let pipeline: Vec<search::AlgoStage> = if pipeline_args.is_empty() {
vec![search::AlgoStage::parse("spread").unwrap()]
} else {
pipeline_args.iter()
.map(|a| search::AlgoStage::parse(a))
.collect::<Result<Vec<_>, _>>()?
};
if debug {
let names: Vec<String> = pipeline.iter().map(|s| format!("{}", s.algo)).collect();
let names: Vec<String> = stages.iter().map(|s| format!("{}", s)).collect();
println!("[search] pipeline: {}", names.join(""));
}
let max_results = if expand { 15 } else { 5 };
if needs_store {
// Full Store path — needed for filter/transform/generator stages
let store = store::Store::load()?;
let graph = store.build_graph();
let seeds = if has_generator {
vec![] // generator will produce its own result set
} else {
let terms_map: BTreeMap<String, f64> = query.split_whitespace()
.map(|t| (t.to_lowercase(), 1.0))
.collect();
let (seeds, _) = search::match_seeds(&terms_map, &store);
seeds
};
let raw = search::run_query(&stages, seeds, &graph, &store, debug, max_results);
if raw.is_empty() {
eprintln!("No results");
return Ok(());
}
for (i, (key, score)) in raw.iter().enumerate().take(max_results) {
let weight = store.nodes.get(key).map(|n| n.weight).unwrap_or(0.0);
println!("{:2}. [{:.2}/{:.2}] {}", i + 1, score, weight, key);
if full {
if let Some(node) = store.nodes.get(key) {
println!();
for line in node.content.lines() {
println!(" {}", line);
}
println!();
}
}
}
} else {
// Fast MmapView path — algorithm-only pipeline
let view = store::AnyView::load()?;
let graph = graph::build_graph_fast(&view);
// Build equal-weight terms from query
let terms: BTreeMap<String, f64> = query.split_whitespace()
let terms_map: BTreeMap<String, f64> = query.split_whitespace()
.map(|t| (t.to_lowercase(), 1.0))
.collect();
let (seeds, direct_hits) = search::match_seeds(&terms, &view);
let (seeds, direct_hits) = search::match_seeds(&terms_map, &view);
if seeds.is_empty() {
eprintln!("No results for '{}'", query);
@ -635,13 +677,18 @@ fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bo
if debug {
println!("[search] {} seeds from query '{}'", seeds.len(), query);
for (key, score) in &seeds {
println!(" {:.4} {}", score, key);
}
}
let max_results = if expand { 15 } else { 5 };
let raw = search::run_pipeline(&pipeline, seeds, &graph, &view, debug, max_results);
// Extract AlgoStages from the unified stages
let algo_stages: Vec<&search::AlgoStage> = stages.iter()
.filter_map(|s| match s {
search::Stage::Algorithm(a) => Some(a),
_ => None,
})
.collect();
let algo_owned: Vec<search::AlgoStage> = algo_stages.into_iter().cloned().collect();
let raw = search::run_pipeline(&algo_owned, seeds, &graph, &view, debug, max_results);
let results: Vec<search::SearchResult> = raw.into_iter()
.map(|(key, activation)| {
@ -676,6 +723,7 @@ fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bo
}
}
}
}
Ok(())
}

View file

@ -1,17 +1,44 @@
// Memory search: composable algorithm pipeline.
// Memory search: composable query pipeline.
//
// Each algorithm is a stage: takes seeds Vec<(String, f64)>, returns
// new/modified seeds. Stages compose left-to-right in a pipeline.
// The pipeline has four kinds of stages, all composing left-to-right:
//
// Available algorithms:
// spread — spreading activation through graph edges
// spectral — nearest neighbors in spectral embedding space
// manifold — extrapolation along direction defined by seeds (TODO)
// Generators — produce a result set from nothing:
// all every non-deleted node
// match:TERM text match (current seed extraction)
//
// Seed extraction (matching query terms to node keys) is shared
// infrastructure, not an algorithm stage.
// Filters — narrow an existing result set on node metadata:
// type:episodic node_type == EpisodicSession
// type:semantic node_type == Semantic
// type:daily node_type == EpisodicDaily
// type:weekly node_type == EpisodicWeekly
// type:monthly node_type == EpisodicMonthly
// key:GLOB glob match on key
// weight:>0.5 numeric comparison on weight
// age:<7d created/modified within duration
// content-len:>1000 content size filter
// provenance:manual provenance match
// not-visited:AGENT,DUR not seen by agent in duration
// visited:AGENT has been seen by agent
//
// Transforms — reorder or reshape:
// sort:priority consolidation priority scoring
// sort:timestamp by timestamp (desc)
// sort:content-len by content size
// sort:degree by graph degree
// sort:weight by weight
// limit:N truncate to N results
//
// Algorithms — graph exploration (existing):
// spread spreading activation
// spectral,k=20 spectral nearest neighbors
// confluence multi-source reachability
// geodesic straightest spectral paths
// manifold extrapolation along seed direction
//
// Stages are parsed from strings and composed via the -p flag or
// pipe-separated in agent definitions.
use crate::store::StoreView;
use crate::store::{Store, StoreView, NodeType, Provenance};
use crate::graph::Graph;
use crate::spectral;
@ -96,6 +123,452 @@ impl AlgoStage {
}
}
// ── Unified query pipeline ──────────────────────────────────────────

/// A pipeline stage: generator, filter, transform, or graph algorithm.
///
/// Stages compose left-to-right; see `Stage::parse` for the string syntax.
#[derive(Clone, Debug)]
pub enum Stage {
    /// Produces a result set from nothing (`all`, `match:TERM`).
    Generator(Generator),
    /// Keeps/drops nodes in the current set based on node metadata.
    Filter(Filter),
    /// Reorders or truncates the current set (`sort:`, `limit:`).
    Transform(Transform),
    /// Graph-exploration algorithm (spread, spectral, ...).
    Algorithm(AlgoStage),
}
/// Stages that produce a result set from scratch (prior seeds are ignored).
#[derive(Clone, Debug)]
pub enum Generator {
    All,                // every non-deleted node, scored by its stored weight
    Match(Vec<String>), // text match seeds via the shared seed extraction
}
/// Node-metadata predicates; a leading `!` in the stage syntax wraps one
/// in `Negated`. Evaluation lives in `eval_filter`.
#[derive(Clone, Debug)]
pub enum Filter {
    Type(NodeType),         // type:LABEL (episodic, semantic, daily, ...)
    KeyGlob(String),        // key:GLOB (* and ? wildcards)
    Weight(Cmp),            // weight:CMP on node weight
    Age(Cmp), // vs now - timestamp (seconds)
    ContentLen(Cmp),        // content-len:CMP on content byte length
    Provenance(Provenance), // provenance:LABEL
    NotVisited { agent: String, duration: i64 }, // seconds
    Visited { agent: String },
    Negated(Box<Filter>),   // logical NOT of the inner filter
}
/// Stages that reorder or reshape the current result set.
#[derive(Clone, Debug)]
pub enum Transform {
    Sort(SortField), // sort:FIELD — reorder descending by a node property
    Limit(usize),    // limit:N — truncate to the first N results
}
/// Field used by the `sort:` transform; all orderings are descending
/// (see `run_transform`).
#[derive(Clone, Debug)]
pub enum SortField {
    Priority,   // consolidation priority (crate::neuro)
    Timestamp,  // node timestamp, newest first
    ContentLen, // content length in bytes
    Degree,     // graph degree
    Weight,     // current result score (equals node weight right after `all`)
}
/// Numeric comparison operator parsed from strings like ">0.5" or "<=7d".
#[derive(Clone, Debug)]
pub enum Cmp {
    Gt(f64),
    Gte(f64),
    Lt(f64),
    Lte(f64),
    Eq(f64),
}

impl Cmp {
    /// True when `val` satisfies the comparison.
    fn matches(&self, val: f64) -> bool {
        match *self {
            Cmp::Gt(threshold) => val > threshold,
            Cmp::Gte(threshold) => val >= threshold,
            Cmp::Lt(threshold) => val < threshold,
            Cmp::Lte(threshold) => val <= threshold,
            // Exact float equality is fragile; accept values within
            // machine epsilon of the target.
            Cmp::Eq(threshold) => (val - threshold).abs() < f64::EPSILON,
        }
    }
}
/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds).
fn parse_cmp(s: &str) -> Result<Cmp, String> {
    // Two-character operators must be tried before their one-character
    // prefixes, or ">=5" would be read as ">" followed by "=5".
    let (rest, ctor): (&str, fn(f64) -> Cmp) = if let Some(r) = s.strip_prefix(">=") {
        (r, Cmp::Gte)
    } else if let Some(r) = s.strip_prefix("<=") {
        (r, Cmp::Lte)
    } else if let Some(r) = s.strip_prefix('>') {
        (r, Cmp::Gt)
    } else if let Some(r) = s.strip_prefix('<') {
        (r, Cmp::Lt)
    } else if let Some(r) = s.strip_prefix('=') {
        (r, Cmp::Eq)
    } else {
        return Err(format!("expected comparison operator in '{}'", s));
    };
    parse_duration_or_number(rest).map(ctor)
}
/// Parse "7d", "24h", "30m", "45s" as seconds, or plain numbers.
///
/// The 's' suffix is accepted so that displayed filters round-trip:
/// `Filter::NotVisited` formats its duration as e.g. "300s", and without
/// this suffix that output could not be fed back through `Stage::parse`.
/// Accepting 's' is backward-compatible — such strings previously errored.
fn parse_duration_or_number(s: &str) -> Result<f64, String> {
    // (suffix, multiplier to seconds)
    const UNITS: [(char, f64); 4] = [('d', 86400.0), ('h', 3600.0), ('m', 60.0), ('s', 1.0)];
    for (suffix, mult) in UNITS {
        if let Some(n) = s.strip_suffix(suffix) {
            let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?;
            return Ok(v * mult);
        }
    }
    // No recognized unit suffix: plain number.
    s.parse().map_err(|_| format!("bad number: {}", s))
}
/// Parse a NodeType from a label.
fn parse_node_type(s: &str) -> Result<NodeType, String> {
    let node_type = match s {
        "episodic" | "session" => NodeType::EpisodicSession,
        "daily" => NodeType::EpisodicDaily,
        "weekly" => NodeType::EpisodicWeekly,
        "monthly" => NodeType::EpisodicMonthly,
        "semantic" => NodeType::Semantic,
        other => {
            return Err(format!("unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)", other));
        }
    };
    Ok(node_type)
}
impl Stage {
    /// Parse a single stage from a string.
    ///
    /// Algorithm names are tried first (bare words), then predicate syntax
    /// (contains ':'). No ambiguity since algorithms are bare words.
    ///
    /// A leading '!' negates a filter stage; it is an error on generators,
    /// transforms, and algorithms.
    pub fn parse(s: &str) -> Result<Self, String> {
        let s = s.trim();
        // Peel off the optional '!' prefix; negation is applied at the end,
        // once we know the stage kind.
        let (negated, s) = if let Some(rest) = s.strip_prefix('!') {
            (true, rest)
        } else {
            (false, s)
        };
        // Generator: "all"
        if s == "all" {
            return Ok(Stage::Generator(Generator::All));
        }
        // Try algorithm parse first (bare words, no colon)
        if !s.contains(':') {
            if let Ok(algo) = AlgoStage::parse(s) {
                return Ok(Stage::Algorithm(algo));
            }
        }
        // Algorithm with params: "spread,max_hops=4" (contains comma but no colon)
        // Only reached when the attempt above failed; re-parsing here surfaces
        // AlgoStage's own error message instead of a generic "unknown stage".
        if s.contains(',') && !s.contains(':') {
            return AlgoStage::parse(s).map(Stage::Algorithm);
        }
        // Predicate/transform syntax: "key:value"
        let (prefix, value) = s.split_once(':')
            .ok_or_else(|| format!("unknown stage: {}", s))?;
        let filter_or_transform = match prefix {
            "type" => Stage::Filter(Filter::Type(parse_node_type(value)?)),
            "key" => Stage::Filter(Filter::KeyGlob(value.to_string())),
            "weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)),
            "age" => Stage::Filter(Filter::Age(parse_cmp(value)?)),
            "content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)),
            "provenance" => {
                let prov = Provenance::from_label(value)
                    .ok_or_else(|| format!("unknown provenance: {}", value))?;
                Stage::Filter(Filter::Provenance(prov))
            }
            "not-visited" => {
                // Syntax: not-visited:AGENT,DURATION (e.g. not-visited:reviewer,7d)
                let (agent, dur) = value.split_once(',')
                    .ok_or("not-visited:AGENT,DURATION")?;
                let secs = parse_duration_or_number(dur)?;
                Stage::Filter(Filter::NotVisited {
                    agent: agent.to_string(),
                    duration: secs as i64,
                })
            }
            "visited" => Stage::Filter(Filter::Visited {
                agent: value.to_string(),
            }),
            "sort" => {
                let field = match value {
                    "priority" => SortField::Priority,
                    "timestamp" => SortField::Timestamp,
                    "content-len" => SortField::ContentLen,
                    "degree" => SortField::Degree,
                    "weight" => SortField::Weight,
                    _ => return Err(format!("unknown sort field: {}", value)),
                };
                Stage::Transform(Transform::Sort(field))
            }
            "limit" => {
                let n: usize = value.parse()
                    .map_err(|_| format!("bad limit: {}", value))?;
                Stage::Transform(Transform::Limit(n))
            }
            "match" => {
                // Comma-separated terms; each is weighted equally at seed time.
                let terms: Vec<String> = value.split(',')
                    .map(|t| t.to_string())
                    .collect();
                Stage::Generator(Generator::Match(terms))
            }
            // Algorithm with colon in params? Try fallback.
            _ => return AlgoStage::parse(s).map(Stage::Algorithm)
                .map_err(|_| format!("unknown stage: {}", s)),
        };
        // Apply negation to filters
        if negated {
            match filter_or_transform {
                Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))),
                _ => Err("! prefix only works on filter stages".to_string()),
            }
        } else {
            Ok(filter_or_transform)
        }
    }

    /// Parse a pipe-separated pipeline string.
    pub fn parse_pipeline(s: &str) -> Result<Vec<Stage>, String> {
        s.split('|')
            .map(|part| Stage::parse(part.trim()))
            .collect()
    }
}
impl fmt::Display for Stage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Stage::Generator(Generator::All) => write!(f, "all"),
Stage::Generator(Generator::Match(terms)) => write!(f, "match:{}", terms.join(",")),
Stage::Filter(filt) => write!(f, "{}", filt),
Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field),
Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n),
Stage::Algorithm(a) => write!(f, "{}", a.algo),
}
}
}
impl fmt::Display for Filter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Filter::Type(t) => write!(f, "type:{:?}", t),
Filter::KeyGlob(g) => write!(f, "key:{}", g),
Filter::Weight(c) => write!(f, "weight:{}", c),
Filter::Age(c) => write!(f, "age:{}", c),
Filter::ContentLen(c) => write!(f, "content-len:{}", c),
Filter::Provenance(p) => write!(f, "provenance:{}", p.label()),
Filter::NotVisited { agent, duration } => write!(f, "not-visited:{},{}s", agent, duration),
Filter::Visited { agent } => write!(f, "visited:{}", agent),
Filter::Negated(inner) => write!(f, "!{}", inner),
}
}
}
impl fmt::Display for Cmp {
    /// Format as operator + value, e.g. ">0.5" — the same syntax
    /// `parse_cmp` accepts.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let (op, v) = match self {
            Cmp::Gt(v) => (">", v),
            Cmp::Gte(v) => (">=", v),
            Cmp::Lt(v) => ("<", v),
            Cmp::Lte(v) => ("<=", v),
            Cmp::Eq(v) => ("=", v),
        };
        write!(f, "{}{}", op, v)
    }
}
/// Simple glob matching (supports * and ?).
///
/// Iterative greedy algorithm with a single backtrack point per '*':
/// O(|pattern| * |text|) worst case. The previous recursive formulation
/// retried every split at every '*' and was exponential on patterns with
/// several stars (e.g. "*a*a*a*…" against a long non-matching text).
fn glob_matches(pattern: &str, text: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let txt: Vec<char> = text.chars().collect();
    let mut p = 0; // current position in pat
    let mut t = 0; // current position in txt
    let mut star: Option<usize> = None; // pat index of the most recent '*'
    let mut mark = 0; // txt position where that '*' started matching
    while t < txt.len() {
        if p < pat.len() && (pat[p] == '?' || pat[p] == txt[t]) {
            // Literal or single-char wildcard match: advance both.
            p += 1;
            t += 1;
        } else if p < pat.len() && pat[p] == '*' {
            // Record the star; tentatively match it against zero chars.
            star = Some(p);
            mark = t;
            p += 1;
        } else if let Some(sp) = star {
            // Mismatch after a star: let the star absorb one more char
            // and retry from just past it.
            mark += 1;
            t = mark;
            p = sp + 1;
        } else {
            // Mismatch with no star to fall back on.
            return false;
        }
    }
    // Text exhausted: remaining pattern must be all '*' (each matches "").
    while p < pat.len() && pat[p] == '*' {
        p += 1;
    }
    p == pat.len()
}
/// Run a unified query pipeline. Requires &Store for filter/transform stages.
///
/// If the pipeline starts with no generator, the input `seeds` are used.
/// Generators produce a fresh result set (ignoring seeds). Filters narrow
/// the current set. Transforms reorder/truncate. Algorithms do graph
/// exploration.
///
/// The final result set is truncated to `max_results`.
pub fn run_query(
    stages: &[Stage],
    seeds: Vec<(String, f64)>,
    graph: &Graph,
    store: &Store,
    debug: bool,
    max_results: usize,
) -> Vec<(String, f64)> {
    // Single timestamp for the whole run, so age/visited filters see a
    // consistent "now" across every stage.
    let now = crate::store::now_epoch();
    let mut current = seeds;
    for stage in stages {
        if debug {
            println!("\n[query] === {} ({} items in) ===", stage, current.len());
        }
        current = match stage {
            // Generators replace the current set wholesale.
            Stage::Generator(gen) => run_generator(gen, store),
            // Filters keep only entries whose node passes the predicate.
            Stage::Filter(filt) => {
                current.into_iter()
                    .filter(|(key, _)| eval_filter(filt, key, store, now))
                    .collect()
            }
            Stage::Transform(xform) => run_transform(xform, current, store, graph),
            // Algorithms take the current set as seeds for graph exploration.
            Stage::Algorithm(algo_stage) => {
                match algo_stage.algo {
                    Algorithm::Spread => run_spread(&current, graph, store, algo_stage, debug),
                    Algorithm::Spectral => run_spectral(&current, graph, algo_stage, debug),
                    Algorithm::Manifold => run_manifold(&current, graph, algo_stage, debug),
                    Algorithm::Confluence => run_confluence(&current, graph, store, algo_stage, debug),
                    Algorithm::Geodesic => run_geodesic(&current, graph, algo_stage, debug),
                }
            }
        };
        if debug {
            println!("[query] → {} results", current.len());
            for (key, score) in current.iter().take(10) {
                println!(" [{:.4}] {}", score, key);
            }
            if current.len() > 10 {
                println!(" ... ({} more)", current.len() - 10);
            }
        }
    }
    current.truncate(max_results);
    current
}
/// Produce a fresh result set from a generator stage (prior results are
/// not consulted).
fn run_generator(gen: &Generator, store: &Store) -> Vec<(String, f64)> {
    match gen {
        // "all": every live node, scored by its stored weight.
        Generator::All => {
            let mut out = Vec::new();
            for (key, node) in store.nodes.iter() {
                if !node.deleted {
                    out.push((key.clone(), node.weight as f64));
                }
            }
            out
        }
        // "match:TERM,...": reuse the shared seed-extraction machinery,
        // weighting every term equally.
        Generator::Match(terms) => {
            let mut weighted = BTreeMap::new();
            for term in terms {
                weighted.insert(term.to_lowercase(), 1.0);
            }
            match_seeds(&weighted, store).0
        }
    }
}
/// Evaluate one filter predicate against the node stored under `key`.
///
/// A key with no node fails every filter — note this happens before the
/// `Negated` arm, so `!f` on a missing node is also false ("node exists
/// and does not satisfy f").
fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
    let node = match store.nodes.get(key) {
        Some(n) => n,
        None => return false,
    };
    match filt {
        Filter::Type(t) => node.node_type == *t,
        Filter::KeyGlob(pattern) => glob_matches(pattern, key),
        Filter::Weight(cmp) => cmp.matches(node.weight as f64),
        Filter::Age(cmp) => {
            // Age in seconds relative to the caller-supplied `now`, so all
            // nodes in one query run are compared against the same instant.
            let age_secs = (now - node.timestamp) as f64;
            cmp.matches(age_secs)
        }
        Filter::ContentLen(cmp) => cmp.matches(node.content.len() as f64),
        Filter::Provenance(p) => node.provenance == *p,
        Filter::NotVisited { agent, duration } => {
            // last == 0 is treated as "never visited" — presumably the
            // sentinel returned by last_visited; verify against Store.
            let last = store.last_visited(key, agent);
            last == 0 || (now - last) > *duration
        }
        Filter::Visited { agent } => {
            store.last_visited(key, agent) > 0
        }
        Filter::Negated(inner) => !eval_filter(inner, key, store, now),
    }
}
/// Apply a transform stage: reorder (`sort:`, always descending) or
/// truncate (`limit:`) the current result set. Sorts are stable.
fn run_transform(
    xform: &Transform,
    mut items: Vec<(String, f64)>,
    store: &Store,
    graph: &Graph,
) -> Vec<(String, f64)> {
    match xform {
        Transform::Limit(n) => items.truncate(*n),
        Transform::Sort(field) => match field {
            // Current score, descending. (Right after `all` this equals
            // node weight, since that's how the generator scores nodes.)
            SortField::Weight => items.sort_by(|a, b| b.1.total_cmp(&a.1)),
            // Node timestamp, newest first; missing nodes sort last.
            SortField::Timestamp => items.sort_by_key(|(key, _)| {
                std::cmp::Reverse(store.nodes.get(key).map(|n| n.timestamp).unwrap_or(0))
            }),
            // Content byte length, largest first.
            SortField::ContentLen => items.sort_by_key(|(key, _)| {
                std::cmp::Reverse(store.nodes.get(key).map(|n| n.content.len()).unwrap_or(0))
            }),
            // Graph degree, highest first.
            SortField::Degree => {
                items.sort_by_key(|(key, _)| std::cmp::Reverse(graph.degree(key)));
            }
            // Consolidation priority, highest first. Scores are computed
            // once per key up front so the comparator stays cheap.
            SortField::Priority => {
                let mut scores = HashMap::new();
                for (key, _) in &items {
                    let p = crate::neuro::consolidation_priority(store, key, graph, None);
                    scores.insert(key.clone(), p);
                }
                items.sort_by(|a, b| {
                    let pa = scores.get(&a.0).copied().unwrap_or(0.0);
                    let pb = scores.get(&b.0).copied().unwrap_or(0.0);
                    pb.total_cmp(&pa)
                });
            }
        },
    }
    items
}
/// Extract seeds from weighted terms by matching against node keys and content.
///
/// Three matching strategies, in priority order: