query: add connectivity pipe stage

BFS-based connectivity analysis as a query pipeline stage. Shows
connected components, islands, and sample paths between result nodes
through the full graph (max 4 hops).

  poc-memory query "content ~ 'made love' | connectivity"
  poc-memory query "(content ~ 'A' OR content ~ 'B') | connectivity"

Also documented in query --help.
This commit is contained in:
ProofOfConcept 2026-03-11 17:04:59 -04:00
parent 0e971dee61
commit 9a0908fbc6
2 changed files with 109 additions and 1 deletions

View file

@ -143,6 +143,7 @@ PIPE STAGES:
| limit N cap results
| select F,F,... output fields as TSV
| count just show count
| connectivity show graph structure between results
FUNCTIONS:
community('key') community id of a node
@ -151,11 +152,12 @@ FUNCTIONS:
EXAMPLES:
key ~ 'inner-life' substring match on keys
content ~ 'made love' full-text search
content ~ 'made love' | connectivity find clusters among results
(content ~ 'A' OR content ~ 'B') | connectivity
degree > 15 | sort degree | limit 10 high-degree nodes
key ~ 'journal' AND degree > 10 | count count matching nodes
neighbors('identity') WHERE strength > 0.5 | sort strength
* | sort weight asc | limit 20 lowest-weight nodes
node_type = semantic | sort degree all semantic nodes by degree
")]
Query {
/// Query expression (e.g. "key ~ 'inner-life'")

View file

@ -63,6 +63,7 @@ pub enum Stage {
Limit(usize),
Select(Vec<String>),
Count,
Connectivity,
}
#[derive(Debug, Clone)]
@ -88,6 +89,7 @@ peg::parser! {
/ "limit" _ n:integer() { Stage::Limit(n) }
/ "select" _ f:field_list() { Stage::Select(f) }
/ "count" { Stage::Count }
/ "connectivity" { Stage::Connectivity }
rule asc_desc() -> bool
= "asc" { true }
@ -420,6 +422,7 @@ fn execute_parsed(
Stage::Limit(n) => {
results.truncate(*n);
}
Stage::Connectivity => {} // handled in output
Stage::Select(_) | Stage::Count => {} // handled in output
}
}
@ -470,6 +473,12 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St
return Ok(());
}
// Connectivity stage
if q.stages.iter().any(|s| matches!(s, Stage::Connectivity)) {
print_connectivity(&results, graph);
return Ok(());
}
// Select stage
let fields: Option<&Vec<String>> = q.stages.iter().find_map(|s| match s {
Stage::Select(f) => Some(f),
@ -499,3 +508,100 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St
Ok(())
}
// -- Connectivity analysis --
/// BFS shortest path between two nodes, following at most `max_hops` edges.
///
/// Returns the node keys along the path, starting with `from` and ending
/// with `to`. When `from == to` the path is the single node itself. Returns
/// `None` when `to` is not discovered within `max_hops` edges of `from`.
fn bfs_path(graph: &Graph, from: &str, to: &str, max_hops: usize) -> Option<Vec<String>> {
    use std::collections::{HashMap, VecDeque};

    if from == to {
        return Some(vec![from.to_string()]);
    }

    // Predecessor of every discovered node. The start node maps to `None`
    // rather than to a sentinel string, so a node whose key happens to be
    // empty cannot be mistaken for the start during path reconstruction.
    let mut parent: HashMap<String, Option<String>> = HashMap::new();
    parent.insert(from.to_string(), None);

    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
    queue.push_back((from.to_string(), 0));

    while let Some((current, depth)) = queue.pop_front() {
        // Nodes sitting at the hop limit are not expanded any further.
        if depth >= max_hops {
            continue;
        }
        for (neighbor, _) in graph.neighbors(&current) {
            if parent.contains_key(neighbor.as_str()) {
                continue;
            }
            parent.insert(neighbor.clone(), Some(current.clone()));
            if neighbor == to {
                // Walk the predecessor links back to the start, then flip
                // the collected keys into start-to-target order.
                let mut path = vec![to.to_string()];
                let mut node: &str = to;
                while let Some(Some(prev)) = parent.get(node) {
                    path.push(prev.clone());
                    node = prev.as_str();
                }
                path.reverse();
                return Some(path);
            }
            queue.push_back((neighbor.clone(), depth + 1));
        }
    }
    None
}
/// Find connected components among result nodes via BFS through the full graph.
fn find_components(keys: &[&str], graph: &Graph, max_hops: usize) -> Vec<Vec<String>> {
use std::collections::HashSet;
let mut assigned: HashSet<&str> = HashSet::new();
let mut components: Vec<Vec<String>> = Vec::new();
for &start in keys {
if assigned.contains(start) { continue; }
let mut component = vec![start.to_string()];
assigned.insert(start);
for &other in keys {
if assigned.contains(other) { continue; }
if bfs_path(graph, start, other, max_hops).is_some() {
component.push(other.to_string());
assigned.insert(other);
}
}
components.push(component);
}
components
}
/// Print connectivity report for query results.
///
/// Groups the result keys with `find_components` (hop limit 4) and prints a
/// summary line followed by one entry per component: single-node components
/// as islands, larger ones as clusters listing each member with its degree.
/// For every cluster a sample path between its first two members is shown;
/// path steps that are not themselves query results are wrapped in
/// brackets.
fn print_connectivity(results: &[QueryResult], graph: &Graph) {
    let max_hops = 4;
    let keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
    let components = find_components(&keys, graph, max_hops);

    println!("Connectivity: {} nodes, {} components (max {} hops)\n",
             results.len(), components.len(), max_hops);

    let result_set: std::collections::HashSet<&str> = keys.iter().copied().collect();

    for (i, component) in components.iter().enumerate() {
        if component.len() == 1 {
            println!(" island: {}", component[0]);
            continue;
        }

        println!(" cluster {} ({} nodes):", i + 1, component.len());
        for node in component {
            println!(" {} (degree {})", node, graph.degree(node));
        }

        // Show a sample path between first two nodes
        if let [first, second, ..] = component.as_slice() {
            if let Some(path) = bfs_path(graph, first, second, max_hops) {
                let steps: Vec<String> = path
                    .iter()
                    .map(|step| {
                        if result_set.contains(step.as_str()) {
                            step.clone()
                        } else {
                            format!("[{}]", step)
                        }
                    })
                    .collect();
                // Steps are separated explicitly; printing them back to
                // back makes adjacent keys unreadable.
                println!(" path: {}", steps.join(" → "));
            }
        }
    }
}