query: unify PEG and engine parsers

PEG parser now handles both expression syntax (degree > 5 | sort degree)
and pipeline syntax (all | type:episodic | sort:timestamp). Deleted
Stage::parse() and helpers from engine.rs — it's now pure execution.

All callers use parse_stages() from parser.rs as the single entry point.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
ProofOfConcept 2026-04-11 20:42:58 -04:00
parent bc991c3521
commit aad227e487
8 changed files with 562 additions and 253 deletions

View file

@ -157,6 +157,9 @@ pub enum Filter {
/// Pipeline stages that reshape or annotate the current result set
/// (executed by `run_transform`).
pub enum Transform {
/// Re-order results by the given sort field.
Sort(SortField),
/// Keep at most this many results (truncated in `run_transform`).
Limit(usize),
/// Output-mode directive: which fields to display. Does not modify the
/// result set; handled at the output layer.
Select(Vec<String>),
/// Output-mode directive handled at the output layer; the result set
/// passes through unchanged (presumably rendered as a count — confirm
/// against the output layer).
Count,
/// Output-mode directive handled at the output layer; the result set
/// passes through unchanged.
Connectivity,
/// Reduce results to a greedy 3-covering dominating set of the graph.
DominatingSet,
}
@ -168,6 +171,8 @@ pub enum SortField {
Degree,
Weight,
Isolation,
Key,
Named(String, bool), // (field_name, ascending)
Composite(Vec<(ScoreField, f64)>),
}
@ -206,79 +211,6 @@ impl Cmp {
}
}
/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds).
fn parse_cmp(s: &str) -> Result<Cmp, String> {
// Two-character operators must be tried before their one-character
// prefixes, otherwise ">=" would match as ">" followed by "=…".
let (rest, ctor): (&str, fn(f64) -> Cmp) = if let Some(r) = s.strip_prefix(">=") {
(r, Cmp::Gte)
} else if let Some(r) = s.strip_prefix("<=") {
(r, Cmp::Lte)
} else if let Some(r) = s.strip_prefix('>') {
(r, Cmp::Gt)
} else if let Some(r) = s.strip_prefix('<') {
(r, Cmp::Lt)
} else if let Some(r) = s.strip_prefix('=') {
(r, Cmp::Eq)
} else {
return Err(format!("expected comparison operator in '{}'", s));
};
// Everything after the operator is a number or duration ("7d" -> seconds).
let val = parse_duration_or_number(rest)?;
Ok(ctor(val))
}
/// Parse "7d", "24h", "30m" as seconds, or plain numbers.
fn parse_duration_or_number(s: &str) -> Result<f64, String> {
// Known unit suffixes and their length in seconds.
const UNITS: [(char, f64); 3] = [('d', 86400.0), ('h', 3600.0), ('m', 60.0)];
for (suffix, secs) in UNITS {
if let Some(num) = s.strip_suffix(suffix) {
let v: f64 = num.parse().map_err(|_| format!("bad number: {}", num))?;
return Ok(v * secs);
}
}
// No recognized unit: treat the whole string as a plain number.
s.parse().map_err(|_| format!("bad number: {}", s))
}
/// Parse composite sort: "isolation*0.7+recency(linker)*0.3"
/// Each term is field or field(arg), optionally *weight (default 1.0).
fn parse_composite_sort(s: &str) -> Result<Vec<(ScoreField, f64)>, String> {
// Parse one "+"-separated term into (field, weight).
let parse_term = |term: &str| -> Result<(ScoreField, f64), String> {
// An optional trailing "*weight"; rsplit so the weight is the last '*'.
let (field_part, weight) = match term.rsplit_once('*') {
Some((f, w)) => (f, w.parse::<f64>().map_err(|_| format!("bad weight: {}", w))?),
None => (term, 1.0),
};
// A '(' marks a parameterized field, e.g. recency(agent).
let field = match field_part.split_once('(') {
Some((name, arg)) => {
let arg = arg.strip_suffix(')').ok_or("missing ) in sort field")?;
match name {
"recency" => ScoreField::Recency(arg.to_string()),
_ => return Err(format!("unknown parameterized sort field: {}", name)),
}
}
None => match field_part {
"isolation" => ScoreField::Isolation,
"degree" => ScoreField::Degree,
"weight" => ScoreField::Weight,
"content-len" => ScoreField::ContentLen,
"priority" => ScoreField::Priority,
_ => return Err(format!("unknown sort field: {}", field_part)),
},
};
Ok((field, weight))
};
// Short-circuits on the first bad term, like the original loop did.
let terms: Vec<(ScoreField, f64)> = s
.split('+')
.map(|t| parse_term(t.trim()))
.collect::<Result<_, _>>()?;
if terms.is_empty() {
return Err("empty composite sort".into());
}
Ok(terms)
}
/// Compute a 0-1 score for a node on a single dimension.
fn score_field(
@ -348,129 +280,6 @@ impl CompositeCache {
}
}
/// Parse a NodeType from a label.
fn parse_node_type(s: &str) -> Result<NodeType, String> {
let ty = match s {
// "session" is accepted as an alias for "episodic".
"episodic" | "session" => NodeType::EpisodicSession,
"daily" => NodeType::EpisodicDaily,
"weekly" => NodeType::EpisodicWeekly,
"monthly" => NodeType::EpisodicMonthly,
"semantic" => NodeType::Semantic,
other => {
return Err(format!(
"unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)",
other
))
}
};
Ok(ty)
}
impl Stage {
/// Parse a single stage from a string.
///
/// Recognition order matters and is deliberate:
/// 1. optional '!' negation prefix (filters only),
/// 2. exact keywords "all" and "dominating-set",
/// 3. bare algorithm names (no ':'),
/// 4. algorithm-with-params syntax ("spread,max_hops=4" — comma, no colon),
/// 5. "prefix:value" predicate/transform syntax,
/// 6. AlgoStage fallback for unknown prefixes.
///
/// Algorithm names are tried first (bare words), then predicate syntax
/// (contains ':'). No ambiguity since algorithms are bare words.
pub fn parse(s: &str) -> Result<Self, String> {
let s = s.trim();
// '!' negates the stage; validity (filters only) is checked at the end.
let (negated, s) = if let Some(rest) = s.strip_prefix('!') {
(true, rest)
} else {
(false, s)
};
// Generator: "all"
if s == "all" {
return Ok(Stage::Generator(Generator::All));
}
// Transform: "dominating-set"
if s == "dominating-set" {
return Ok(Stage::Transform(Transform::DominatingSet));
}
// Try algorithm parse first (bare words, no colon). Only returns on
// success; a failed attempt falls through to the branches below.
if !s.contains(':')
&& let Ok(algo) = AlgoStage::parse(s) {
return Ok(Stage::Algorithm(algo));
}
// Algorithm with params: "spread,max_hops=4" (contains comma but no colon).
// Re-parsing here propagates AlgoStage's error message, which the
// success-only attempt above discarded.
if s.contains(',') && !s.contains(':') {
return AlgoStage::parse(s).map(Stage::Algorithm);
}
// Predicate/transform syntax: "key:value"
let (prefix, value) = s.split_once(':')
.ok_or_else(|| format!("unknown stage: {}", s))?;
let filter_or_transform = match prefix {
"type" => Stage::Filter(Filter::Type(parse_node_type(value)?)),
"key" => Stage::Filter(Filter::KeyGlob(value.to_string())),
"weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)),
"age" => Stage::Filter(Filter::Age(parse_cmp(value)?)),
"content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)),
"provenance" => {
Stage::Filter(Filter::Provenance(value.to_string()))
}
// "not-visited:AGENT,DURATION" — duration accepts "7d"/"24h"/"30m"
// or plain seconds, truncated to whole seconds.
"not-visited" => {
let (agent, dur) = value.split_once(',')
.ok_or("not-visited:AGENT,DURATION")?;
let secs = parse_duration_or_number(dur)?;
Stage::Filter(Filter::NotVisited {
agent: agent.to_string(),
duration: secs as i64,
})
}
"visited" => Stage::Filter(Filter::Visited {
agent: value.to_string(),
}),
"sort" => {
// Check for composite sort: field*weight+field*weight+...
let field = if value.contains('+') || value.contains('*') {
SortField::Composite(parse_composite_sort(value)?)
} else {
match value {
"priority" => SortField::Priority,
"timestamp" => SortField::Timestamp,
"content-len" => SortField::ContentLen,
"degree" => SortField::Degree,
"weight" => SortField::Weight,
"isolation" => SortField::Isolation,
_ => return Err(format!("unknown sort field: {}", value)),
}
};
Stage::Transform(Transform::Sort(field))
}
"limit" => {
let n: usize = value.parse()
.map_err(|_| format!("bad limit: {}", value))?;
Stage::Transform(Transform::Limit(n))
}
"match" => {
let terms: Vec<String> = value.split(',')
.map(|t| t.to_string())
.collect();
Stage::Generator(Generator::Match(terms))
}
// Algorithm with colon in params? Try fallback. The AlgoStage error
// is masked by a generic "unknown stage" message here.
_ => return AlgoStage::parse(s).map(Stage::Algorithm)
.map_err(|_| format!("unknown stage: {}", s)),
};
// Apply negation to filters; '!' on a generator/transform is an error.
if negated {
match filter_or_transform {
Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))),
_ => Err("! prefix only works on filter stages".to_string()),
}
} else {
Ok(filter_or_transform)
}
}
/// Parse a pipe-separated pipeline string, e.g.
/// "all | type:episodic | sort:timestamp". Fails on the first bad stage.
pub fn parse_pipeline(s: &str) -> Result<Vec<Stage>, String> {
s.split('|')
.map(|part| Stage::parse(part.trim()))
.collect()
}
}
impl fmt::Display for Stage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
@ -479,6 +288,9 @@ impl fmt::Display for Stage {
Stage::Filter(filt) => write!(f, "{}", filt),
Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field),
Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n),
Stage::Transform(Transform::Select(fields)) => write!(f, "select:{}", fields.join(",")),
Stage::Transform(Transform::Count) => write!(f, "count"),
Stage::Transform(Transform::Connectivity) => write!(f, "connectivity"),
Stage::Transform(Transform::DominatingSet) => write!(f, "dominating-set"),
Stage::Algorithm(a) => write!(f, "{}", a.algo),
}
@ -613,7 +425,7 @@ fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> {
}
}
fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
let node = match store.nodes.get(key) {
Some(n) => n,
None => return false,
@ -686,6 +498,39 @@ pub fn run_transform(
sb.total_cmp(&sa) // most isolated first
});
}
SortField::Key => {
items.sort_by(|a, b| a.0.cmp(&b.0));
}
SortField::Named(field, asc) => {
// Resolve field from node properties
let resolve = |key: &str| -> Option<f64> {
let node = store.nodes.get(key)?;
match field.as_str() {
"weight" => Some(node.weight as f64),
"emotion" => Some(node.emotion as f64),
"retrievals" => Some(node.retrievals as f64),
"uses" => Some(node.uses as f64),
"wrongs" => Some(node.wrongs as f64),
"created" => Some(node.created_at as f64),
"timestamp" => Some(node.timestamp as f64),
"degree" => Some(graph.degree(key) as f64),
"content_len" => Some(node.content.len() as f64),
_ => None,
}
};
let asc = *asc;
items.sort_by(|a, b| {
let va = resolve(&a.0);
let vb = resolve(&b.0);
let ord = match (va, vb) {
(Some(a), Some(b)) => a.total_cmp(&b),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => a.0.cmp(&b.0),
};
if asc { ord } else { ord.reverse() }
});
}
SortField::Priority => {
// Pre-compute priorities to avoid O(n log n) calls
// inside the sort comparator.
@ -725,6 +570,8 @@ pub fn run_transform(
items.truncate(*n);
items
}
// Output mode directives - don't modify result set, handled at output layer
Transform::Select(_) | Transform::Count | Transform::Connectivity => items,
Transform::DominatingSet => {
// Greedy 3-covering dominating set: pick the node that covers
// the most under-covered neighbors, repeat until every node