consciousness/src/hippocampus/query/engine.rs
ProofOfConcept aad227e487 query: unify PEG and engine parsers
PEG parser now handles both expression syntax (degree > 5 | sort degree)
and pipeline syntax (all | type:episodic | sort:timestamp). Deleted
Stage::parse() and helpers from engine.rs — it's now pure execution.

All callers use parse_stages() from parser.rs as the single entry point.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-11 20:42:58 -04:00

1336 lines
48 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Memory search: composable query pipeline.
//
// The pipeline has four kinds of stages, all composing left-to-right:
//
// Generators — produce a result set from nothing:
// all every non-deleted node
// match:TERM text match (current seed extraction)
//
// Filters — narrow an existing result set on node metadata:
// type:episodic node_type == EpisodicSession
// type:semantic node_type == Semantic
// type:daily node_type == EpisodicDaily
// type:weekly node_type == EpisodicWeekly
// type:monthly node_type == EpisodicMonthly
// key:GLOB glob match on key
// weight:>0.5 numeric comparison on weight
// age:<7d created/modified within duration
// content-len:>1000 content size filter
// provenance:manual provenance match
// not-visited:AGENT,DUR not seen by agent in duration
// visited:AGENT has been seen by agent
//
// Transforms — reorder or reshape:
// sort:priority consolidation priority scoring
// sort:timestamp by timestamp (desc)
// sort:content-len by content size
// sort:degree by graph degree
// sort:weight by weight
// limit:N truncate to N results
//
// Algorithms — graph exploration (existing):
// spread spreading activation
// spectral,k=20 spectral nearest neighbors
// confluence multi-source reachability
// geodesic straightest spectral paths
// manifold extrapolation along seed direction
//
// Stages are parsed from strings and composed via the -p flag or
// pipe-separated in agent definitions.
use crate::store::{Store, StoreView, NodeType};
use crate::graph::Graph;
use crate::spectral;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt;
/// One row of search output: a matched node plus presentation metadata.
pub struct SearchResult {
    pub key: String,              // node key
    pub activation: f64,          // final score produced by the pipeline
    pub is_direct: bool,          // true when matched directly by a term (vs found by an algorithm stage)
    pub snippet: Option<String>,  // optional content excerpt for display
}
/// A parsed algorithm stage with its parameters.
#[derive(Clone, Debug)]
pub struct AlgoStage {
    pub algo: Algorithm,                  // which graph algorithm to run
    pub params: HashMap<String, String>,  // raw key=val overrides from the stage string
}
/// Graph-exploration algorithms available as pipeline stages.
#[derive(Clone, Debug)]
pub enum Algorithm {
    Spread,
    Spectral,
    Manifold,
    Confluence,
    Geodesic,
}
impl fmt::Display for Algorithm {
    /// Writes the lowercase stage name, matching what `AlgoStage::parse` accepts.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            Algorithm::Spread => "spread",
            Algorithm::Spectral => "spectral",
            Algorithm::Manifold => "manifold",
            Algorithm::Confluence => "confluence",
            Algorithm::Geodesic => "geodesic",
        };
        f.write_str(name)
    }
}
impl AlgoStage {
    /// Parse "spread,max_hops=4,edge_decay=0.5" into an AlgoStage.
    ///
    /// The first comma-separated token is the algorithm name; every
    /// following token must be a `key=val` pair. Returns a descriptive
    /// error string for an unknown name or a malformed parameter.
    pub fn parse(s: &str) -> Result<Self, String> {
        let mut parts = s.split(',');
        let name = parts.next().unwrap_or("");
        let algo = match name {
            "spread" => Algorithm::Spread,
            "spectral" => Algorithm::Spectral,
            "manifold" => Algorithm::Manifold,
            "confluence" => Algorithm::Confluence,
            "geodesic" => Algorithm::Geodesic,
            _ => return Err(format!("unknown algorithm: {}", name)),
        };
        let mut params = HashMap::new();
        for part in parts {
            match part.split_once('=') {
                Some((k, v)) => {
                    params.insert(k.to_string(), v.to_string());
                }
                None => return Err(format!("bad param (expected key=val): {}", part)),
            }
        }
        Ok(AlgoStage { algo, params })
    }
    /// Look up `key` and parse it as `T`, falling back to `default`
    /// when the key is absent or fails to parse. Shared implementation
    /// for the typed wrappers below (previously three copy-pasted bodies).
    fn param<T: std::str::FromStr>(&self, key: &str, default: T) -> T {
        self.params.get(key)
            .and_then(|v| v.parse().ok())
            .unwrap_or(default)
    }
    fn param_f64(&self, key: &str, default: f64) -> f64 {
        self.param(key, default)
    }
    fn param_u32(&self, key: &str, default: u32) -> u32 {
        self.param(key, default)
    }
    fn param_usize(&self, key: &str, default: usize) -> usize {
        self.param(key, default)
    }
}
// ── Unified query pipeline ──────────────────────────────────────────
/// A pipeline stage: generator, filter, transform, or graph algorithm.
///
/// Stages compose left-to-right: generators create a fresh result set,
/// filters narrow it, transforms reorder/reshape it, and algorithms
/// explore the graph outward from it.
#[derive(Clone, Debug)]
pub enum Stage {
    Generator(Generator),
    Filter(Filter),
    Transform(Transform),
    Algorithm(AlgoStage),
}
/// Stages that produce a result set from nothing (they ignore any
/// results already flowing through the pipeline).
#[derive(Clone, Debug)]
pub enum Generator {
    All,                // every non-deleted node
    Match(Vec<String>), // text match seeds
}
/// Stages that narrow an existing result set on node metadata.
/// A node missing from the store never matches any filter.
#[derive(Clone, Debug)]
pub enum Filter {
    Type(NodeType),      // node_type equals the given type
    KeyGlob(String),     // glob match (* and ?) on the node key
    Weight(Cmp),         // numeric comparison on node weight
    Age(Cmp), // vs now - timestamp (seconds)
    ContentLen(Cmp),     // numeric comparison on content length
    Provenance(String),  // exact provenance match
    NotVisited { agent: String, duration: i64 }, // seconds; also matches never-visited nodes
    Visited { agent: String }, // seen by the agent at least once
    Negated(Box<Filter>),      // logical NOT of the inner filter
}
/// Stages that reorder or reshape the result set.
///
/// `Select`, `Count` and `Connectivity` are output-mode directives: they
/// pass the set through unchanged and are interpreted at the output layer.
#[derive(Clone, Debug)]
pub enum Transform {
    Sort(SortField),      // reorder in place
    Limit(usize),         // truncate to N results
    Select(Vec<String>),  // output directive: which fields to print
    Count,                // output directive: print only the count
    Connectivity,         // output directive: connectivity report
    DominatingSet,        // replace set with a greedy 3-cover of it
}
/// Sort orders for the `sort:` transform. Numeric sorts are descending;
/// `Key` is lexicographic and `Named` honors its `ascending` flag.
#[derive(Clone, Debug)]
pub enum SortField {
    Priority,   // consolidation priority scoring
    Timestamp,  // newest first
    ContentLen, // largest content first
    Degree,     // highest graph degree first
    Weight,     // highest weight first
    Isolation,  // most isolated community first
    Key,        // lexicographic by key
    Named(String, bool), // (field_name, ascending)
    Composite(Vec<(ScoreField, f64)>), // weighted sum of dimensions, highest first
}
/// Individual scoring dimensions for composite sorts.
/// Each computes a 0.0-1.0 score per node.
#[derive(Clone, Debug)]
pub enum ScoreField {
    Isolation,  // isolation of the node's community
    Degree,     // graph degree, normalized by the batch max
    Weight,     // raw node weight (presumably already 0-1 — TODO confirm)
    ContentLen, // content length, normalized by the batch max
    Priority,   // consolidation priority (clamped to 1.0)
    /// Staleness of the last visit by the named agent: 1.0 when never
    /// visited (or visited long ago); approaches 0 right after a visit.
    Recency(String),
}
/// Numeric comparison operator.
///
/// Backs filters such as `weight:>0.5` by comparing a node-derived
/// value against a fixed threshold.
#[derive(Clone, Debug)]
pub enum Cmp {
    Gt(f64),
    Gte(f64),
    Lt(f64),
    Lte(f64),
    Eq(f64),
}
impl Cmp {
    /// Returns true when `val` satisfies the comparison. Equality is
    /// tested within `f64::EPSILON` to tolerate floating-point noise.
    fn matches(&self, val: f64) -> bool {
        match *self {
            Cmp::Gt(threshold) => val > threshold,
            Cmp::Gte(threshold) => val >= threshold,
            Cmp::Lt(threshold) => val < threshold,
            Cmp::Lte(threshold) => val <= threshold,
            Cmp::Eq(threshold) => (val - threshold).abs() < f64::EPSILON,
        }
    }
}
/// Compute a 0-1 score for a node on a single dimension.
///
/// `precomputed` supplies per-sort maxima and community isolation used
/// to normalize raw values into the 0-1 range.
fn score_field(
    field: &ScoreField,
    key: &str,
    store: &Store,
    graph: &Graph,
    precomputed: &CompositeCache,
) -> f64 {
    match field {
        ScoreField::Isolation => {
            // Unknown community falls back to id 0; unknown isolation to 1.0.
            let comm = graph.communities().get(key).copied().unwrap_or(0);
            precomputed.isolation.get(&comm).copied().unwrap_or(1.0) as f64
        }
        ScoreField::Degree => {
            // Degree normalized by the batch maximum (floor of 1 avoids div-by-zero).
            let d = graph.degree(key) as f64;
            let max = precomputed.max_degree.max(1.0);
            (d / max).min(1.0)
        }
        ScoreField::Weight => {
            store.nodes.get(key).map(|n| n.weight as f64).unwrap_or(0.0)
        }
        ScoreField::ContentLen => {
            // Content length normalized by the batch maximum.
            let len = store.nodes.get(key).map(|n| n.content.len()).unwrap_or(0) as f64;
            let max = precomputed.max_content_len.max(1.0);
            (len / max).min(1.0)
        }
        ScoreField::Priority => {
            let p = crate::neuro::consolidation_priority(store, key, graph, None);
            // Priority is already roughly 0-1 from the scoring function
            p.min(1.0)
        }
        ScoreField::Recency(agent) => {
            let last = store.last_visited(key, agent);
            if last == 0 {
                1.0 // never visited = highest recency score
            } else {
                let age = (crate::store::now_epoch() - last) as f64;
                // Exponential saturation toward 1.0 with age:
                // ~0.03 at 1 hour, ~0.5 at 1 day, ~0.99 at 1 week.
                let hours = age / 3600.0;
                1.0 - (-0.03 * hours).exp()
            }
        }
    }
}
/// Cached values for composite scoring (computed once per sort).
struct CompositeCache {
    isolation: HashMap<u32, f32>, // community id → isolation score
    max_degree: f64,              // max degree among the sorted items (normalizer)
    max_content_len: f64,         // max content length among the items (normalizer)
}
impl CompositeCache {
    /// Scan `items` once to record the maxima used for normalization,
    /// and pull the per-community isolation scores from the graph.
    fn build(items: &[(String, f64)], store: &Store, graph: &Graph) -> Self {
        let mut max_degree = 0.0f64;
        let mut max_content_len = 0.0f64;
        for (key, _) in items {
            max_degree = max_degree.max(graph.degree(key) as f64);
            let len = store.nodes.get(key).map(|n| n.content.len()).unwrap_or(0) as f64;
            max_content_len = max_content_len.max(len);
        }
        Self {
            isolation: graph.community_isolation(),
            max_degree,
            max_content_len,
        }
    }
}
impl fmt::Display for Stage {
    /// Render the stage roughly in pipeline syntax. Note that sort
    /// fields use their Debug names (e.g. "sort:Priority"), so this
    /// output is for display, not guaranteed to round-trip the parser.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Stage::Generator(Generator::All) => write!(f, "all"),
            Stage::Generator(Generator::Match(terms)) => write!(f, "match:{}", terms.join(",")),
            Stage::Filter(filt) => write!(f, "{}", filt),
            Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field),
            Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n),
            Stage::Transform(Transform::Select(fields)) => write!(f, "select:{}", fields.join(",")),
            Stage::Transform(Transform::Count) => write!(f, "count"),
            Stage::Transform(Transform::Connectivity) => write!(f, "connectivity"),
            Stage::Transform(Transform::DominatingSet) => write!(f, "dominating-set"),
            // Algorithm params are not printed, only the name.
            Stage::Algorithm(a) => write!(f, "{}", a.algo),
        }
    }
}
impl fmt::Display for Filter {
    /// Render the filter in pipeline syntax; durations print in seconds
    /// and negation uses a leading `!`. NodeType uses its Debug name.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Filter::Type(t) => write!(f, "type:{:?}", t),
            Filter::KeyGlob(g) => write!(f, "key:{}", g),
            Filter::Weight(c) => write!(f, "weight:{}", c),
            Filter::Age(c) => write!(f, "age:{}", c),
            Filter::ContentLen(c) => write!(f, "content-len:{}", c),
            Filter::Provenance(p) => write!(f, "provenance:{}", p),
            Filter::NotVisited { agent, duration } => write!(f, "not-visited:{},{}s", agent, duration),
            Filter::Visited { agent } => write!(f, "visited:{}", agent),
            Filter::Negated(inner) => write!(f, "!{}", inner),
        }
    }
}
impl fmt::Display for Cmp {
    /// Render the comparison as its operator plus threshold, e.g. ">0.5".
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Cmp::Gt(v) => write!(f, ">{}", v),
            Cmp::Gte(v) => write!(f, ">={}", v),
            Cmp::Lt(v) => write!(f, "<{}", v),
            Cmp::Lte(v) => write!(f, "<={}", v),
            Cmp::Eq(v) => write!(f, "={}", v),
        }
    }
}
/// Simple glob matching (`*` matches any run of characters, `?` matches
/// exactly one character).
///
/// Implemented with the classic iterative two-pointer algorithm with
/// single-star backtracking, which runs in O(|pattern| * |text|); the
/// previous recursive formulation was exponential on patterns containing
/// several `*`s. Matching is per-`char`, so multi-byte UTF-8 is handled.
fn glob_matches(pattern: &str, text: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let txt: Vec<char> = text.chars().collect();
    let mut p = 0; // position in pattern
    let mut t = 0; // position in text
    // Most recent star: (pattern index just past the '*', text index it matched from).
    let mut star: Option<(usize, usize)> = None;
    while t < txt.len() {
        if p < pat.len() && (pat[p] == '?' || pat[p] == txt[t]) {
            p += 1;
            t += 1;
        } else if p < pat.len() && pat[p] == '*' {
            // Tentatively match the star against zero characters.
            star = Some((p + 1, t));
            p += 1;
        } else if let Some((star_p, star_t)) = star {
            // Mismatch after a star: grow the star's match by one character.
            p = star_p;
            t = star_t + 1;
            star = Some((star_p, star_t + 1));
        } else {
            return false;
        }
    }
    // Any trailing stars can match the empty string.
    while p < pat.len() && pat[p] == '*' {
        p += 1;
    }
    p == pat.len()
}
/// Run a unified query pipeline. Requires &Store for filter/transform stages.
///
/// If the pipeline starts with no generator, the input `seeds` are used.
/// Generators produce a fresh result set (ignoring seeds). Filters narrow
/// the current set. Transforms reorder/truncate. Algorithms do graph
/// exploration.
pub fn run_query(
stages: &[Stage],
seeds: Vec<(String, f64)>,
graph: &Graph,
store: &Store,
debug: bool,
max_results: usize,
) -> Vec<(String, f64)> {
let now = crate::store::now_epoch();
let mut current = seeds;
for stage in stages {
if debug {
println!("\n[query] === {} ({} items in) ===", stage, current.len());
}
current = match stage {
Stage::Generator(g) => run_generator(g, store),
Stage::Filter(filt) => {
current.into_iter()
.filter(|(key, _)| eval_filter(filt, key, store, now))
.collect()
}
Stage::Transform(xform) => run_transform(xform, current, store, graph),
Stage::Algorithm(algo_stage) => {
match algo_stage.algo {
Algorithm::Spread => run_spread(&current, graph, store, algo_stage, debug),
Algorithm::Spectral => run_spectral(&current, graph, algo_stage, debug),
Algorithm::Manifold => run_manifold(&current, graph, algo_stage, debug),
Algorithm::Confluence => run_confluence(&current, graph, store, algo_stage, debug),
Algorithm::Geodesic => run_geodesic(&current, graph, algo_stage, debug),
}
}
};
if debug {
println!("[query] → {} results", current.len());
for (key, score) in current.iter().take(10) {
println!(" [{:.4}] {}", score, key);
}
if current.len() > 10 {
println!(" ... ({} more)", current.len() - 10);
}
}
}
current.truncate(max_results);
current
}
/// Produce a fresh result set from a generator stage.
fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> {
    match g {
        // Every non-deleted node, scored by its stored weight.
        Generator::All => store
            .nodes
            .iter()
            .filter(|(_, node)| !node.deleted)
            .map(|(key, node)| (key.clone(), node.weight as f64))
            .collect(),
        // Text-match seeding: each lowercased term carries weight 1.0.
        Generator::Match(terms) => {
            let weighted: BTreeMap<String, f64> =
                terms.iter().map(|t| (t.to_lowercase(), 1.0)).collect();
            match_seeds(&weighted, store).0
        }
    }
}
/// Evaluate a single filter against the node identified by `key`.
/// Nodes missing from the store never match.
pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
    let Some(node) = store.nodes.get(key) else {
        return false;
    };
    match filt {
        Filter::Type(t) => node.node_type == *t,
        Filter::KeyGlob(pattern) => glob_matches(pattern, key),
        Filter::Weight(cmp) => cmp.matches(node.weight as f64),
        // Age is the node timestamp's distance from `now`, in seconds.
        Filter::Age(cmp) => cmp.matches((now - node.timestamp) as f64),
        Filter::ContentLen(cmp) => cmp.matches(node.content.len() as f64),
        Filter::Provenance(p) => node.provenance == *p,
        // Stale: never visited, or last visit older than `duration`.
        Filter::NotVisited { agent, duration } => {
            let last = store.last_visited(key, agent);
            last == 0 || (now - last) > *duration
        }
        Filter::Visited { agent } => store.last_visited(key, agent) > 0,
        Filter::Negated(inner) => !eval_filter(inner, key, store, now),
    }
}
/// Apply a transform stage to the current result set.
///
/// Sorts reorder `items` (descending unless noted), `limit` truncates,
/// output-mode directives pass the set through unchanged, and
/// `dominating-set` replaces the set with a greedy 3-cover of it.
pub fn run_transform(
    xform: &Transform,
    mut items: Vec<(String, f64)>,
    store: &Store,
    graph: &Graph,
) -> Vec<(String, f64)> {
    match xform {
        Transform::Sort(field) => {
            match field {
                SortField::Weight => {
                    // The tuple score is the weight at this point in the pipeline.
                    items.sort_by(|a, b| b.1.total_cmp(&a.1));
                }
                SortField::Timestamp => {
                    items.sort_by(|a, b| {
                        let ta = store.nodes.get(&a.0).map(|n| n.timestamp).unwrap_or(0);
                        let tb = store.nodes.get(&b.0).map(|n| n.timestamp).unwrap_or(0);
                        tb.cmp(&ta) // desc
                    });
                }
                SortField::ContentLen => {
                    items.sort_by(|a, b| {
                        let la = store.nodes.get(&a.0).map(|n| n.content.len()).unwrap_or(0);
                        let lb = store.nodes.get(&b.0).map(|n| n.content.len()).unwrap_or(0);
                        lb.cmp(&la) // desc
                    });
                }
                SortField::Degree => {
                    items.sort_by(|a, b| {
                        let da = graph.degree(&a.0);
                        let db = graph.degree(&b.0);
                        db.cmp(&da) // desc
                    });
                }
                SortField::Isolation => {
                    // Score nodes by their community's isolation.
                    // Most isolated communities first (highest internal edge ratio).
                    let iso = graph.community_isolation();
                    let comms = graph.communities();
                    items.sort_by(|a, b| {
                        let ca = comms.get(&a.0).copied().unwrap_or(0);
                        let cb = comms.get(&b.0).copied().unwrap_or(0);
                        let sa = iso.get(&ca).copied().unwrap_or(1.0);
                        let sb = iso.get(&cb).copied().unwrap_or(1.0);
                        sb.total_cmp(&sa) // most isolated first
                    });
                }
                SortField::Key => {
                    items.sort_by(|a, b| a.0.cmp(&b.0));
                }
                SortField::Named(field, asc) => {
                    // Resolve field from node properties
                    let resolve = |key: &str| -> Option<f64> {
                        let node = store.nodes.get(key)?;
                        match field.as_str() {
                            "weight" => Some(node.weight as f64),
                            "emotion" => Some(node.emotion as f64),
                            "retrievals" => Some(node.retrievals as f64),
                            "uses" => Some(node.uses as f64),
                            "wrongs" => Some(node.wrongs as f64),
                            "created" => Some(node.created_at as f64),
                            "timestamp" => Some(node.timestamp as f64),
                            "degree" => Some(graph.degree(key) as f64),
                            "content_len" => Some(node.content.len() as f64),
                            _ => None,
                        }
                    };
                    let asc = *asc;
                    items.sort_by(|a, b| {
                        let va = resolve(&a.0);
                        let vb = resolve(&b.0);
                        // Values compare ascending here; items without a
                        // resolvable value sort last, tie-broken by key.
                        // The requested direction is applied below.
                        let ord = match (va, vb) {
                            (Some(a), Some(b)) => a.total_cmp(&b),
                            (Some(_), None) => std::cmp::Ordering::Less,
                            (None, Some(_)) => std::cmp::Ordering::Greater,
                            (None, None) => a.0.cmp(&b.0),
                        };
                        if asc { ord } else { ord.reverse() }
                    });
                }
                SortField::Priority => {
                    // Pre-compute priorities to avoid O(n log n) calls
                    // inside the sort comparator.
                    let priorities: HashMap<String, f64> = items.iter()
                        .map(|(key, _)| {
                            let p = crate::neuro::consolidation_priority(
                                store, key, graph, None);
                            (key.clone(), p)
                        })
                        .collect();
                    items.sort_by(|a, b| {
                        let pa = priorities.get(&a.0).copied().unwrap_or(0.0);
                        let pb = priorities.get(&b.0).copied().unwrap_or(0.0);
                        pb.total_cmp(&pa) // desc
                    });
                }
                SortField::Composite(terms) => {
                    // Weighted sum of normalized score dimensions, computed
                    // once per item against a shared cache of batch maxima.
                    let cache = CompositeCache::build(&items, store, graph);
                    let scores: HashMap<String, f64> = items.iter()
                        .map(|(key, _)| {
                            let s: f64 = terms.iter()
                                .map(|(field, w)| score_field(field, key, store, graph, &cache) * w)
                                .sum();
                            (key.clone(), s)
                        })
                        .collect();
                    items.sort_by(|a, b| {
                        let sa = scores.get(&a.0).copied().unwrap_or(0.0);
                        let sb = scores.get(&b.0).copied().unwrap_or(0.0);
                        sb.total_cmp(&sa) // highest composite score first
                    });
                }
            }
            items
        }
        Transform::Limit(n) => {
            items.truncate(*n);
            items
        }
        // Output mode directives - don't modify result set, handled at output layer
        Transform::Select(_) | Transform::Count | Transform::Connectivity => items,
        Transform::DominatingSet => {
            // Greedy 3-covering dominating set: pick the node that covers
            // the most under-covered neighbors, repeat until every node
            // has been covered 3 times (by 3 different selected seeds).
            use std::collections::HashMap as HMap;
            let input_keys: std::collections::HashSet<String> = items.iter().map(|(k, _)| k.clone()).collect();
            let mut cover_count: HMap<String, usize> = items.iter().map(|(k, _)| (k.clone(), 0)).collect();
            let mut selected: Vec<(String, f64)> = Vec::new();
            let mut selected_set: std::collections::HashSet<String> = std::collections::HashSet::new();
            const REQUIRED_COVERAGE: usize = 3;
            loop {
                // Find the unselected node that covers the most under-covered nodes
                let best = items.iter()
                    .filter(|(k, _)| !selected_set.contains(k.as_str()))
                    .map(|(k, _)| {
                        let mut value = 0usize;
                        // Count self if under-covered
                        if cover_count.get(k).copied().unwrap_or(0) < REQUIRED_COVERAGE {
                            value += 1;
                        }
                        // Only neighbors that are part of the input set count.
                        for (nbr, _) in graph.neighbors(k) {
                            if input_keys.contains(nbr.as_str())
                                && cover_count.get(nbr.as_str()).copied().unwrap_or(0) < REQUIRED_COVERAGE {
                                value += 1;
                            }
                        }
                        (k.clone(), value)
                    })
                    .max_by_key(|(_, v)| *v);
                let Some((key, value)) = best else { break };
                if value == 0 { break; } // everything covered 3x
                // Mark coverage
                *cover_count.entry(key.clone()).or_default() += 1;
                for (nbr, _) in graph.neighbors(&key) {
                    if let Some(c) = cover_count.get_mut(nbr.as_str()) {
                        *c += 1;
                    }
                }
                // A selected node keeps its original pipeline score
                // (1.0 as a fallback if it is somehow absent).
                let score = items.iter().find(|(k, _)| k == &key).map(|(_, s)| *s).unwrap_or(1.0);
                selected.push((key.clone(), score));
                selected_set.insert(key);
            }
            selected
        }
    }
}
/// Extract seeds from weighted terms by matching against node keys and content.
///
/// Three matching strategies, in priority order:
/// 1. Exact key match: term matches a node key exactly → full weight
/// 2. Key component match: term matches a word in a hyphenated/underscored key → 0.5× weight
/// 3. Content match: term appears in node content → 0.2× weight (capped at 50 nodes)
///
/// This convenience wrapper runs with strategies 2 and 3 disabled
/// (exact key matches only); use `match_seeds_opts` to enable them.
///
/// Returns (seeds, direct_hits) where direct_hits tracks which keys
/// were matched directly (vs found by an algorithm stage).
pub fn match_seeds(
    terms: &BTreeMap<String, f64>,
    store: &impl StoreView,
) -> (Vec<(String, f64)>, HashSet<String>) {
    match_seeds_opts(terms, store, false, false)
}
/// Like `match_seeds`, with the cheaper fallback strategies toggleable:
/// `component_match` enables matching terms against key components split
/// on `-`, `_`, `.`, `#` (0.5× weight), and `content_fallback` enables
/// substring search over node content (0.2× weight, ≤50 hits per term).
/// Scores accumulate additively when several terms hit the same node.
pub fn match_seeds_opts(
    terms: &BTreeMap<String, f64>,
    store: &impl StoreView,
    component_match: bool,
    content_fallback: bool,
) -> (Vec<(String, f64)>, HashSet<String>) {
    let mut seed_map: HashMap<String, f64> = HashMap::new();
    let mut direct_hits: HashSet<String> = HashSet::new();
    // Build key lookup: lowercase key → (original key, weight)
    let mut key_map: HashMap<String, (String, f64)> = HashMap::new();
    // Build component index: word → vec of (original key, weight)
    let mut component_map: HashMap<String, Vec<(String, f64)>> = HashMap::new();
    store.for_each_node(|key, _content, weight| {
        let lkey = key.to_lowercase();
        key_map.insert(lkey.clone(), (key.to_owned(), weight as f64));
        // Split key on hyphens, underscores, dots, hashes for component matching
        for component in lkey.split(['-', '_', '.', '#']) {
            // Components shorter than 3 chars are too noisy to index.
            if component.len() >= 3 {
                component_map.entry(component.to_owned())
                    .or_default()
                    .push((key.to_owned(), weight as f64));
            }
        }
    });
    for (term, &term_weight) in terms {
        // Strategy 1: exact key match
        if let Some((orig_key, node_weight)) = key_map.get(term) {
            let score = term_weight * node_weight;
            *seed_map.entry(orig_key.clone()).or_insert(0.0) += score;
            direct_hits.insert(orig_key.clone());
            continue;
        }
        // Strategy 2: key component match (0.5× weight) — only when explicitly requested
        if component_match
            && let Some(matches) = component_map.get(term.as_str()) {
            for (orig_key, node_weight) in matches {
                let score = term_weight * node_weight * 0.5;
                *seed_map.entry(orig_key.clone()).or_insert(0.0) += score;
                direct_hits.insert(orig_key.clone());
            }
            continue;
        }
        // Strategy 3: content match (0.2× weight) — only when explicitly requested
        if content_fallback {
            let term_lower = term.to_lowercase();
            if term_lower.len() >= 3 {
                let mut content_hits = 0;
                // Full scan of node content per term; the 50-hit cap bounds cost.
                // Note: content matches are not recorded in direct_hits.
                store.for_each_node(|key, content, weight| {
                    if content_hits >= 50 { return; }
                    if content.to_lowercase().contains(&term_lower) {
                        let score = term_weight * weight as f64 * 0.2;
                        *seed_map.entry(key.to_owned()).or_insert(0.0) += score;
                        content_hits += 1;
                    }
                });
            }
        }
    }
    // NOTE(review): HashMap iteration makes the returned seed order
    // unspecified — downstream stages are expected to sort.
    let seeds: Vec<(String, f64)> = seed_map.into_iter().collect();
    (seeds, direct_hits)
}
/// Run a pipeline of algorithm stages.
///
/// Each stage consumes the previous stage's results as its seed set.
/// When `debug` is set, the top results are printed after every stage,
/// marking where `max_results` will cut the final list off.
pub fn run_pipeline(
    stages: &[AlgoStage],
    seeds: Vec<(String, f64)>,
    graph: &Graph,
    store: &impl StoreView,
    debug: bool,
    max_results: usize,
) -> Vec<(String, f64)> {
    let mut current = seeds;
    for stage in stages {
        if debug {
            println!("\n[search] === {} ({} seeds in) ===", stage.algo, current.len());
        }
        current = match stage.algo {
            Algorithm::Spread => run_spread(&current, graph, store, stage, debug),
            Algorithm::Spectral => run_spectral(&current, graph, stage, debug),
            Algorithm::Manifold => run_manifold(&current, graph, stage, debug),
            Algorithm::Confluence => run_confluence(&current, graph, store, stage, debug),
            Algorithm::Geodesic => run_geodesic(&current, graph, stage, debug),
        };
        if debug {
            // Was "{}{}", which printed e.g. "spread123 results" with no
            // separator; now matches run_query's "→ N results" style.
            println!("[search] {} → {} results", stage.algo, current.len());
            for (i, (key, score)) in current.iter().enumerate().take(15) {
                let cutoff = if i + 1 == max_results { " <-- cutoff" } else { "" };
                println!(" [{:.4}] {}{}", score, key, cutoff);
            }
            if current.len() > 15 {
                println!(" ... ({} more)", current.len() - 15);
            }
        }
    }
    current.truncate(max_results);
    current
}
/// Spreading activation: propagate scores through graph edges.
///
/// Tunable params: max_hops, edge_decay, min_activation — each defaults
/// to the store's configured value, except min_activation which is
/// relaxed to 10% of the store default.
fn run_spread(
    seeds: &[(String, f64)],
    graph: &Graph,
    store: &impl StoreView,
    stage: &AlgoStage,
    _debug: bool,
) -> Vec<(String, f64)> {
    let defaults = store.params();
    spreading_activation(
        seeds,
        graph,
        store,
        stage.param_u32("max_hops", defaults.max_hops),
        stage.param_f64("edge_decay", defaults.edge_decay),
        stage.param_f64("min_activation", defaults.min_activation * 0.1),
    )
}
/// Spectral projection: find nearest neighbors in spectral embedding space.
///
/// Tunable params: k (default 20, number of neighbors to find).
/// Falls back to returning the seeds unchanged when no embedding is on
/// disk. Spectral hits are appended to (never replace) the seed set.
fn run_spectral(
    seeds: &[(String, f64)],
    graph: &Graph,
    stage: &AlgoStage,
    debug: bool,
) -> Vec<(String, f64)> {
    let k = stage.param_usize("k", 20);
    let emb = match spectral::load_embedding() {
        Ok(e) => e,
        Err(e) => {
            if debug { println!(" no spectral embedding: {}", e); }
            return seeds.to_vec();
        }
    };
    let weighted_seeds: Vec<(&str, f64)> =
        seeds.iter().map(|(key, w)| (key.as_str(), *w)).collect();
    let projected = spectral::nearest_to_seeds_weighted(&emb, &weighted_seeds, Some(graph), k);
    if debug {
        for (key, dist) in &projected {
            let score = 1.0 / (1.0 + dist);
            println!(" dist={:.6} score={:.4} {}", dist, score, key);
        }
    }
    // Keep every original seed; append only spectral hits that are new,
    // scored by inverse distance.
    let seed_set: HashSet<&str> = seeds.iter().map(|(key, _)| key.as_str()).collect();
    let mut merged = seeds.to_vec();
    merged.extend(
        projected
            .into_iter()
            .filter(|(key, _)| !seed_set.contains(key.as_str()))
            .map(|(key, dist)| {
                let score = 1.0 / (1.0 + dist);
                (key, score)
            }),
    );
    merged
}
/// Confluence: multi-source reachability scoring.
///
/// Unlike spreading activation (which takes max activation from any source),
/// confluence rewards nodes reachable from *multiple* seeds. For each candidate
/// node within k hops, score = sum of (seed_weight * edge_decay^distance) across
/// all seeds that can reach it. Nodes at the intersection of multiple seeds'
/// neighborhoods score highest.
///
/// This naturally handles mixed seeds: unrelated seeds activate disjoint
/// neighborhoods that don't overlap, so their results separate naturally.
///
/// Tunable params: max_hops (default 3), edge_decay (default 0.5),
/// min_sources (default 2, minimum number of distinct seeds that must reach a node).
fn run_confluence(
    seeds: &[(String, f64)],
    graph: &Graph,
    store: &impl StoreView,
    stage: &AlgoStage,
    debug: bool,
) -> Vec<(String, f64)> {
    let max_hops = stage.param_u32("max_hops", 3);
    let edge_decay = stage.param_f64("edge_decay", 0.5);
    let min_sources = stage.param_usize("min_sources", 2);
    // For each seed, BFS outward collecting (node → activation) at each distance
    // Track which seeds contributed to each node's score
    let mut node_scores: HashMap<String, f64> = HashMap::new();
    let mut node_sources: HashMap<String, HashSet<usize>> = HashMap::new();
    for (seed_idx, (seed_key, seed_weight)) in seeds.iter().enumerate() {
        let mut visited: HashMap<String, f64> = HashMap::new();
        let mut queue: VecDeque<(String, u32)> = VecDeque::new();
        visited.insert(seed_key.clone(), *seed_weight);
        queue.push_back((seed_key.clone(), 0));
        while let Some((key, depth)) = queue.pop_front() {
            if depth >= max_hops { continue; }
            let act = visited[&key];
            for (neighbor, strength) in graph.neighbors(&key) {
                let neighbor_weight = store.node_weight(neighbor.as_str());
                let propagated = act * edge_decay * neighbor_weight * strength as f64;
                // Prune negligible activation to bound the frontier.
                if propagated < 0.001 { continue; }
                // Re-enqueue when a later path yields higher activation, so a
                // node can be expanded more than once with its best score.
                if !visited.contains_key(neighbor.as_str()) || visited[neighbor.as_str()] < propagated {
                    visited.insert(neighbor.clone(), propagated);
                    queue.push_back((neighbor.clone(), depth + 1));
                }
            }
        }
        // Accumulate into global scores (additive across seeds)
        for (key, act) in visited {
            *node_scores.entry(key.clone()).or_insert(0.0) += act;
            node_sources.entry(key).or_default().insert(seed_idx);
        }
    }
    // Filter to nodes reached by min_sources distinct seeds
    let mut results: Vec<(String, f64)> = node_scores.into_iter()
        .filter(|(key, _)| {
            node_sources.get(key).map(|s| s.len()).unwrap_or(0) >= min_sources
        })
        .collect();
    if debug {
        // Show source counts
        // NOTE(review): printed before the sort below, so this sample is unordered.
        for (key, score) in results.iter().take(15) {
            let sources = node_sources.get(key).map(|s| s.len()).unwrap_or(0);
            println!(" [{:.4}] {} (from {} seeds)", score, key, sources);
        }
    }
    results.sort_by(|a, b| b.1.total_cmp(&a.1));
    results
}
/// Geodesic: straightest paths between seed pairs in spectral space.
///
/// For each pair of seeds, walk the graph from one to the other, at each
/// step choosing the neighbor whose spectral direction most aligns with
/// the target direction. Nodes along these geodesic paths score higher
/// the more paths pass through them and the straighter those paths are.
///
/// Tunable params: max_path (default 6), k (default 20 results).
fn run_geodesic(
    seeds: &[(String, f64)],
    graph: &Graph,
    stage: &AlgoStage,
    debug: bool,
) -> Vec<(String, f64)> {
    let max_path = stage.param_usize("max_path", 6);
    let k = stage.param_usize("k", 20);
    // Fall back to the seeds unchanged if no embedding is on disk.
    let emb = match spectral::load_embedding() {
        Ok(e) => e,
        Err(e) => {
            if debug { println!(" no spectral embedding: {}", e); }
            return seeds.to_vec();
        }
    };
    // Filter seeds to those with valid spectral coords
    // (all-zero coordinate vectors are treated as missing).
    let valid_seeds: Vec<(&str, f64, &Vec<f64>)> = seeds.iter()
        .filter_map(|(key, weight)| {
            emb.coords.get(key.as_str())
                .filter(|c| c.iter().any(|&v| v.abs() > 1e-12))
                .map(|c| (key.as_str(), *weight, c))
        })
        .collect();
    if valid_seeds.len() < 2 {
        if debug { println!(" need ≥2 seeds with spectral coords, have {}", valid_seeds.len()); }
        return seeds.to_vec();
    }
    // For each pair of seeds, find the geodesic path
    let mut path_counts: HashMap<String, f64> = HashMap::new();
    let seed_set: HashSet<&str> = seeds.iter().map(|(k, _)| k.as_str()).collect();
    for i in 0..valid_seeds.len() {
        for j in (i + 1)..valid_seeds.len() {
            let (key_a, weight_a, coords_a) = &valid_seeds[i];
            let (key_b, weight_b, coords_b) = &valid_seeds[j];
            let pair_weight = weight_a * weight_b;
            // Walk from A toward B
            let path_ab = geodesic_walk(
                key_a, coords_a, coords_b, graph, &emb, max_path,
            );
            // Walk from B toward A
            let path_ba = geodesic_walk(
                key_b, coords_b, coords_a, graph, &emb, max_path,
            );
            // Score nodes on both paths (nodes found from both directions score double)
            for (node, alignment) in path_ab.iter().chain(path_ba.iter()) {
                if !seed_set.contains(node.as_str()) {
                    *path_counts.entry(node.clone()).or_insert(0.0) += pair_weight * alignment;
                }
            }
        }
    }
    if debug && !path_counts.is_empty() {
        println!(" {} pairs examined, {} distinct nodes on paths",
            valid_seeds.len() * (valid_seeds.len() - 1) / 2,
            path_counts.len());
    }
    // Merge with original seeds
    // (keep every seed; append up to k of the best-scoring path nodes).
    let mut results = seeds.to_vec();
    let mut path_results: Vec<(String, f64)> = path_counts.into_iter().collect();
    path_results.sort_by(|a, b| b.1.total_cmp(&a.1));
    path_results.truncate(k);
    for (key, score) in path_results {
        if !seed_set.contains(key.as_str()) {
            results.push((key, score));
        }
    }
    results.sort_by(|a, b| b.1.total_cmp(&a.1));
    results
}
/// Walk from `start` toward `target_coords` in spectral space, choosing
/// the neighbor at each step whose direction most aligns with the target.
/// Returns (node_key, alignment_score) for each intermediate node.
///
/// The walk is greedy and may stall before reaching the target: it stops
/// after `max_steps`, on arrival, or when no unvisited neighbor makes
/// forward progress (positive cosine alignment).
fn geodesic_walk(
    start: &str,
    start_coords: &[f64],
    target_coords: &[f64],
    graph: &Graph,
    emb: &spectral::SpectralEmbedding,
    max_steps: usize,
) -> Vec<(String, f64)> {
    let mut path = Vec::new();
    let mut current = start.to_string();
    let mut current_coords = start_coords.to_vec();
    // Never revisit a node — prevents cycling.
    let mut visited: HashSet<String> = HashSet::new();
    visited.insert(current.clone());
    for _ in 0..max_steps {
        // Direction we want to travel: from current toward target
        let direction: Vec<f64> = target_coords.iter()
            .zip(current_coords.iter())
            .map(|(t, c)| t - c)
            .collect();
        let dir_norm = direction.iter().map(|d| d * d).sum::<f64>().sqrt();
        if dir_norm < 1e-12 { break; } // arrived
        // Among neighbors with spectral coords, find the one most aligned
        let mut best: Option<(String, Vec<f64>, f64)> = None;
        for (neighbor, _strength) in graph.neighbors(&current) {
            if visited.contains(neighbor.as_str()) { continue; }
            // All-zero coordinate vectors are treated as missing.
            let neighbor_coords = match emb.coords.get(neighbor.as_str()) {
                Some(c) if c.iter().any(|&v| v.abs() > 1e-12) => c,
                _ => continue,
            };
            // Direction to this neighbor
            let step: Vec<f64> = neighbor_coords.iter()
                .zip(current_coords.iter())
                .map(|(n, c)| n - c)
                .collect();
            let step_norm = step.iter().map(|s| s * s).sum::<f64>().sqrt();
            if step_norm < 1e-12 { continue; }
            // Cosine similarity between desired direction and step direction
            let dot: f64 = direction.iter().zip(step.iter()).map(|(d, s)| d * s).sum();
            let alignment = dot / (dir_norm * step_norm);
            if alignment > 0.0 { // only consider forward-facing neighbors
                if best.as_ref().map(|(_, _, a)| alignment > *a).unwrap_or(true) {
                    best = Some((neighbor.clone(), neighbor_coords.clone(), alignment));
                }
            }
        }
        match best {
            Some((next_key, next_coords, alignment)) => {
                path.push((next_key.clone(), alignment));
                visited.insert(next_key.clone());
                current = next_key;
                current_coords = next_coords;
            }
            None => break, // no forward-facing neighbors
        }
    }
    path
}
/// Manifold: extrapolation along the direction defined by seeds.
///
/// Instead of finding what's *near* the seeds in spectral space (proximity),
/// find what's in the *direction* the seeds define. Given a weighted centroid
/// of seeds and the principal direction they span, find nodes that continue
/// along that direction.
///
/// Degrades gracefully: if no spectral embedding can be loaded, or no seed
/// has usable coordinates, the original `seeds` are returned unchanged.
/// Otherwise returns the seeds merged with up to `k` direction-scored
/// candidates, sorted by score descending.
///
/// Tunable params: k (default 20 results).
fn run_manifold(
    seeds: &[(String, f64)],
    graph: &Graph,
    stage: &AlgoStage,
    debug: bool,
) -> Vec<(String, f64)> {
    let k = stage.param_usize("k", 20);
    let emb = match spectral::load_embedding() {
        Ok(e) => e,
        Err(e) => {
            // No embedding on disk — manifold cannot run; pass seeds through.
            if debug { println!("    no spectral embedding: {}", e); }
            return seeds.to_vec();
        }
    };
    // Collect seeds with valid spectral coordinates.
    // An all-zero coordinate vector means the node was never embedded; drop it.
    let seed_data: Vec<(&str, f64, &Vec<f64>)> = seeds.iter()
        .filter_map(|(key, weight)| {
            emb.coords.get(key.as_str())
                .filter(|c| c.iter().any(|&v| v.abs() > 1e-12))
                .map(|c| (key.as_str(), *weight, c))
        })
        .collect();
    if seed_data.is_empty() {
        if debug { println!("    no seeds with spectral coords"); }
        return seeds.to_vec();
    }
    let dims = emb.dims;
    // Compute weighted centroid of seeds (seed activation acts as the weight).
    let mut centroid = vec![0.0f64; dims];
    let mut total_weight = 0.0;
    for (_, weight, coords) in &seed_data {
        for (i, &c) in coords.iter().enumerate() {
            centroid[i] += c * weight;
        }
        total_weight += weight;
    }
    if total_weight > 0.0 {
        for c in &mut centroid {
            *c /= total_weight;
        }
    }
    // Compute principal direction via power iteration on seed covariance.
    // Initialize with the two most separated seeds (largest spectral distance).
    // With a single seed (or coincident seeds) `direction` stays all-zero and
    // the scoring below falls back to plain distance-from-centroid.
    let mut direction = vec![0.0f64; dims];
    if seed_data.len() >= 2 {
        // Find the two seeds furthest apart in spectral space
        let mut best_dist = 0.0f64;
        for i in 0..seed_data.len() {
            for j in (i + 1)..seed_data.len() {
                let dist: f64 = seed_data[i].2.iter().zip(seed_data[j].2.iter())
                    .map(|(a, b)| (a - b).powi(2)).sum::<f64>().sqrt();
                if dist > best_dist {
                    best_dist = dist;
                    for d in 0..dims {
                        direction[d] = seed_data[j].2[d] - seed_data[i].2[d];
                    }
                }
            }
        }
        // Power iteration: 3 rounds on the weighted covariance matrix —
        // enough to pull the vector toward the dominant eigendirection
        // without paying for a full eigendecomposition.
        for _ in 0..3 {
            let mut new_dir = vec![0.0f64; dims];
            for (_, weight, coords) in &seed_data {
                let dev: Vec<f64> = coords.iter().zip(centroid.iter()).map(|(c, m)| c - m).collect();
                let dot: f64 = dev.iter().zip(direction.iter()).map(|(d, v)| d * v).sum();
                for d in 0..dims {
                    new_dir[d] += weight * dot * dev[d];
                }
            }
            // Normalize (skip when degenerate to avoid dividing by ~0)
            let norm = new_dir.iter().map(|d| d * d).sum::<f64>().sqrt();
            if norm > 1e-12 {
                for d in &mut new_dir { *d /= norm; }
            }
            direction = new_dir;
        }
    }
    let dir_norm = direction.iter().map(|d| d * d).sum::<f64>().sqrt();
    let seed_set: HashSet<&str> = seeds.iter().map(|(k, _)| k.as_str()).collect();
    // Score each non-seed node by projection onto the direction from centroid
    let mut candidates: Vec<(String, f64)> = emb.coords.iter()
        .filter(|(key, coords)| {
            !seed_set.contains(key.as_str())
            && coords.iter().any(|&v| v.abs() > 1e-12)
        })
        .map(|(key, coords)| {
            let deviation: Vec<f64> = coords.iter().zip(centroid.iter())
                .map(|(c, m)| c - m)
                .collect();
            let score = if dir_norm > 1e-12 {
                // Project onto direction: how far along the principal axis
                let projection: f64 = deviation.iter().zip(direction.iter())
                    .map(|(d, v)| d * v)
                    .sum::<f64>() / dir_norm;
                // Distance from the axis (perpendicular component):
                // proj_vec is the component of `deviation` lying on the axis,
                // so (deviation - proj_vec) is the off-axis remainder.
                let proj_vec: Vec<f64> = direction.iter()
                    .map(|&d| d * projection / dir_norm)
                    .collect();
                let perp_dist: f64 = deviation.iter().zip(proj_vec.iter())
                    .map(|(d, p)| (d - p).powi(2))
                    .sum::<f64>()
                    .sqrt();
                // Score: prefer nodes far along the direction but close to the axis
                // Use absolute projection (both directions from centroid are interesting)
                let along = projection.abs();
                if perp_dist < 1e-12 {
                    along
                } else {
                    along / (1.0 + perp_dist)
                }
            } else {
                // No direction (single seed or all seeds coincide): use distance from centroid
                let dist: f64 = deviation.iter().map(|d| d * d).sum::<f64>().sqrt();
                1.0 / (1.0 + dist)
            };
            // Bonus for being connected to seeds in the graph — a direct edge
            // to a seed nudges the candidate up the ranking (0.1 per strength unit).
            let graph_bonus: f64 = graph.neighbors(key).iter()
                .filter(|(n, _)| seed_set.contains(n.as_str()))
                .map(|(_, s)| *s as f64 * 0.1)
                .sum();
            (key.clone(), score + graph_bonus)
        })
        .collect();
    candidates.sort_by(|a, b| b.1.total_cmp(&a.1));
    candidates.truncate(k);
    if debug {
        for (key, score) in candidates.iter().take(15) {
            println!("    [{:.4}] {}", score, key);
        }
    }
    // Merge with original seeds (seeds keep their input activations)
    let mut results = seeds.to_vec();
    for (key, score) in candidates {
        results.push((key, score));
    }
    results.sort_by(|a, b| b.1.total_cmp(&a.1));
    results
}
/// Simultaneous wavefront spreading activation.
///
/// Every seed radiates at the same time. On each hop, the energy arriving
/// at a node from all sources is summed, and that combined map becomes the
/// wavefront for the next hop — so overlapping wavefronts reinforce each
/// other and radiate onward with greater strength.
///
/// Traversal is gated only by `edge_decay`, edge strength, and the
/// `min_activation` cutoff; node weight is applied once at the end, purely
/// for ranking. Returns `(key, activation × node_weight)` pairs sorted
/// descending by score.
pub fn spreading_activation(
    seeds: &[(String, f64)],
    graph: &Graph,
    store: &impl StoreView,
    max_hops: u32,
    edge_decay: f64,
    min_activation: f64,
) -> Vec<(String, f64)> {
    // Cumulative energy per node, plus the active wavefront for this hop.
    let mut total: HashMap<String, f64> = HashMap::new();
    let mut wave: HashMap<String, f64> = HashMap::new();
    for (key, initial) in seeds {
        *wave.entry(key.clone()).or_insert(0.0) += initial;
        *total.entry(key.clone()).or_insert(0.0) += initial;
    }
    for _ in 0..max_hops {
        // Push every frontier node's energy across its edges simultaneously.
        let mut next: HashMap<String, f64> = HashMap::new();
        for (node, energy) in &wave {
            for (neighbor, strength) in graph.neighbors(node) {
                let passed = energy * edge_decay * strength as f64;
                if passed < min_activation { continue; }
                *next.entry(neighbor.clone()).or_insert(0.0) += passed;
            }
        }
        // Wavefront died out — stop early.
        if next.is_empty() { break; }
        for (node, energy) in &next {
            *total.entry(node.clone()).or_insert(0.0) += energy;
        }
        wave = next;
    }
    // Node weight shapes the final ranking only, never the traversal.
    let mut ranked: Vec<_> = total.into_iter()
        .map(|(key, energy)| {
            let node_weight = store.node_weight(&key);
            (key, energy * node_weight)
        })
        .collect();
    ranked.sort_by(|x, y| y.1.total_cmp(&x.1));
    ranked
}
/// Search with weighted terms: exact key matching + spectral projection.
///
/// Each term is matched against node keys; hits become seeds scored by
/// term_weight × node_weight, and the seeds are then expanded through
/// spectral space to pull in nearby nodes, with link weights modulating
/// distance.
pub fn search_weighted(
    terms: &BTreeMap<String, f64>,
    store: &impl StoreView,
) -> Vec<SearchResult> {
    // Public entry point: quiet mode with the standard result cap.
    const DEFAULT_MAX_RESULTS: usize = 5;
    search_weighted_inner(terms, store, false, DEFAULT_MAX_RESULTS)
}
/// Shared implementation behind [`search_weighted`]: matches seeds, runs
/// the default spectral → spread pipeline, and wraps results.
///
/// `debug` enables seed/stage diagnostics on stdout; `max_results` caps
/// the returned list. Returns an empty vec when no term matches any key.
fn search_weighted_inner(
    terms: &BTreeMap<String, f64>,
    store: &impl StoreView,
    debug: bool,
    max_results: usize,
) -> Vec<SearchResult> {
    let graph = crate::graph::build_graph_fast(store);
    let (seeds, direct_hits) = match_seeds(terms, store);
    // No seeds means nothing to expand from — bail out early.
    if seeds.is_empty() {
        return Vec::new();
    }
    if debug {
        println!("\n[search] === SEEDS ({}) ===", seeds.len());
        let mut by_score = seeds.clone();
        by_score.sort_by(|x, y| y.1.total_cmp(&x.1));
        for (key, score) in by_score.iter().take(20) {
            println!("    {:.4}  {}", score, key);
        }
    }
    // Default pipeline: spectral → spread (legacy behavior)
    let pipeline = vec![
        AlgoStage { algo: Algorithm::Spectral, params: HashMap::new() },
        AlgoStage { algo: Algorithm::Spread, params: HashMap::new() },
    ];
    run_pipeline(&pipeline, seeds, &graph, store, debug, max_results)
        .into_iter()
        .take(max_results)
        .map(|(key, activation)| SearchResult {
            is_direct: direct_hits.contains(&key),
            key,
            activation,
            snippet: None,
        })
        .collect()
}
/// Search with equal-weight terms (for interactive use).
///
/// Splits `query` on whitespace, lowercases each token, and runs a
/// weighted search with every term at weight 1.0 (duplicate tokens
/// collapse to a single entry).
pub fn search(query: &str, store: &impl StoreView) -> Vec<SearchResult> {
    let mut terms: BTreeMap<String, f64> = BTreeMap::new();
    for token in query.split_whitespace() {
        terms.insert(token.to_lowercase(), 1.0);
    }
    search_weighted(&terms, store)
}