Compare commits

4 commits

Author SHA1 Message Date
ProofOfConcept
aad227e487 query: unify PEG and engine parsers
PEG parser now handles both expression syntax (degree > 5 | sort degree)
and pipeline syntax (all | type:episodic | sort:timestamp). Deleted
Stage::parse() and helpers from engine.rs — it's now pure execution.

All callers use parse_stages() from parser.rs as the single entry point.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-11 20:42:58 -04:00
ProofOfConcept
bc991c3521 unconscious: memory tools as base, agent def adds extras
Every unconscious agent gets memory_tools() as baseline. The tools
field in the agent def specifies additional tools on top of that —
digest agent now gets journal_tail, journal_new, journal_update.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-11 19:54:18 -04:00
ProofOfConcept
1c0967c4ec Agent::new: tool definitions from caller's tool list
The system prompt was advertising all tools to every agent, but
the runtime only dispatched the agent's actual subset. This caused
unconscious agents to call tools that returned "Unknown tool."

Agent::new now takes the tool list explicitly. Each caller passes
its own tools — the prompt and runtime always match. MCP tool
definitions are still appended for agents that use them.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-11 19:43:24 -04:00
ProofOfConcept
28e564aeb2 save_agent_log: write flat context array matching AST order
The old code wrote a JSON object with named section keys, which
serde_json serialized in alphabetical order — putting conversation
before system, making logs misleading. Write a single flat array
in section order instead, matching what the model actually sees.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-11 19:28:03 -04:00
15 changed files with 589 additions and 275 deletions

View file

@ -0,0 +1,94 @@
# Query Language Unification Plan
**Status: DONE** (2026-04-11)
## Problem (was)
Two query parsers that didn't agree on syntax:
1. **PEG parser** (`hippocampus/query/parser.rs`) — boolean logic, general
comparisons, operator precedence, parentheses. Used by CLI and compact
format path in `query()` tool.
2. **Pipeline parser** (`hippocampus/query/engine.rs`) — domain-specific
filters (type, age, provenance), graph algorithms (spread, spectral).
Used by full format path in `query()` tool.
`journal_tail` generates pipeline syntax but gets routed through the PEG
parser on the compact path. Result: parse errors.
## Approach
Keep the PEG parser (has the harder-to-build structural foundation),
extend it with the pipeline parser's domain features.
## Expression extensions (add to `expr` rule in parser.rs)
- `field:value` shorthand for `field = 'value'` (colon-separated equality)
- `*` already works as `Expr::All`
- `key ~ 'glob'` already works via match operator
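
As a rough illustration of the `field:value` shorthand above, the rewrite can be pictured as a string-level desugaring. This is only a sketch: the actual change belongs in the `expr` rule of the PEG grammar, and `desugar_shorthand` is a hypothetical helper name that does not exist in `parser.rs`. It also assumes the value contains no quote characters.

```rust
/// Hypothetical helper (not part of parser.rs): rewrite `field:value`
/// into the long form `field = 'value'`.
/// Returns None when there is no colon or either side is empty.
fn desugar_shorthand(token: &str) -> Option<String> {
    // Split at the first colon only, so values may contain further colons.
    let (field, value) = token.split_once(':')?;
    if field.is_empty() || value.is_empty() {
        return None;
    }
    Some(format!("{} = '{}'", field, value))
}
```

Under these assumptions, `desugar_shorthand("type:episodic")` yields `type = 'episodic'`, while a bare word such as `degree` is left for the ordinary expression rules.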
## New stages (add to `stage` rule in parser.rs)
Domain filter stages from engine.rs:
- `type:X` — filter by node type (episodic, daily, weekly, monthly, semantic)
- `age:<7d` — duration comparison on timestamp
- `key:GLOB` — glob match on key
- `provenance:X` — provenance filter
- `weight:>N` — weight comparison (may already work via general comparison)
- `content-len:>N` — content size filter
Sort/limit syntax variants:
- `sort:field` alongside existing `sort field`
- `limit:N` alongside existing `limit N`
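
To make the "two spellings, one stage" idea concrete, here is a minimal sketch that accepts both `sort:field` and `sort field` (and likewise for `limit`). `SketchStage`, `parse_stage`, and `parse_pipeline` are illustrative names, not the project's types. The real parser does this inside the PEG grammar rather than by string splitting.

```rust
/// Illustrative stand-in for the real Stage enum.
#[derive(Debug, PartialEq)]
enum SketchStage {
    All,
    Sort(String),
    Limit(usize),
    Other(String),
}

/// Parse one stage, treating `name:arg` and `name arg` as equivalent.
fn parse_stage(s: &str) -> SketchStage {
    let s = s.trim();
    if s == "all" {
        return SketchStage::All;
    }
    // Split at the first colon or whitespace character, whichever comes first.
    let (name, arg) = match s.split_once(|c: char| c == ':' || c.is_whitespace()) {
        Some((n, a)) => (n, a.trim()),
        None => return SketchStage::Other(s.to_string()),
    };
    match name {
        "sort" => SketchStage::Sort(arg.to_string()),
        "limit" => match arg.parse::<usize>() {
            Ok(n) => SketchStage::Limit(n),
            Err(_) => SketchStage::Other(s.to_string()),
        },
        _ => SketchStage::Other(s.to_string()),
    }
}

/// Split a pipe-separated query and parse each stage.
fn parse_pipeline(q: &str) -> Vec<SketchStage> {
    q.split('|').map(parse_stage).collect()
}
```

With this sketch, `all | sort:timestamp | limit:5` and `all | sort timestamp | limit 5` produce the same stage list, which is the property the unified grammar is meant to guarantee.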
Graph algorithms:
- `spread` — spreading activation
- `spectral` — spectral nearest neighbors
- `confluence` — multi-source reachability
- `geodesic` — straightest spectral paths
- `manifold` — extrapolation along seed direction
## What changes
1. `parser.rs` — add field:value shorthand to expr, add domain stages
2. `engine.rs` — keep run_pipeline execution logic, have PEG parser emit
compatible Stage types (or convert PEG AST to Stage at boundary)
3. `query()` tool handler (memory.rs) — one parser path for all formats
4. `journal_tail` (memory.rs) — generate unified syntax
5. CLI `poc-memory query` — uses unified parser
## Migration path
1. Add field:value shorthand and type/age/key stages to PEG parser
2. Route query() through PEG parser for all formats
3. Migrate journal_tail and any other pipeline-syntax callers
4. Remove the pipeline parser (or keep as internal execution layer)
## What was done
**Deleted from engine.rs (-153 lines):**
- `Stage::parse()` and `Stage::parse_pipeline()` — redundant with PEG
- `parse_cmp()`, `parse_duration_or_number()`, `parse_composite_sort()`,
`parse_node_type()`, `parse_sort_field()` — helper functions for deleted parser
**Added to parser.rs (+120 lines):**
- Pipeline syntax in PEG grammar (`type:X`, `age:<Nd`, `sort:field`, etc.)
- `parse_stages()` — unified entry point returning `Vec<Stage>`
- Grammar helper functions
**Net: +17 lines**
**Architecture now:**
- parser.rs: PEG grammar handles ALL parsing (both syntaxes)
- engine.rs: Pure execution — types and `run_query()`, no parsing
Result: `all | type:episodic | sort:timestamp | limit:5` works everywhere.
Mixed syntax like `degree > 5 | type:semantic | sort degree` also works.
## What NOT to change (original note)
The run_pipeline execution logic stays — it's correct and well-tested.
Only the parsing front-end unifies. The pipeline parser's Stage enum
becomes the internal representation that both the PEG parser and any
remaining direct callers produce.

View file

@ -0,0 +1,198 @@
# Sparse Kernel Compilation: Napkin Math for Qwen3.5-27B
## Architecture recap
| Parameter | Value |
|-----------|-------|
| Layers | 64 (48 linear attention, 16 full attention) |
| Hidden dim (H) | 5,120 |
| Query heads | 24 |
| KV heads | 4 (GQA) |
| Head dim | 256 |
| FFN intermediate | 17,408 |
| Full attention interval | every 4th layer |
| Total params | ~27.5B |
**Key discovery**: Qwen3.5 already avoids quadratic attention in most layers: 48/64
layers use linear attention (O(N)), and only 16 use full O(N^2) attention. The
attention-cost question is therefore partially answered by the architecture itself.
The remaining opportunity is **weight sparsity** in the projection and FFN matrices.
## Measured weight sparsity (from B200, all 11 shards)
| Component | Count | Total params | <0.001 | <0.005 | <0.01 | <0.02 |
|-----------|-------|-------------|--------|--------|-------|-------|
| down_proj | 65 | 5,793,382,400 | 7.4% | 35.8% | 64.3% | 92.8% |
| gate_proj | 65 | 5,793,382,400 | 7.2% | 35.0% | 63.3% | 92.3% |
| up_proj | 65 | 5,793,382,400 | 7.3% | 35.2% | 63.5% | 92.5% |
| q_proj | 17 | 1,069,547,520 | 5.7% | 27.9% | 51.9% | 82.8% |
| k_proj | 17 | 89,128,960 | 6.0% | 29.4% | 54.1% | 84.1% |
| v_proj | 17 | 89,128,960 | 4.8% | 23.7% | 44.7% | 74.2% |
| o_proj | 17 | 534,773,760 | 5.3% | 25.9% | 48.7% | 79.6% |
| other | 605 | 6,070,652,272 | 5.4% | 26.4% | 49.6% | 80.9% |
**Key findings**:
- FFN layers (gate/up/down) are remarkably sparse: ~64% of weights below 0.01
- At threshold 0.02, FFN sparsity exceeds 92%
- Attention projections are less sparse but still significant: 45-54% below 0.01
- v_proj is the least sparse component (44.7% below 0.01)
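
The table columns can be read as "fraction of weights whose magnitude falls below a threshold". A minimal sketch of that measurement follows, assuming a strict `|w| < threshold` comparison on a flat slice of weights. The exact comparison and data layout used for the B200 run are not stated here, so treat both as assumptions. `fraction_below` is a hypothetical name.

```rust
/// Fraction of weights with magnitude strictly below `threshold`.
/// Assumption: strict comparison on absolute value; returns 0.0 for empty input.
fn fraction_below(weights: &[f32], threshold: f32) -> f64 {
    if weights.is_empty() {
        return 0.0;
    }
    let small = weights.iter().filter(|w| w.abs() < threshold).count();
    small as f64 / weights.len() as f64
}
```

Calling it once per threshold (0.001, 0.005, 0.01, 0.02) on each tensor and aggregating by component name would reproduce the shape of the table above.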
## Per-layer parameter breakdown
- **QKV projection**: H × (Q_dim + K_dim + V_dim) ≈ 5120 × 13312 ≈ 68M
- **Output projection**: ~26M
- **FFN (gate + up + down)**: 5120 × 17408 × 3 ≈ 267M
- **Total per layer**: ~361M
- **64 layers**: ~23.1B (rest is embeddings, norms, etc.)
## Dense baseline: FLOPs per token
Each weight parameter contributes 2 FLOPs per token (multiply + accumulate).
- Per layer: ~722M FLOPs/token
- 64 layers: **~46.2B FLOPs/token**
On a B200 (theoretical ~4.5 PFLOPS FP8, ~1.1 PFLOPS BF16):
- BF16 throughput: 46.2B / 1.1e15 ≈ 0.042ms per token (compute-bound limit)
- But inference is usually **memory-bandwidth-bound** for small batch sizes
B200 HBM bandwidth: ~8 TB/s. At BF16 (2 bytes/param):
- Loading all weights once: 27.5B × 2 = 55GB → 55/8000 ≈ **6.9ms per token** (batch=1)
- This is the real bottleneck. Compute is cheap; loading weights is expensive.
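
The two limits above are simple enough to put into code. This sketch only restates the napkin arithmetic, using the hardware figures quoted in this section as inputs. The function names are illustrative.

```rust
/// Milliseconds to stream `params` weights once, at `bytes_per_param`,
/// over memory sustaining `bytes_per_sec`.
fn weight_load_ms(params: f64, bytes_per_param: f64, bytes_per_sec: f64) -> f64 {
    params * bytes_per_param / bytes_per_sec * 1e3
}

/// Milliseconds of pure compute per token, at 2 FLOPs per parameter.
fn compute_ms(params: f64, flops_per_sec: f64) -> f64 {
    2.0 * params / flops_per_sec * 1e3
}
```

With the numbers from this section, `weight_load_ms(27.5e9, 2.0, 8.0e12)` comes out near 6.9 and `compute_ms(23.1e9, 1.1e15)` near 0.042, so at batch=1 the weight traffic dominates by more than two orders of magnitude.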
## What sparsity buys you
At **X% sparsity** (X% of weights are zero), you need to load only (100-X)% of the weights:
| Sparsity | Params loaded | Time (batch=1) | Speedup |
|----------|--------------|-----------------|---------|
| 0% (dense) | 27.5B | 6.9ms | 1.0x |
| 50% | 13.8B | 3.4ms | 2.0x |
| 75% | 6.9B | 1.7ms | 4.0x |
| 90% | 2.8B | 0.7ms | 10x |
**This is the key insight**: inference at small batch sizes is memory-bandwidth-bound.
Sparse weights = fewer bytes to load = directly proportional speedup,
**IF** the sparse kernel can avoid the gather/scatter overhead.
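
The table and the "IF" can be captured in one small model. The sketch below adds a hypothetical `index_bytes` parameter for per-nonzero metadata (for example an explicit column index stored next to each surviving weight). That parameter is an assumption made for illustration, not something measured here. The ideal rows in the table correspond to `index_bytes = 0.0`.

```rust
/// Weight traffic relative to dense, for `sparsity` in [0, 1).
/// `value_bytes` is the size of one weight; `index_bytes` is the assumed
/// per-nonzero metadata cost (0.0 models the ideal case).
fn relative_traffic(sparsity: f64, value_bytes: f64, index_bytes: f64) -> f64 {
    (1.0 - sparsity) * (value_bytes + index_bytes) / value_bytes
}

/// Bandwidth-bound speedup implied by the traffic model above.
fn bandwidth_speedup(sparsity: f64, value_bytes: f64, index_bytes: f64) -> f64 {
    1.0 / relative_traffic(sparsity, value_bytes, index_bytes)
}
```

With BF16 weights and no metadata, 50%, 75%, and 90% sparsity give 2x, 4x, and 10x as in the table. If each surviving BF16 weight carried a 2-byte index, 50% sparsity would give no speedup at all, which is one way to see why the access pattern needs to be free at run time.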
## The compilation problem
Dense GEMM is fast because:
1. Weights are contiguous in memory → sequential reads
2. Tiling fits perfectly in SRAM → high reuse
3. Hardware tensor cores expect dense blocks
Naive sparse matmul kills this:
- Irregular memory access → low bandwidth utilization
- Poor SRAM tiling → cache thrashing
- Tensor cores can't help
### The FlashAttention analogy
FlashAttention's insight: the N×N attention matrix doesn't fit in SRAM,
but you can tile it so each tile does. You recompute instead of materializing.
**Sparse kernel compilation insight**: the sparsity pattern is **known at compile time**.
A compiler can:
1. **Analyze the sparsity graph** of each weight matrix
2. **Find blocks** of non-zero weights that are close in memory
3. **Generate a tiling schedule** that loads these blocks into SRAM efficiently
4. **Emit fused kernels** where the memory access pattern is baked in as constants
The resulting kernel looks like a dense kernel to the hardware —
sequential reads, high SRAM reuse, maybe even tensor core compatible
(if the compiler finds dense sub-blocks within the sparse matrix).
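
Steps 1 and 2 of the list above can be sketched as a tile-occupancy scan: walk the matrix in fixed-size tiles and record which tiles contain any non-zero weight. This is only an illustration under stated assumptions (row-major dense storage, pruned weights set to exactly zero, a single fixed tile size). `nonzero_tiles` is a hypothetical name, and a real compiler would presumably also reorder rows and columns to pack non-zeros together.

```rust
/// Return (tile_row, tile_col) for every `tile` x `tile` block of a
/// row-major `rows` x `cols` matrix that holds at least one non-zero.
/// Edge tiles are clipped to the matrix bounds.
fn nonzero_tiles(w: &[f32], rows: usize, cols: usize, tile: usize) -> Vec<(usize, usize)> {
    assert!(tile > 0, "tile size must be positive");
    assert_eq!(w.len(), rows * cols, "matrix shape mismatch");
    let mut occupied = Vec::new();
    for r0 in (0..rows).step_by(tile) {
        for c0 in (0..cols).step_by(tile) {
            let r1 = (r0 + tile).min(rows);
            let c1 = (c0 + tile).min(cols);
            // A tile is kept if any element inside it is non-zero.
            let any = (r0..r1).any(|r| (c0..c1).any(|c| w[r * cols + c] != 0.0));
            if any {
                occupied.push((r0 / tile, c0 / tile));
            }
        }
    }
    occupied
}
```

The ratio of occupied tiles to total tiles is the quantity that decides whether a tiled schedule can skip meaningful amounts of memory traffic. Because the list is fixed once the weights are fixed, it can be emitted into the kernel as constants.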
## Block sparsity vs unstructured sparsity
**Block sparse** (e.g., 4×4 or 16×16 blocks zeroed out):
- GPU-friendly: blocks map to tensor core operations
- Less flexible: coarser pruning granularity → less achievable sparsity
- NVIDIA's 2:4 structured sparsity gets ~50% sparsity with tensor core support
- Real-world: typically 50-70% sparsity achievable without quality loss
**Unstructured sparse** (individual weights zeroed):
- Maximally flexible: fine-grained pruning → higher achievable sparsity
- GPU-hostile: the gather/scatter problem
- Real-world: 80-95% sparsity achievable in many layers without quality loss
**The compiled kernel approach bridges this**: take unstructured sparsity
(maximally flexible, high compression) and compile it into a kernel that
runs as efficiently as block-sparse. Best of both worlds.
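
For reference, the 2:4 pattern mentioned above means at most two non-zero values in every group of four consecutive weights. The sketch below enforces it by keeping the two largest magnitudes per group. Selecting by magnitude is an assumption made for illustration, since pruning tools may use other saliency criteria. `prune_2_of_4` is a hypothetical name.

```rust
/// Zero the two smallest-magnitude entries in each group of four.
/// A trailing group shorter than four elements is left untouched.
fn prune_2_of_4(w: &mut [f32]) {
    for group in w.chunks_exact_mut(4) {
        let mut idx = [0usize, 1, 2, 3];
        // Sort indices by descending magnitude; the sort is stable,
        // so ties keep the lower index first.
        idx.sort_by(|&a, &b| group[b].abs().total_cmp(&group[a].abs()));
        group[idx[2]] = 0.0;
        group[idx[3]] = 0.0;
    }
}
```

The constraint is local to each group of four, which is what makes it hardware-friendly and also what limits it to exactly 50% sparsity. An unstructured pattern has no such local constraint, hence the need for a compiler.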
## Recurrent depth composability
From our April 10 discussion: middle transformer layers are doing
open-coded simulated annealing — similar weights, similar computation.
If 16 consecutive middle layers (for example, layers 8-23) have pairwise cosine similarity > 0.95:
- Replace 16 layers with 1 layer × 16 iterations
- **Parameter reduction**: 16 × 361M = 5.8B → 361M (16x reduction for those layers)
- **Memory bandwidth**: load one layer's weights, iterate in SRAM
- Combined with 50% sparsity on the remaining unique layers:
- Unique layers (48): 48 × 361M × 0.5 = 8.7B params
- Recurrent layer: 361M × 0.5 = 180M params (but iterated 16x in SRAM)
- Total loaded per token: ~8.9B × 2 bytes = 17.8GB
- Time: 17.8/8000 ≈ **2.2ms per token** (vs 6.9ms dense) — **3.1x speedup**
With higher sparsity (75%) + recurrence:
- Unique layers: 48 × 361M × 0.25 = 4.3B
- Recurrent: 90M
- Total: ~4.4B × 2 = 8.8GB → **1.1ms per token** — **6.3x speedup**
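
The recurrence arithmetic above can be restated as a small function. It is a sketch of the napkin math only, using the ~361M per-layer figure from this document. `per_token_ms` is an illustrative name. Note that, like the bullets above, it counts only per-layer parameters, whereas the 6.9ms dense baseline counts all 27.5B, so the quoted speedups lean optimistic.

```rust
/// Per-layer parameter count used throughout this section.
const LAYER_PARAMS: f64 = 361.0e6;

/// Milliseconds per token when `unique_layers` distinct layers plus
/// `recurrent_blocks` shared layers are streamed once each, with the
/// given weight sparsity. Recurrent iterations are assumed to reuse
/// weights already resident in SRAM and cost no extra traffic.
fn per_token_ms(
    unique_layers: u32,
    recurrent_blocks: u32,
    sparsity: f64,
    bytes_per_param: f64,
    bytes_per_sec: f64,
) -> f64 {
    let layers_loaded = (unique_layers + recurrent_blocks) as f64;
    let params = layers_loaded * LAYER_PARAMS * (1.0 - sparsity);
    params * bytes_per_param / bytes_per_sec * 1e3
}
```

With 48 unique layers, one recurrent block, BF16 weights, and 8 TB/s, this gives roughly 2.2ms at 50% sparsity and roughly 1.1ms at 75%, matching the figures above.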
## What needs to happen
### Phase 1: Measure (can do now with B200 access)
1. Extract all weight matrices from Qwen3.5-27B
2. For each matrix, compute:
- Magnitude distribution (what % of weights are near-zero?)
- Achievable sparsity at various thresholds (L1 magnitude pruning)
- Dense sub-block statistics (how many 4×4, 16×16 blocks are all-zero?)
3. Layer similarity: pairwise cosine similarity of weight matrices across layers
- Which layers are nearly identical? (recurrence candidates)
4. Validate quality: run perplexity eval at various sparsity levels
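
For step 3, the similarity measure is ordinary cosine similarity over flattened weight matrices. A minimal sketch follows, assuming both layers' matrices have the same shape and have been flattened in the same order. `cosine_similarity` is an illustrative name.

```rust
/// Cosine similarity of two equally sized, flattened weight tensors.
/// Returns None on length mismatch, empty input, or a zero-norm vector.
fn cosine_similarity(a: &[f32], b: &[f32]) -> Option<f64> {
    if a.len() != b.len() || a.is_empty() {
        return None;
    }
    // Accumulate in f64 to limit rounding error over large tensors.
    let (mut dot, mut norm_a, mut norm_b) = (0.0f64, 0.0f64, 0.0f64);
    for (&x, &y) in a.iter().zip(b.iter()) {
        let (x, y) = (x as f64, y as f64);
        dot += x * y;
        norm_a += x * x;
        norm_b += y * y;
    }
    if norm_a == 0.0 || norm_b == 0.0 {
        return None;
    }
    Some(dot / (norm_a.sqrt() * norm_b.sqrt()))
}
```

Running this pairwise over, say, every layer's `down_proj` gives a layers-by-layers matrix. Contiguous runs with values above the chosen cutoff are the recurrence candidates.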
### Phase 2: Compile (research project)
1. For a single sparse weight matrix, generate an optimized Triton kernel
2. Benchmark vs dense GEMM and vs NVIDIA's 2:4 sparse
3. Iterate on the tiling strategy
### Phase 3: End-to-end
1. Full model with compiled sparse kernels
2. Perplexity + latency benchmarks
3. Compare: dense, 2:4 structured, compiled unstructured
## Related work to read
- **SparseGPT** (Frantar & Alistarh, 2023): one-shot pruning to 50-60% unstructured sparsity
with minimal quality loss. Key result: large models are more prunable than small ones.
- **Wanda** (Sun et al., 2023): pruning by weight magnitude × input activation.
Simpler than SparseGPT, comparable results.
- **NVIDIA 2:4 sparsity**: hardware-supported structured sparsity on Ampere+.
50% sparsity, ~2x speedup on tensor cores. The existence proof that sparse can be fast.
- **Triton** (Tillet et al.): Python DSL for GPU kernel generation.
The right compilation target — can express arbitrary tiling strategies.
- **TACO** (Kjolstad et al.): tensor algebra compiler. Generates kernels for
specific sparse tensor formats. Academic but the ideas are right.
- **FlashAttention** (Dao et al.): the tiling strategy to learn from.
- **DejaVu** (Liu et al., 2023): contextual sparsity — predicting which neurons
to activate per input. Dynamic sparsity, complementary to weight sparsity.
## The bigger picture
Current state of the art: dense models with FlashAttention for the N×N attention part.
Weight sparsity is known to work (SparseGPT, Wanda) but isn't deployed because
the GPU kernels don't exist to exploit it efficiently.
The gap: nobody has built a compiler that takes a specific sparse weight matrix
and emits a kernel optimized for that exact pattern. FlashAttention proved
that custom kernels for specific computational patterns beat general-purpose ones.
The same should hold for sparse weight patterns.
**The bet**: a compiled sparse kernel for Qwen3.5-27B's actual sparsity pattern
would achieve at least 80% of the theoretical bandwidth-bound speedup. If true,
50% sparsity → 1.6x real speedup, 75% → 3.2x, composing with recurrent depth
for potentially 5-6x total.
That would make 27B inference as fast as a 5B dense model, with 27B quality.

View file

@ -764,7 +764,7 @@ impl ContextState {
pub fn conversation(&self) -> &[AstNode] { &self.conversation }
pub fn conversation_mut(&mut self) -> &mut Vec<AstNode> { &mut self.conversation }
fn sections(&self) -> [&Vec<AstNode>; 4] {
pub fn sections(&self) -> [&Vec<AstNode>; 4] {
[&self.system, &self.identity, &self.journal, &self.conversation]
}
}

View file

@ -192,12 +192,15 @@ impl Agent {
prompt_file: String,
conversation_log: Option<ConversationLog>,
active_tools: tools::ActiveTools,
agent_tools: Vec<tools::Tool>,
) -> Arc<Self> {
let mut context = ContextState::new();
context.conversation_log = conversation_log;
context.push_no_log(Section::System, AstNode::system_msg(&system_prompt));
let tool_defs = tools::all_tool_definitions().await;
let mut tool_defs: Vec<String> = agent_tools.iter().map(|t| t.to_json()).collect();
tool_defs.extend(tools::all_mcp_tool_definitions().await);
if !tool_defs.is_empty() {
let tools_text = format!(
"# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
@ -223,7 +226,7 @@ impl Agent {
session_id,
context: tokio::sync::Mutex::new(context),
state: tokio::sync::Mutex::new(AgentState {
tools: tools::tools(),
tools: agent_tools,
mcp_tools: McpToolAccess::All,
last_prompt_tokens: 0,
reasoning_effort: "none".to_string(),

View file

@ -138,6 +138,7 @@ impl AutoAgent {
app, String::new(),
None,
super::tools::ActiveTools::new(),
super::tools::tools(),
).await;
{
let mut st = agent.state.lock().await;

View file

@ -260,7 +260,7 @@ async fn query(args: &serde_json::Value) -> Result<String> {
let store = arc.lock().await;
let graph = store.build_graph();
let stages = crate::search::Stage::parse_pipeline(query_str)
let stages = crate::query_parser::parse_stages(query_str)
.map_err(|e| anyhow::anyhow!("{}", e))?;
let results = crate::search::run_query(&stages, vec![], &graph, &store, false, 100);
let keys: Vec<String> = results.into_iter().map(|(k, _)| k).collect();
@ -272,9 +272,58 @@ async fn query(args: &serde_json::Value) -> Result<String> {
Ok(crate::subconscious::prompts::format_nodes_section(&store, &items, &graph))
}
_ => {
crate::query_parser::query_to_string(&store, &graph, query_str)
.map_err(|e| anyhow::anyhow!("{}", e))
// Compact output: check for count/select stages, else just list keys
use crate::search::{Stage, Transform};
let has_count = stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count)));
if has_count {
return Ok(keys.len().to_string());
}
if keys.is_empty() {
return Ok("no results".to_string());
}
let select_fields: Option<&Vec<String>> = stages.iter().find_map(|s| match s {
Stage::Transform(Transform::Select(f)) => Some(f),
_ => None,
});
if let Some(fields) = select_fields {
let mut out = String::from("key\t");
out.push_str(&fields.join("\t"));
out.push('\n');
for key in &keys {
out.push_str(key);
for f in fields {
out.push('\t');
out.push_str(&resolve_field_str(&store, &graph, key, f));
}
out.push('\n');
}
Ok(out)
} else {
Ok(keys.join("\n"))
}
}
}
}
fn resolve_field_str(store: &crate::store::Store, graph: &crate::graph::Graph, key: &str, field: &str) -> String {
let node = match store.nodes.get(key) {
Some(n) => n,
None => return "-".to_string(),
};
match field {
"key" => key.to_string(),
"weight" => format!("{:.3}", node.weight),
"node_type" => format!("{:?}", node.node_type),
"provenance" => node.provenance.clone(),
"emotion" => format!("{}", node.emotion),
"retrievals" => format!("{}", node.retrievals),
"uses" => format!("{}", node.uses),
"wrongs" => format!("{}", node.wrongs),
"created" => format!("{}", node.created_at),
"timestamp" => format!("{}", node.timestamp),
"degree" => format!("{}", graph.degree(key)),
"content_len" => format!("{}", node.content.len()),
_ => "-".to_string(),
}
}

View file

@ -195,6 +195,10 @@ pub async fn all_tool_definitions() -> Vec<String> {
defs
}
pub async fn all_mcp_tool_definitions() -> Vec<String> {
mcp_client::tool_definitions_json().await
}
/// Memory + journal tools only — for subconscious agents.
pub fn memory_and_journal_tools() -> Vec<Tool> {
let mut all = memory::memory_tools().to_vec();

View file

@ -25,7 +25,7 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option
target.to_vec()
} else if let Some(q) = query {
let graph = store.build_graph();
let stages = crate::search::Stage::parse_pipeline(q)?;
let stages = crate::query_parser::parse_stages(q)?;
let results = crate::search::run_query(&stages, vec![], &graph, &store, false, count);
if results.is_empty() {
return Err(format!("query returned no results: {}", q));

View file

@ -3,25 +3,26 @@
pub fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full: bool, debug: bool, fuzzy: bool, content: bool) -> Result<(), String> {
use std::collections::BTreeMap;
use crate::search::{Stage, Algorithm, AlgoStage};
// When running inside an agent session, exclude already-surfaced nodes
let seen = crate::session::HookSession::from_env()
.map(|s| s.seen())
.unwrap_or_default();
// Parse pipeline stages (unified: algorithms, filters, transforms, generators)
let stages: Vec<crate::search::Stage> = if pipeline_args.is_empty() {
vec![crate::search::Stage::Algorithm(crate::search::AlgoStage::parse("spread").unwrap())]
// Build pipeline: if args provided, parse them; otherwise default to spread
let stages: Vec<Stage> = if pipeline_args.is_empty() {
vec![Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() })]
} else {
pipeline_args.iter()
.map(|a| crate::search::Stage::parse(a))
.collect::<Result<Vec<_>, _>>()?
// Join args with | and parse as unified query
let pipeline_str = format!("all | {}", pipeline_args.join(" | "));
crate::query_parser::parse_stages(&pipeline_str)?
};
// Check if pipeline needs full Store (has filters/transforms/generators)
let needs_store = stages.iter().any(|s| !matches!(s, crate::search::Stage::Algorithm(_)));
let needs_store = stages.iter().any(|s| !matches!(s, Stage::Algorithm(_)));
// Check if pipeline starts with a generator (doesn't need seed terms)
let has_generator = stages.first().map(|s| matches!(s, crate::search::Stage::Generator(_))).unwrap_or(false);
let has_generator = stages.first().map(|s| matches!(s, Stage::Generator(_))).unwrap_or(false);
if terms.is_empty() && !has_generator {
return Err("search requires terms or a generator stage (e.g. 'all')".into());

View file

@ -157,6 +157,9 @@ pub enum Filter {
pub enum Transform {
Sort(SortField),
Limit(usize),
Select(Vec<String>),
Count,
Connectivity,
DominatingSet,
}
@ -168,6 +171,8 @@ pub enum SortField {
Degree,
Weight,
Isolation,
Key,
Named(String, bool), // (field_name, ascending)
Composite(Vec<(ScoreField, f64)>),
}
@ -206,79 +211,6 @@ impl Cmp {
}
}
/// Parse a comparison like ">0.5", ">=60", "<7d" (durations converted to seconds).
fn parse_cmp(s: &str) -> Result<Cmp, String> {
let (op_len, ctor): (usize, fn(f64) -> Cmp) = if s.starts_with(">=") {
(2, Cmp::Gte)
} else if s.starts_with("<=") {
(2, Cmp::Lte)
} else if s.starts_with('>') {
(1, Cmp::Gt)
} else if s.starts_with('<') {
(1, Cmp::Lt)
} else if s.starts_with('=') {
(1, Cmp::Eq)
} else {
return Err(format!("expected comparison operator in '{}'", s));
};
let val_str = &s[op_len..];
let val = parse_duration_or_number(val_str)?;
Ok(ctor(val))
}
/// Parse "7d", "24h", "30m" as seconds, or plain numbers.
fn parse_duration_or_number(s: &str) -> Result<f64, String> {
if let Some(n) = s.strip_suffix('d') {
let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?;
Ok(v * 86400.0)
} else if let Some(n) = s.strip_suffix('h') {
let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?;
Ok(v * 3600.0)
} else if let Some(n) = s.strip_suffix('m') {
let v: f64 = n.parse().map_err(|_| format!("bad number: {}", n))?;
Ok(v * 60.0)
} else {
s.parse().map_err(|_| format!("bad number: {}", s))
}
}
/// Parse composite sort: "isolation*0.7+recency(linker)*0.3"
/// Each term is field or field(arg), optionally *weight (default 1.0).
fn parse_composite_sort(s: &str) -> Result<Vec<(ScoreField, f64)>, String> {
let mut terms = Vec::new();
for term in s.split('+') {
let term = term.trim();
let (field_part, weight) = if let Some((f, w)) = term.rsplit_once('*') {
(f, w.parse::<f64>().map_err(|_| format!("bad weight: {}", w))?)
} else {
(term, 1.0)
};
// Parse field, possibly with (arg)
let field = if let Some((name, arg)) = field_part.split_once('(') {
let arg = arg.strip_suffix(')').ok_or("missing ) in sort field")?;
match name {
"recency" => ScoreField::Recency(arg.to_string()),
_ => return Err(format!("unknown parameterized sort field: {}", name)),
}
} else {
match field_part {
"isolation" => ScoreField::Isolation,
"degree" => ScoreField::Degree,
"weight" => ScoreField::Weight,
"content-len" => ScoreField::ContentLen,
"priority" => ScoreField::Priority,
_ => return Err(format!("unknown sort field: {}", field_part)),
}
};
terms.push((field, weight));
}
if terms.is_empty() {
return Err("empty composite sort".into());
}
Ok(terms)
}
/// Compute a 0-1 score for a node on a single dimension.
fn score_field(
@ -348,129 +280,6 @@ impl CompositeCache {
}
}
/// Parse a NodeType from a label.
fn parse_node_type(s: &str) -> Result<NodeType, String> {
match s {
"episodic" | "session" => Ok(NodeType::EpisodicSession),
"daily" => Ok(NodeType::EpisodicDaily),
"weekly" => Ok(NodeType::EpisodicWeekly),
"monthly" => Ok(NodeType::EpisodicMonthly),
"semantic" => Ok(NodeType::Semantic),
_ => Err(format!("unknown node type: {} (use: episodic, semantic, daily, weekly, monthly)", s)),
}
}
impl Stage {
/// Parse a single stage from a string.
///
/// Algorithm names are tried first (bare words), then predicate syntax
/// (contains ':'). No ambiguity since algorithms are bare words.
pub fn parse(s: &str) -> Result<Self, String> {
let s = s.trim();
let (negated, s) = if let Some(rest) = s.strip_prefix('!') {
(true, rest)
} else {
(false, s)
};
// Generator: "all"
if s == "all" {
return Ok(Stage::Generator(Generator::All));
}
// Transform: "dominating-set"
if s == "dominating-set" {
return Ok(Stage::Transform(Transform::DominatingSet));
}
// Try algorithm parse first (bare words, no colon)
if !s.contains(':')
&& let Ok(algo) = AlgoStage::parse(s) {
return Ok(Stage::Algorithm(algo));
}
// Algorithm with params: "spread,max_hops=4" (contains comma but no colon)
if s.contains(',') && !s.contains(':') {
return AlgoStage::parse(s).map(Stage::Algorithm);
}
// Predicate/transform syntax: "key:value"
let (prefix, value) = s.split_once(':')
.ok_or_else(|| format!("unknown stage: {}", s))?;
let filter_or_transform = match prefix {
"type" => Stage::Filter(Filter::Type(parse_node_type(value)?)),
"key" => Stage::Filter(Filter::KeyGlob(value.to_string())),
"weight" => Stage::Filter(Filter::Weight(parse_cmp(value)?)),
"age" => Stage::Filter(Filter::Age(parse_cmp(value)?)),
"content-len" => Stage::Filter(Filter::ContentLen(parse_cmp(value)?)),
"provenance" => {
Stage::Filter(Filter::Provenance(value.to_string()))
}
"not-visited" => {
let (agent, dur) = value.split_once(',')
.ok_or("not-visited:AGENT,DURATION")?;
let secs = parse_duration_or_number(dur)?;
Stage::Filter(Filter::NotVisited {
agent: agent.to_string(),
duration: secs as i64,
})
}
"visited" => Stage::Filter(Filter::Visited {
agent: value.to_string(),
}),
"sort" => {
// Check for composite sort: field*weight+field*weight+...
let field = if value.contains('+') || value.contains('*') {
SortField::Composite(parse_composite_sort(value)?)
} else {
match value {
"priority" => SortField::Priority,
"timestamp" => SortField::Timestamp,
"content-len" => SortField::ContentLen,
"degree" => SortField::Degree,
"weight" => SortField::Weight,
"isolation" => SortField::Isolation,
_ => return Err(format!("unknown sort field: {}", value)),
}
};
Stage::Transform(Transform::Sort(field))
}
"limit" => {
let n: usize = value.parse()
.map_err(|_| format!("bad limit: {}", value))?;
Stage::Transform(Transform::Limit(n))
}
"match" => {
let terms: Vec<String> = value.split(',')
.map(|t| t.to_string())
.collect();
Stage::Generator(Generator::Match(terms))
}
// Algorithm with colon in params? Try fallback.
_ => return AlgoStage::parse(s).map(Stage::Algorithm)
.map_err(|_| format!("unknown stage: {}", s)),
};
// Apply negation to filters
if negated {
match filter_or_transform {
Stage::Filter(f) => Ok(Stage::Filter(Filter::Negated(Box::new(f)))),
_ => Err("! prefix only works on filter stages".to_string()),
}
} else {
Ok(filter_or_transform)
}
}
/// Parse a pipe-separated pipeline string.
pub fn parse_pipeline(s: &str) -> Result<Vec<Stage>, String> {
s.split('|')
.map(|part| Stage::parse(part.trim()))
.collect()
}
}
impl fmt::Display for Stage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
@ -479,6 +288,9 @@ impl fmt::Display for Stage {
Stage::Filter(filt) => write!(f, "{}", filt),
Stage::Transform(Transform::Sort(field)) => write!(f, "sort:{:?}", field),
Stage::Transform(Transform::Limit(n)) => write!(f, "limit:{}", n),
Stage::Transform(Transform::Select(fields)) => write!(f, "select:{}", fields.join(",")),
Stage::Transform(Transform::Count) => write!(f, "count"),
Stage::Transform(Transform::Connectivity) => write!(f, "connectivity"),
Stage::Transform(Transform::DominatingSet) => write!(f, "dominating-set"),
Stage::Algorithm(a) => write!(f, "{}", a.algo),
}
@ -613,7 +425,7 @@ fn run_generator(g: &Generator, store: &Store) -> Vec<(String, f64)> {
}
}
fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
let node = match store.nodes.get(key) {
Some(n) => n,
None => return false,
@ -686,6 +498,39 @@ pub fn run_transform(
sb.total_cmp(&sa) // most isolated first
});
}
SortField::Key => {
items.sort_by(|a, b| a.0.cmp(&b.0));
}
SortField::Named(field, asc) => {
// Resolve field from node properties
let resolve = |key: &str| -> Option<f64> {
let node = store.nodes.get(key)?;
match field.as_str() {
"weight" => Some(node.weight as f64),
"emotion" => Some(node.emotion as f64),
"retrievals" => Some(node.retrievals as f64),
"uses" => Some(node.uses as f64),
"wrongs" => Some(node.wrongs as f64),
"created" => Some(node.created_at as f64),
"timestamp" => Some(node.timestamp as f64),
"degree" => Some(graph.degree(key) as f64),
"content_len" => Some(node.content.len() as f64),
_ => None,
}
};
let asc = *asc;
items.sort_by(|a, b| {
let va = resolve(&a.0);
let vb = resolve(&b.0);
let ord = match (va, vb) {
(Some(a), Some(b)) => a.total_cmp(&b),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => a.0.cmp(&b.0),
};
if asc { ord } else { ord.reverse() }
});
}
SortField::Priority => {
// Pre-compute priorities to avoid O(n log n) calls
// inside the sort comparator.
@ -725,6 +570,8 @@ pub fn run_transform(
items.truncate(*n);
items
}
// Output mode directives - don't modify result set, handled at output layer
Transform::Select(_) | Transform::Count | Transform::Connectivity => items,
Transform::DominatingSet => {
// Greedy 3-covering dominating set: pick the node that covers
// the most under-covered neighbors, repeat until every node

View file

@ -26,6 +26,12 @@ use crate::graph::Graph;
use regex::Regex;
use std::collections::BTreeMap;
// Re-export engine types used by Query
pub use super::engine::{
Stage, Filter, Transform, Generator, SortField,
Algorithm, AlgoStage, Cmp,
};
// -- AST types --
#[derive(Debug, Clone)]
@ -57,16 +63,6 @@ pub enum CmpOp {
Gt, Lt, Ge, Le, Eq, Ne, Match,
}
#[derive(Debug, Clone)]
pub enum Stage {
Sort { field: String, ascending: bool },
Limit(usize),
Select(Vec<String>),
Count,
Connectivity,
DominatingSet,
}
#[derive(Debug, Clone)]
pub struct Query {
pub expr: Expr,
@ -86,18 +82,54 @@ peg::parser! {
= s:(_ "|" _ s:stage() { s })* { s }
rule stage() -> Stage
= "sort" _ f:field() _ a:asc_desc() { Stage::Sort { field: f, ascending: a } }
/ "limit" _ n:integer() { Stage::Limit(n) }
/ "select" _ f:field_list() { Stage::Select(f) }
/ "count" { Stage::Count }
/ "connectivity" { Stage::Connectivity }
/ "dominating-set" { Stage::DominatingSet }
// Original PEG syntax (space-separated)
= "sort" _ f:field() _ a:asc_desc() {
Stage::Transform(Transform::Sort(make_sort_field(&f, a)))
}
/ "limit" _ n:integer() { Stage::Transform(Transform::Limit(n)) }
/ "select" _ f:field_list() { Stage::Transform(Transform::Select(f)) }
/ "count" { Stage::Transform(Transform::Count) }
/ "connectivity" { Stage::Transform(Transform::Connectivity) }
/ "dominating-set" { Stage::Transform(Transform::DominatingSet) }
// Pipeline syntax (colon-separated)
/ "sort:" f:field() { Stage::Transform(Transform::Sort(make_sort_field(&f, false))) }
/ "limit:" n:integer() { Stage::Transform(Transform::Limit(n)) }
/ "select:" f:field_list_colon() { Stage::Transform(Transform::Select(f)) }
/ "type:" t:ident() { make_type_filter(&t) }
/ "age:" c:cmp_duration() { Stage::Filter(Filter::Age(c)) }
/ "key:" g:ident() { Stage::Filter(Filter::KeyGlob(g)) }
/ "provenance:" p:ident() { Stage::Filter(Filter::Provenance(p)) }
/ "all" { Stage::Generator(Generator::All) }
// Graph algorithms
/ "spread" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() }) }
/ "spectral" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spectral, params: std::collections::HashMap::new() }) }
rule asc_desc() -> bool
= "asc" { true }
/ "desc" { false }
/ { false } // default: descending
rule field_list_colon() -> Vec<String>
= f:field() fs:("," f:field() { f })* {
let mut v = vec![f];
v.extend(fs);
v
}
rule cmp_duration() -> Cmp
= ">=" n:duration() { Cmp::Gte(n) }
/ "<=" n:duration() { Cmp::Lte(n) }
/ ">" n:duration() { Cmp::Gt(n) }
/ "<" n:duration() { Cmp::Lt(n) }
/ "=" n:duration() { Cmp::Eq(n) }
rule duration() -> f64
= n:number() "d" { n * 86400.0 }
/ n:number() "h" { n * 3600.0 }
/ n:number() "m" { n * 60.0 }
/ n:number() "s" { n }
/ n:number() { n }
rule field_list() -> Vec<String>
= f:field() fs:(_ "," _ f:field() { f })* {
let mut v = vec![f];
@ -122,6 +154,7 @@ peg::parser! {
Expr::Comparison { field: f, op, value: v }
}
"*" { Expr::All }
"all" { Expr::All }
"(" _ e:expr() _ ")" { e }
}
@ -167,6 +200,55 @@ peg::parser! {
}
}
// -- Helper functions for PEG grammar --
fn make_sort_field(field: &str, ascending: bool) -> SortField {
match field {
"priority" => SortField::Priority,
"timestamp" => SortField::Timestamp,
"content-len" | "content_len" => SortField::ContentLen,
"degree" => SortField::Degree,
"weight" => SortField::Weight,
"isolation" => SortField::Isolation,
"key" => SortField::Key,
_ => SortField::Named(field.to_string(), ascending),
}
}
fn make_type_filter(type_name: &str) -> Stage {
let node_type = match type_name {
"episodic" | "session" => NodeType::EpisodicSession,
"daily" => NodeType::EpisodicDaily,
"weekly" => NodeType::EpisodicWeekly,
"monthly" => NodeType::EpisodicMonthly,
"semantic" => NodeType::Semantic,
_ => NodeType::Semantic, // fallback
};
Stage::Filter(Filter::Type(node_type))
}
/// Parse a query string into Vec<Stage> for pipeline execution.
/// This is the unified entry point — replaces engine::Stage::parse_pipeline.
pub fn parse_stages(s: &str) -> Result<Vec<Stage>, String> {
let q = query_parser::query(s)
.map_err(|e| format!("Parse error: {}", e))?;
let mut stages = Vec::new();
// Convert Expr to a Generator stage
match &q.expr {
Expr::All => stages.push(Stage::Generator(Generator::All)),
_ => {
// For complex expressions, we need the Query-based path
// This shouldn't happen for pipeline queries
return Err("Complex expressions not supported in pipeline mode; use CLI query".into());
}
}
stages.extend(q.stages);
Ok(stages)
}
// -- Field resolution --
/// Resolve a field value from a node + graph context, returning a comparable Value.
@ -377,12 +459,12 @@ fn execute_parsed(
let mut set = Vec::new();
for stage in &q.stages {
match stage {
Stage::Select(fields) => {
Stage::Transform(Transform::Select(fields)) => {
for f in fields {
if !set.contains(f) { set.push(f.clone()); }
}
}
Stage::Sort { field, .. } => {
Stage::Transform(Transform::Sort(SortField::Named(field, _))) => {
if !set.contains(field) { set.push(field.clone()); }
}
_ => {}
@ -404,37 +486,75 @@ fn execute_parsed(
let mut has_sort = false;
for stage in &q.stages {
match stage {
Stage::Sort { field, ascending } => {
Stage::Transform(Transform::Sort(sort_field)) => {
has_sort = true;
let asc = *ascending;
match sort_field {
SortField::Named(field, asc) => {
let asc = *asc;
let field = field.clone();
results.sort_by(|a, b| {
let va = a.fields.get(field).and_then(as_num);
let vb = b.fields.get(field).and_then(as_num);
let va = a.fields.get(&field).and_then(as_num);
let vb = b.fields.get(&field).and_then(as_num);
let ord = match (va, vb) {
(Some(a), Some(b)) => a.total_cmp(&b),
_ => {
let sa = a.fields.get(field).map(as_str).unwrap_or_default();
let sb = b.fields.get(field).map(as_str).unwrap_or_default();
let sa = a.fields.get(&field).map(as_str).unwrap_or_default();
let sb = b.fields.get(&field).map(as_str).unwrap_or_default();
sa.cmp(&sb)
}
};
if asc { ord } else { ord.reverse() }
});
}
Stage::Limit(n) => {
SortField::Key => {
results.sort_by(|a, b| a.key.cmp(&b.key));
}
SortField::Degree => {
results.sort_by(|a, b| {
let da = graph.degree(&a.key);
let db = graph.degree(&b.key);
db.cmp(&da)
});
}
SortField::Weight => {
results.sort_by(|a, b| {
let wa = store.nodes.get(&a.key).map(|n| n.weight).unwrap_or(0.0);
let wb = store.nodes.get(&b.key).map(|n| n.weight).unwrap_or(0.0);
wb.total_cmp(&wa)
});
}
SortField::Timestamp => {
results.sort_by(|a, b| {
let ta = store.nodes.get(&a.key).map(|n| n.timestamp).unwrap_or(0);
let tb = store.nodes.get(&b.key).map(|n| n.timestamp).unwrap_or(0);
tb.cmp(&ta)
});
}
_ => {} // other sort fields handled by default degree sort
}
}
Stage::Transform(Transform::Limit(n)) => {
results.truncate(*n);
}
Stage::Connectivity => {} // handled in output
Stage::Select(_) | Stage::Count => {} // handled in output
Stage::DominatingSet => {
Stage::Transform(Transform::Connectivity) => {} // handled in output
Stage::Transform(Transform::Select(_) | Transform::Count) => {} // handled in output
Stage::Transform(Transform::DominatingSet) => {
let mut items: Vec<(String, f64)> = results.iter()
.map(|r| (r.key.clone(), graph.degree(&r.key) as f64))
.collect();
let xform = super::engine::Transform::DominatingSet;
let xform = Transform::DominatingSet;
items = super::engine::run_transform(&xform, items, store, graph);
let keep: std::collections::HashSet<String> = items.into_iter().map(|(k, _)| k).collect();
results.retain(|r| keep.contains(&r.key));
}
Stage::Filter(filt) => {
// Apply filter to narrow results
let now = crate::store::now_epoch();
results.retain(|r| super::engine::eval_filter(filt, &r.key, store, now));
}
Stage::Generator(_) | Stage::Algorithm(_) => {
// Generators are handled by Expr, algorithms not applicable here
}
}
}
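
The `SortField::Named` arm above compares two results numerically when both field values convert to numbers, and falls back to a string comparison otherwise. A minimal standalone sketch of that comparison follows. It assumes fields are stored as plain strings in a `HashMap`, which is a simplification of the real `Value` type, and `compare_field` is a hypothetical name, not a function in this change.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Compare two records on `field`: numerically when both values parse as
/// f64, otherwise as strings. Hypothetical helper; the diff does this
/// inline inside `sort_by` using `as_num` / `as_str` on its own value type.
fn compare_field(
    a: &HashMap<String, String>,
    b: &HashMap<String, String>,
    field: &str,
    ascending: bool,
) -> Ordering {
    let num = |m: &HashMap<String, String>| m.get(field).and_then(|v| v.parse::<f64>().ok());
    let ord = match (num(a), num(b)) {
        // total_cmp gives a total order on f64, so NaN cannot panic the sort.
        (Some(x), Some(y)) => x.total_cmp(&y),
        _ => {
            // A missing field compares as the empty string.
            let sa = a.get(field).map(String::as_str).unwrap_or_default();
            let sb = b.get(field).map(String::as_str).unwrap_or_default();
            sa.cmp(sb)
        }
    };
    if ascending { ord } else { ord.reverse() }
}
```

The numeric branch is what makes `"10"` sort after `"9"`; a pure string comparison would order them the other way round.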
@ -474,7 +594,7 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St
let results = execute_parsed(store, graph, &q)?;
// Count stage
if q.stages.iter().any(|s| matches!(s, Stage::Count)) {
if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) {
println!("{}", results.len());
return Ok(());
}
@ -485,14 +605,14 @@ pub fn run_query(store: &Store, graph: &Graph, query_str: &str) -> Result<(), St
}
// Connectivity stage
if q.stages.iter().any(|s| matches!(s, Stage::Connectivity)) {
if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Connectivity))) {
print_connectivity(&results, graph);
return Ok(());
}
// Select stage
let fields: Option<&Vec<String>> = q.stages.iter().find_map(|s| match s {
Stage::Select(f) => Some(f),
Stage::Transform(Transform::Select(f)) => Some(f),
_ => None,
});
@ -527,7 +647,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result<
let results = execute_parsed(store, graph, &q)?;
if q.stages.iter().any(|s| matches!(s, Stage::Count)) {
if q.stages.iter().any(|s| matches!(s, Stage::Transform(Transform::Count))) {
return Ok(results.len().to_string());
}
if results.is_empty() {
@ -535,7 +655,7 @@ pub fn query_to_string(store: &Store, graph: &Graph, query_str: &str) -> Result<
}
let fields: Option<&Vec<String>> = q.stages.iter().find_map(|s| match s {
Stage::Select(f) => Some(f),
Stage::Transform(Transform::Select(f)) => Some(f),
_ => None,
});
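
The `cmp_duration()` and `duration()` rules added to the grammar in this file turn a token such as `>=7d` into a comparison against a number of seconds. The sketch below reproduces that mapping in plain Rust without the `peg` crate, as an illustration only. `parse_duration` and `parse_cmp_duration` are hypothetical names, and `str::parse::<f64>` accepts more inputs than the grammar's `number()` rule probably does.

```rust
#[derive(Debug, PartialEq)]
enum Cmp {
    Gt(f64),
    Gte(f64),
    Lt(f64),
    Lte(f64),
    Eq(f64),
}

/// Convert "7d", "2h", "30m", "45s" or a bare number into seconds.
fn parse_duration(s: &str) -> Option<f64> {
    let units = [("d", 86400.0), ("h", 3600.0), ("m", 60.0), ("s", 1.0)];
    for (suffix, scale) in units {
        if let Some(num) = s.strip_suffix(suffix) {
            return num.parse::<f64>().ok().map(|n| n * scale);
        }
    }
    // No unit suffix: the value is already in seconds.
    s.parse::<f64>().ok()
}

/// Parse an operator followed by a duration. Two-character operators are
/// tried first, mirroring the ordered choice in the grammar: if ">" were
/// tried before ">=", the "=" would be left over and the parse would fail.
fn parse_cmp_duration(s: &str) -> Option<Cmp> {
    let ops: [(&str, fn(f64) -> Cmp); 5] = [
        (">=", Cmp::Gte),
        ("<=", Cmp::Lte),
        (">", Cmp::Gt),
        ("<", Cmp::Lt),
        ("=", Cmp::Eq),
    ];
    for (prefix, ctor) in ops {
        if let Some(rest) = s.strip_prefix(prefix) {
            return parse_duration(rest).map(ctor);
        }
    }
    None
}
```

Under these assumptions, a pipeline stage written as `age:>=7d` would carry `Cmp::Gte(604800.0)`.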


@ -298,6 +298,7 @@ impl Mind {
config.prompt_file.clone(),
conversation_log,
crate::agent::tools::ActiveTools::new(),
crate::agent::tools::tools(),
).await;
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));


@ -77,20 +77,19 @@ impl Unconscious {
// Scan all .agent files, exclude subconscious-* and surface-observe
let mut agents: Vec<UnconsciousAgent> = Vec::new();
let all_tools = tools::memory::memory_tools().to_vec();
let base_tools = tools::memory::memory_tools().to_vec();
let extra_tools = tools::memory::journal_tools().to_vec();
for def in defs::load_defs() {
if def.agent.starts_with("subconscious-") { continue; }
if def.agent == "surface-observe" { continue; }
let enabled = enabled_map.get(&def.agent).copied()
.unwrap_or(false);
let effective_tools: Vec<tools::Tool> = if def.tools.is_empty() {
all_tools.clone()
} else {
all_tools.iter()
.filter(|t| def.tools.iter().any(|w| w == t.name))
.cloned()
.collect()
};
let mut effective_tools = base_tools.clone();
for name in &def.tools {
if let Some(t) = extra_tools.iter().find(|t| t.name == name) {
effective_tools.push(t.clone());
}
}
let steps: Vec<AutoStep> = def.steps.iter().map(|s| AutoStep {
prompt: s.prompt.clone(),
phase: s.phase.clone(),
@ -282,11 +281,11 @@ impl Unconscious {
client, system_prompt, personality,
app, String::new(), None,
crate::agent::tools::ActiveTools::new(),
auto.tools.clone(),
).await;
{
let mut st = agent.state.lock().await;
st.provenance = format!("unconscious:{}", auto.name);
st.tools = auto.tools.clone();
st.priority = Some(10);
}
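
The two hunks above change how an unconscious agent's tool list is built: every agent starts from the memory tools, and names listed in the agent definition add journal tools on top. A minimal sketch of that composition follows. The `Tool` struct here is a stand-in with only a `name` field, `effective_tools` is a hypothetical free function, and `memory_search` is an invented tool name used only for illustration.

```rust
/// Stand-in for the real tool type; only the name matters for this sketch.
#[derive(Clone, Debug, PartialEq)]
struct Tool {
    name: &'static str,
}

/// Start from the base tools, then append each requested extra by name.
/// Names that match no extra tool are skipped silently, as in the diff.
fn effective_tools(base: &[Tool], extras: &[Tool], requested: &[String]) -> Vec<Tool> {
    let mut out = base.to_vec();
    for name in requested {
        if let Some(t) = extras.iter().find(|t| t.name == name.as_str()) {
            out.push(t.clone());
        }
    }
    out
}
```

One consequence worth noting: under this scheme a misspelled tool name in an agent definition produces no error, and the agent simply runs without that tool.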
@ -309,14 +308,11 @@ pub async fn save_agent_log(name: &str, agent: &std::sync::Arc<crate::agent::Age
if std::fs::create_dir_all(&dir).is_ok() {
let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S");
let path = dir.join(format!("{}.json", ts));
let sections = serde_json::json!({
"system": ctx.system(),
"identity": ctx.identity(),
"journal": ctx.journal(),
"conversation": ctx.conversation(),
"stats": stats,
});
if let Ok(json) = serde_json::to_string_pretty(&sections) {
let mut context: Vec<&crate::agent::context::AstNode> = Vec::new();
for section in ctx.sections() {
context.extend(section);
}
if let Ok(json) = serde_json::to_string_pretty(&context) {
let _ = std::fs::write(&path, json);
}
}
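
The `save_agent_log` hunk above replaces a keyed JSON object with a flat array because the object's keys were emitted alphabetically. The sketch below illustrates the difference using only the standard library. It assumes the keyed object behaves like a `BTreeMap`, which is how `serde_json` stores objects unless its `preserve_order` feature is enabled, and the section contents are made-up placeholders.

```rust
use std::collections::BTreeMap;

/// Order in which section names come out of a sorted-key map.
fn keyed_order(sections: &[(&str, Vec<&str>)]) -> Vec<String> {
    let map: BTreeMap<&str, &[&str]> = sections
        .iter()
        .map(|(k, v)| (*k, v.as_slice()))
        .collect();
    map.keys().map(|k| k.to_string()).collect()
}

/// Flatten the sections into one list, keeping the order they were given in.
fn flatten(sections: &[(&str, Vec<&str>)]) -> Vec<String> {
    sections
        .iter()
        .flat_map(|(_, nodes)| nodes.iter().map(|n| n.to_string()))
        .collect()
}
```

With sections given as system, identity, journal, conversation, the keyed form lists `conversation` first, while the flattened form keeps the system nodes at the front.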


@ -1,4 +1,4 @@
{"agent": "digest", "schedule": "daily"}
{"agent": "digest", "schedule": "daily", "tools": ["journal_tail", "journal_new", "journal_update"]}
# Digest Agent — Episodic Consolidation


@ -801,7 +801,7 @@ pub fn run_agent(
// Run the query if present
let keys = if !def.query.is_empty() {
let mut stages = search::Stage::parse_pipeline(&def.query)?;
let mut stages = crate::query_parser::parse_stages(&def.query)?;
let has_limit = stages.iter().any(|s|
matches!(s, search::Stage::Transform(search::Transform::Limit(_))));
if !has_limit {