Add progress callback to run_one_agent and run_and_apply so callers can see: prompt size, node list, LLM call timing, parsed action count, and per-action applied/skipped status. Daemon writes these to the persistent event log via log_event. Cap organize cluster to 20 nodes - 126 nodes produced a 682KB prompt that timed out every time. Agent has tools to explore further if needed. Restore general query for production runs.
1046 lines · 36 KiB · Rust
// knowledge.rs — knowledge agent action parsing, depth tracking, and convergence loop
|
|
//
|
|
// Agent prompts live in agents/*.agent files, dispatched via defs.rs.
|
|
// This module handles:
|
|
// - Action parsing (WRITE_NODE, LINK, REFINE from LLM output)
|
|
// - Inference depth tracking (prevents runaway abstraction)
|
|
// - Action application (write to store with provenance)
|
|
// - Convergence loop (sequences agents, measures graph stability)
|
|
// - Conversation fragment selection (for observation agent)
|
|
|
|
use crate::graph::Graph;
|
|
use crate::spectral;
|
|
use super::llm;
|
|
use crate::store::{self, Store, new_relation, RelationType};
|
|
|
|
use regex::Regex;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Action types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// A single graph mutation proposed by an agent, plus bookkeeping about
/// whether it was ultimately applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action {
    /// The concrete mutation (write, link, refine, demote, delete).
    pub kind: ActionKind,
    /// Agent-reported confidence, parsed from the LLM output.
    pub confidence: Confidence,
    /// Contribution to the cycle's weighted-delta convergence metric.
    pub weight: f64,
    /// Inference depth (-1 for actions that don't create abstraction).
    pub depth: i32,
    /// None until application is attempted; then Some(success).
    pub applied: Option<bool>,
    /// Why the action was rejected, if it was (e.g. "depth_threshold").
    pub rejected_reason: Option<String>,
}
|
|
|
|
/// The concrete kinds of graph mutation an agent can request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ActionKind {
    /// Create or overwrite a node. `covers` lists source-node keys this
    /// node abstracts over (drives depth computation).
    WriteNode {
        key: String,
        content: String,
        covers: Vec<String>,
    },
    /// Link two existing nodes (treated as undirected by `has_edge`).
    Link {
        source: String,
        target: String,
    },
    /// Replace the content of an existing node.
    Refine {
        key: String,
        content: String,
    },
    /// Halve a node's weight — soft deprecation rather than removal.
    Demote {
        key: String,
    },
    /// Remove a node from the store.
    Delete {
        key: String,
    },
}
|
|
|
|
/// Agent-reported confidence level, serialized lowercase ("high"/"medium"/"low").
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Confidence {
    High,
    Medium,
    Low,
}
|
|
|
|
impl Confidence {
|
|
/// Weight for delta metrics — how much this action contributes to change measurement.
|
|
fn delta_weight(self) -> f64 {
|
|
match self {
|
|
Self::High => 1.0,
|
|
Self::Medium => 0.6,
|
|
Self::Low => 0.3,
|
|
}
|
|
}
|
|
|
|
/// Confidence value for depth gating — capped below 1.0 so even "high" must clear thresholds.
|
|
fn gate_value(self) -> f64 {
|
|
match self {
|
|
Self::High => 0.9,
|
|
Self::Medium => 0.6,
|
|
Self::Low => 0.3,
|
|
}
|
|
}
|
|
|
|
fn parse(s: &str) -> Self {
|
|
match s.to_lowercase().as_str() {
|
|
"high" => Self::High,
|
|
"low" => Self::Low,
|
|
_ => Self::Medium,
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Action parsing
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub fn parse_write_nodes(text: &str) -> Vec<Action> {
|
|
let re = Regex::new(r"(?s)WRITE_NODE\s+(\S+)\s*\n(.*?)END_NODE").unwrap();
|
|
let conf_re = Regex::new(r"(?i)CONFIDENCE:\s*(high|medium|low)").unwrap();
|
|
let covers_re = Regex::new(r"COVERS:\s*(.+)").unwrap();
|
|
|
|
re.captures_iter(text)
|
|
.map(|cap| {
|
|
let key = cap[1].to_string();
|
|
let mut content = cap[2].trim().to_string();
|
|
|
|
let confidence = conf_re
|
|
.captures(&content)
|
|
.map(|c| Confidence::parse(&c[1]))
|
|
.unwrap_or(Confidence::Medium);
|
|
content = conf_re.replace(&content, "").trim().to_string();
|
|
|
|
let covers: Vec<String> = covers_re
|
|
.captures(&content)
|
|
.map(|c| c[1].split(',').map(|s| s.trim().to_string()).collect())
|
|
.unwrap_or_default();
|
|
content = covers_re.replace(&content, "").trim().to_string();
|
|
|
|
Action {
|
|
weight: confidence.delta_weight(),
|
|
kind: ActionKind::WriteNode { key, content, covers },
|
|
confidence,
|
|
depth: 0,
|
|
applied: None,
|
|
rejected_reason: None,
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
pub fn parse_links(text: &str) -> Vec<Action> {
|
|
let re = Regex::new(r"(?m)^LINK\s+(\S+)\s+(\S+)").unwrap();
|
|
re.captures_iter(text)
|
|
.map(|cap| Action {
|
|
kind: ActionKind::Link {
|
|
source: cap[1].to_string(),
|
|
target: cap[2].to_string(),
|
|
},
|
|
confidence: Confidence::Low,
|
|
weight: 0.3,
|
|
depth: -1,
|
|
applied: None,
|
|
rejected_reason: None,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
pub fn parse_refines(text: &str) -> Vec<Action> {
|
|
let re = Regex::new(r"(?s)REFINE\s+(\S+)\s*\n(.*?)END_REFINE").unwrap();
|
|
re.captures_iter(text)
|
|
.map(|cap| {
|
|
let key = cap[1].trim_matches('*').trim().to_string();
|
|
Action {
|
|
kind: ActionKind::Refine {
|
|
key,
|
|
content: cap[2].trim().to_string(),
|
|
},
|
|
confidence: Confidence::Medium,
|
|
weight: 0.7,
|
|
depth: 0,
|
|
applied: None,
|
|
rejected_reason: None,
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
pub fn parse_demotes(text: &str) -> Vec<Action> {
|
|
let re = Regex::new(r"(?m)^DEMOTE\s+(\S+)").unwrap();
|
|
re.captures_iter(text)
|
|
.map(|cap| Action {
|
|
kind: ActionKind::Demote {
|
|
key: cap[1].to_string(),
|
|
},
|
|
confidence: Confidence::Medium,
|
|
weight: 0.5,
|
|
depth: -1,
|
|
applied: None,
|
|
rejected_reason: None,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
pub fn parse_deletes(text: &str) -> Vec<Action> {
|
|
let re = Regex::new(r"(?m)^DELETE\s+(\S+)").unwrap();
|
|
re.captures_iter(text)
|
|
.map(|cap| Action {
|
|
kind: ActionKind::Delete {
|
|
key: cap[1].to_string(),
|
|
},
|
|
confidence: Confidence::High,
|
|
weight: 1.0,
|
|
depth: 0,
|
|
applied: None,
|
|
rejected_reason: None,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
pub fn parse_all_actions(text: &str) -> Vec<Action> {
|
|
let mut actions = parse_write_nodes(text);
|
|
actions.extend(parse_links(text));
|
|
actions.extend(parse_refines(text));
|
|
actions.extend(parse_demotes(text));
|
|
actions.extend(parse_deletes(text));
|
|
actions
|
|
}
|
|
|
|
pub fn count_no_ops(text: &str) -> usize {
|
|
let no_conn = Regex::new(r"\bNO_CONNECTION\b").unwrap().find_iter(text).count();
|
|
let affirm = Regex::new(r"\bAFFIRM\b").unwrap().find_iter(text).count();
|
|
let no_extract = Regex::new(r"\bNO_EXTRACTION\b").unwrap().find_iter(text).count();
|
|
no_conn + affirm + no_extract
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Inference depth tracking
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Store key under which the depth map is persisted as JSON.
const DEPTH_DB_KEY: &str = "_knowledge-depths";

/// Persistent map of node key → inference depth, used to gate runaway
/// abstraction (deeper nodes must clear higher confidence thresholds).
#[derive(Default)]
pub struct DepthDb {
    // key → depth; keys absent from the map read as depth 0 (see `get`).
    depths: HashMap<String, i32>,
}
|
|
|
|
impl DepthDb {
|
|
pub fn load(store: &Store) -> Self {
|
|
let depths = store.nodes.get(DEPTH_DB_KEY)
|
|
.and_then(|n| serde_json::from_str(&n.content).ok())
|
|
.unwrap_or_default();
|
|
Self { depths }
|
|
}
|
|
|
|
pub fn save(&self, store: &mut Store) {
|
|
if let Ok(json) = serde_json::to_string(&self.depths) {
|
|
store.upsert_provenance(DEPTH_DB_KEY, &json,
|
|
"observation:write").ok();
|
|
}
|
|
}
|
|
|
|
pub fn get(&self, key: &str) -> i32 {
|
|
self.depths.get(key).copied().unwrap_or(0)
|
|
}
|
|
|
|
pub fn set(&mut self, key: String, depth: i32) {
|
|
self.depths.insert(key, depth);
|
|
}
|
|
}
|
|
|
|
/// Agent base depths: observation=1, extractor=2, connector=3
/// (challenger creates no abstraction, so it has no base depth).
fn agent_base_depth(agent: &str) -> Option<i32> {
    if agent == "challenger" {
        return None;
    }
    let depth = match agent {
        "observation" => 1,
        "connector" => 3,
        _ => 2, // extractor and any unknown agent default to 2
    };
    Some(depth)
}
|
|
|
|
pub fn compute_action_depth(db: &DepthDb, action: &Action, agent: &str) -> i32 {
|
|
match &action.kind {
|
|
ActionKind::Link { .. } | ActionKind::Demote { .. } | ActionKind::Delete { .. } => -1,
|
|
ActionKind::Refine { key, .. } => db.get(key),
|
|
ActionKind::WriteNode { covers, .. } => {
|
|
if !covers.is_empty() {
|
|
covers.iter().map(|k| db.get(k)).max().unwrap_or(0) + 1
|
|
} else {
|
|
agent_base_depth(agent).unwrap_or(2)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Confidence threshold that scales with inference depth.
/// Depth 0 or below requires nothing; each extra level compounds the
/// base requirement: 1 - (1 - base)^depth.
pub fn required_confidence(depth: i32, base: f64) -> f64 {
    match depth {
        d if d <= 0 => 0.0,
        d => 1.0 - (1.0 - base).powi(d),
    }
}
|
|
|
|
/// Confidence bonus from real-world use.
/// Zero uses give no bonus; the bonus grows with use count but
/// asymptotically stays below 1.0.
pub fn use_bonus(use_count: u32) -> f64 {
    if use_count == 0 {
        0.0
    } else {
        let scaled = 1.0 + 0.15 * f64::from(use_count);
        1.0 - 1.0 / scaled
    }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Action application
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Prefix node content with an HTML-comment provenance header
/// (author, creation timestamp, inference depth).
fn stamp_content(content: &str, agent: &str, timestamp: &str, depth: i32) -> String {
    let header = format!("<!-- author: {} | created: {} | depth: {} -->", agent, timestamp, depth);
    format!("{}\n{}", header, content)
}
|
|
|
|
/// Check if a link already exists between two keys.
|
|
fn has_edge(store: &Store, source: &str, target: &str) -> bool {
|
|
store.relations.iter().any(|r| {
|
|
!r.deleted
|
|
&& ((r.source_key == source && r.target_key == target)
|
|
|| (r.source_key == target && r.target_key == source))
|
|
})
|
|
}
|
|
|
|
/// Apply a single action to the store; returns `true` on success.
///
/// Per kind:
/// - `WriteNode` / `Refine`: upsert the provenance-stamped content.
/// - `Link`: add a relation, skipping duplicates and unknown endpoints.
/// - `Demote`: halve the node's weight (floored at 0.05), re-tag provenance.
/// - `Delete`: remove the node.
pub fn apply_action(
    store: &mut Store,
    action: &Action,
    agent: &str,
    timestamp: &str,
    depth: i32,
) -> bool {
    match &action.kind {
        ActionKind::WriteNode { key, content, .. } => {
            let stamped = stamp_content(content, agent, timestamp, depth);
            let prov = format!("{}:write", agent);
            store.upsert_provenance(key, &stamped, &prov).is_ok()
        }
        ActionKind::Link { source, target } => {
            // Duplicate edges (in either direction) are silently skipped.
            if has_edge(store, source, target) {
                return false;
            }
            // Both endpoints must already exist; dangling links are rejected.
            let source_uuid = match store.nodes.get(source.as_str()) {
                Some(n) => n.uuid,
                None => return false,
            };
            let target_uuid = match store.nodes.get(target.as_str()) {
                Some(n) => n.uuid,
                None => return false,
            };
            let mut rel = new_relation(
                source_uuid, target_uuid,
                RelationType::Link,
                0.3,
                source, target,
            );
            rel.provenance = format!("{}:link", agent);
            store.add_relation(rel).is_ok()
        }
        ActionKind::Refine { key, content } => {
            let stamped = stamp_content(content, agent, timestamp, depth);
            let prov = format!("{}:refine", agent);
            store.upsert_provenance(key, &stamped, &prov).is_ok()
        }
        ActionKind::Demote { key } => {
            if let Some(node) = store.nodes.get_mut(key) {
                node.provenance = format!("{}:demote", agent);
                // Soft deprecation: halve the weight but keep a small floor.
                node.weight = (node.weight * 0.5).max(0.05);
                true
            } else {
                false
            }
        }
        ActionKind::Delete { key } => {
            store.delete_node(key).is_ok()
        }
    }
}
|
|
|
|
/// Map an agent name to its canonical provenance string.
/// "extractor" and "pattern" are aliases for the same agent; unknown
/// names get a generic "agent:<name>" tag.
fn agent_provenance(agent: &str) -> String {
    let canonical = match agent {
        "observation" => "agent:knowledge-observation",
        "extractor" | "pattern" => "agent:knowledge-pattern",
        "connector" => "agent:knowledge-connector",
        "challenger" => "agent:knowledge-challenger",
        other => return format!("agent:{}", other),
    };
    canonical.to_string()
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Naming resolution — called before creating any new node
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Resolution from the naming agent.
///
/// Note: a RENAME response from the agent is also mapped to `Create`
/// (with the better key) by `parse_naming_response`.
#[derive(Debug)]
pub enum NamingResolution {
    /// Create with the proposed key (or a better one).
    Create(String),
    /// Merge content into an existing node instead.
    MergeInto(String),
}
|
|
|
|
/// Find existing nodes that might conflict with a proposed new node.
/// Returns up to `limit` (key, content_preview) pairs, ranked by match
/// score; previews are capped at 200 chars.
fn find_conflicts(
    store: &Store,
    proposed_key: &str,
    proposed_content: &str,
    limit: usize,
) -> Vec<(String, String)> {
    use std::collections::BTreeMap;

    // Extract search terms from the key (split on separators) and first ~200 chars of content
    let mut terms: BTreeMap<String, f64> = BTreeMap::new();
    for part in proposed_key.split(|c: char| c == '-' || c == '_' || c == '#' || c == '.') {
        let p = part.to_lowercase();
        // Skip very short fragments — too noisy as search terms.
        if p.len() >= 3 {
            terms.insert(p, 1.0);
        }
    }
    // Add a few content terms
    let content_terms = crate::search::extract_query_terms(proposed_content, 5);
    for term in content_terms.split_whitespace() {
        // Content terms weigh less (0.5) than key terms (1.0); entry()
        // keeps the higher key-derived weight when both produce a term.
        terms.entry(term.to_string()).or_insert(0.5);
    }

    if terms.is_empty() {
        return Vec::new();
    }

    // Use component matching to find related nodes
    let (seeds, _) = crate::search::match_seeds_opts(&terms, store, true, false);

    // Rank by score descending, excluding the proposed key itself.
    let mut results: Vec<(String, f64)> = seeds.into_iter()
        .filter(|(k, _)| k != proposed_key)
        .collect();
    results.sort_by(|a, b| b.1.total_cmp(&a.1));

    results.into_iter()
        .take(limit)
        .filter_map(|(key, _)| {
            let node = store.nodes.get(key.as_str())?;
            let preview: String = node.content.chars().take(200).collect();
            Some((key, preview))
        })
        .collect()
}
|
|
|
|
/// Format the naming prompt for a proposed node.
/// Content is truncated to 1000 chars; conflicts render as markdown
/// sections, or a placeholder line when there are none.
fn format_naming_prompt(
    proposed_key: &str,
    proposed_content: &str,
    conflicts: &[(String, String)],
) -> String {
    // Render the conflict list, or a placeholder when nothing overlaps.
    let conflict_section = match conflicts {
        [] => "(no existing nodes found with overlapping content)".to_string(),
        list => {
            let mut sections = Vec::with_capacity(list.len());
            for (key, preview) in list {
                sections.push(format!("### `{}`\n\n{}", key, preview));
            }
            sections.join("\n\n")
        }
    };

    // Truncate content for the prompt (don't send huge nodes to Haiku)
    let content_preview: String = proposed_content.chars().take(1000).collect();

    format!(
        "# Naming Agent — Node Key Resolution\n\n\
         You are given a proposed new node (key + content) and a list of existing\n\
         nodes that might overlap with it. Decide what to do:\n\n\
         1. **CREATE** — the proposed key is good and there's no meaningful overlap.\n\
         2. **RENAME** — the content is unique but the key is bad (UUID, truncated, generic).\n\
         3. **MERGE_INTO** — an existing node already covers this content.\n\n\
         Good keys: 2-5 words in kebab-case, optionally with `#` subtopic.\n\
         Bad keys: UUIDs, single generic words, truncated auto-slugs.\n\n\
         Respond with exactly ONE line: `CREATE key`, `RENAME better_key`, or `MERGE_INTO existing_key`.\n\n\
         ## Proposed node\n\n\
         Key: `{}`\n\n\
         Content:\n```\n{}\n```\n\n\
         ## Existing nodes that might overlap\n\n\
         {}",
        proposed_key, content_preview, conflict_section,
    )
}
|
|
|
|
/// Parse naming agent response.
|
|
fn parse_naming_response(response: &str) -> Option<NamingResolution> {
|
|
for line in response.lines() {
|
|
// Strip backticks — Haiku sometimes wraps the response line in them
|
|
let trimmed = line.trim().trim_matches('`').trim();
|
|
if let Some(key) = trimmed.strip_prefix("CREATE ") {
|
|
return Some(NamingResolution::Create(key.trim().trim_matches('`').to_string()));
|
|
}
|
|
if let Some(key) = trimmed.strip_prefix("RENAME ") {
|
|
return Some(NamingResolution::Create(key.trim().trim_matches('`').to_string()));
|
|
}
|
|
if let Some(key) = trimmed.strip_prefix("MERGE_INTO ") {
|
|
return Some(NamingResolution::MergeInto(key.trim().trim_matches('`').to_string()));
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
/// Resolve naming for a proposed WriteNode action.
|
|
///
|
|
/// Searches for conflicts, calls the naming LLM (Haiku), and returns
|
|
/// either a Create (possibly with a better key) or MergeInto resolution.
|
|
/// On LLM failure, falls through to using the proposed key as-is.
|
|
pub fn resolve_naming(
|
|
store: &Store,
|
|
proposed_key: &str,
|
|
proposed_content: &str,
|
|
) -> NamingResolution {
|
|
let conflicts = find_conflicts(store, proposed_key, proposed_content, 5);
|
|
let prompt = format_naming_prompt(proposed_key, proposed_content, &conflicts);
|
|
|
|
match llm::call_haiku("naming", &prompt) {
|
|
Ok(response) => {
|
|
match parse_naming_response(&response) {
|
|
Some(resolution) => resolution,
|
|
None => {
|
|
eprintln!("naming: unparseable response, using proposed key");
|
|
NamingResolution::Create(proposed_key.to_string())
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
eprintln!("naming: LLM error ({}), using proposed key", e);
|
|
NamingResolution::Create(proposed_key.to_string())
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Shared agent execution
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Result of running a single agent through the common pipeline.
pub struct AgentResult {
    /// Raw LLM output (also persisted as an audit-trail store node).
    pub output: String,
    /// Actions parsed from the output.
    pub actions: Vec<Action>,
    /// Count of explicit no-op markers in the output.
    pub no_ops: usize,
    /// Keys of the nodes included in the agent's prompt batch.
    pub node_keys: Vec<String>,
}
|
|
|
|
/// Resolve naming for all WriteNode actions in a list.
|
|
///
|
|
/// For each WriteNode, calls the naming agent to check for conflicts and
|
|
/// get a good key. May convert WriteNode → Refine (if MERGE_INTO) or
|
|
/// update the key (if RENAME/CREATE with different key).
|
|
pub fn resolve_action_names(store: &Store, actions: Vec<Action>) -> Vec<Action> {
|
|
actions.into_iter().map(|action| {
|
|
match &action.kind {
|
|
ActionKind::WriteNode { key, content, covers } => {
|
|
match resolve_naming(store, key, content) {
|
|
NamingResolution::Create(new_key) => {
|
|
if new_key == *key {
|
|
action // keep as-is
|
|
} else {
|
|
eprintln!("naming: {} → {}", key, new_key);
|
|
Action {
|
|
kind: ActionKind::WriteNode {
|
|
key: new_key,
|
|
content: content.clone(),
|
|
covers: covers.clone(),
|
|
},
|
|
..action
|
|
}
|
|
}
|
|
}
|
|
NamingResolution::MergeInto(existing_key) => {
|
|
eprintln!("naming: {} → MERGE_INTO {}", key, existing_key);
|
|
Action {
|
|
kind: ActionKind::Refine {
|
|
key: existing_key,
|
|
content: content.clone(),
|
|
},
|
|
..action
|
|
}
|
|
}
|
|
}
|
|
}
|
|
_ => action,
|
|
}
|
|
}).collect()
|
|
}
|
|
|
|
/// Run a single agent and apply its actions (no depth tracking).
///
/// Returns (total_actions, applied_count) or an error.
pub fn run_and_apply(
    store: &mut Store,
    agent_name: &str,
    batch_size: usize,
    llm_tag: &str,
) -> Result<(usize, usize), String> {
    // Delegate with a no-op progress callback.
    run_and_apply_with_log(store, agent_name, batch_size, llm_tag, &|_| {})
}
|
|
|
|
pub fn run_and_apply_with_log(
|
|
store: &mut Store,
|
|
agent_name: &str,
|
|
batch_size: usize,
|
|
llm_tag: &str,
|
|
log: &dyn Fn(&str),
|
|
) -> Result<(usize, usize), String> {
|
|
let result = run_one_agent(store, agent_name, batch_size, llm_tag, log)?;
|
|
let actions = resolve_action_names(store, result.actions);
|
|
let ts = store::compact_timestamp();
|
|
let mut applied = 0;
|
|
for action in &actions {
|
|
let desc = match &action.kind {
|
|
ActionKind::WriteNode { key, .. } => format!("WRITE {}", key),
|
|
ActionKind::Refine { key, .. } => format!("REFINE {}", key),
|
|
ActionKind::Link { source, target } => format!("LINK {} → {}", source, target),
|
|
ActionKind::Demote { key } => format!("DEMOTE {}", key),
|
|
ActionKind::Delete { key } => format!("DELETE {}", key),
|
|
};
|
|
if apply_action(store, action, agent_name, &ts, 0) {
|
|
log(&format!("applied: {}", desc));
|
|
applied += 1;
|
|
} else {
|
|
log(&format!("skipped: {}", desc));
|
|
}
|
|
}
|
|
Ok((actions.len(), applied))
|
|
}
|
|
|
|
/// Run a single agent: build prompt → call LLM → store output → parse actions → record visits.
///
/// This is the common pipeline shared by the knowledge loop, consolidation pipeline,
/// and daemon. Callers handle action application (with or without depth tracking).
///
/// The `log` callback receives progress messages (prompt size, node list,
/// LLM call boundaries, response size, parsed action counts) so callers
/// like the daemon can forward them to a persistent event log.
pub fn run_one_agent(
    store: &mut Store,
    agent_name: &str,
    batch_size: usize,
    llm_tag: &str,
    log: &dyn Fn(&str),
) -> Result<AgentResult, String> {
    let def = super::defs::get_def(agent_name)
        .ok_or_else(|| format!("no .agent file for {}", agent_name))?;

    log("building prompt");
    let agent_batch = super::defs::run_agent(store, &def, batch_size)?;

    // Surface prompt size and the exact node set before the (slow) LLM call.
    let prompt_kb = agent_batch.prompt.len() / 1024;
    let tools_desc = if def.tools.is_empty() { "no tools".into() }
        else { format!("{} tools", def.tools.len()) };
    log(&format!("prompt {}KB, model={}, {}, {} nodes",
        prompt_kb, def.model, tools_desc, agent_batch.node_keys.len()));
    for key in &agent_batch.node_keys {
        log(&format!("  node: {}", key));
    }

    log("calling LLM");
    let output = llm::call_for_def(&def, &agent_batch.prompt)?;

    let output_kb = output.len() / 1024;
    log(&format!("response {}KB", output_kb));

    // Store raw output for audit trail
    let ts = store::compact_timestamp();
    let report_key = format!("_{}-{}-{}", llm_tag, agent_name, ts);
    let provenance = agent_provenance(agent_name);
    store.upsert_provenance(&report_key, &output, &provenance).ok();

    let actions = parse_all_actions(&output);
    let no_ops = count_no_ops(&output);

    log(&format!("parsed {} actions, {} no-ops", actions.len(), no_ops));

    // Record visits for processed nodes
    if !agent_batch.node_keys.is_empty() {
        store.record_agent_visits(&agent_batch.node_keys, agent_name).ok();
    }

    Ok(AgentResult {
        output,
        actions,
        no_ops,
        node_keys: agent_batch.node_keys,
    })
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Conversation fragment selection
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Store-key prefix marking transcript segments already consumed by the
/// observation agent (the previous comment here described a different item).
const OBSERVED_PREFIX: &str = "_observed-transcripts";
|
|
|
|
/// Select conversation fragments (per-segment) for the observation agent.
/// Skips segments already processed, marks selected segments as observed.
///
/// Returns up to `n` (segment_id, formatted_text) pairs, where segment_id
/// is "<session_file_stem>.<segment_index>". Selected segments are marked
/// in the store immediately so repeated calls don't re-serve them.
pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
    let projects = crate::config::get().projects_dir.clone();
    if !projects.exists() { return Vec::new(); }

    // Keys of segments already consumed by the observation agent.
    let observed = super::enrich::keys_with_prefix(&format!("{}#", OBSERVED_PREFIX));

    // Gather transcript files worth mining (>50KB; smaller files skipped).
    let mut jsonl_files: Vec<PathBuf> = Vec::new();
    if let Ok(dirs) = fs::read_dir(&projects) {
        for dir in dirs.filter_map(|e| e.ok()) {
            if !dir.path().is_dir() { continue; }
            if let Ok(files) = fs::read_dir(dir.path()) {
                for f in files.filter_map(|e| e.ok()) {
                    let p = f.path();
                    if p.extension().map(|x| x == "jsonl").unwrap_or(false) {
                        if let Ok(meta) = p.metadata() {
                            if meta.len() > 50_000 {
                                jsonl_files.push(p);
                            }
                        }
                    }
                }
            }
        }
    }

    // Collect unmined segments across all transcripts, keeping the text
    let mut candidates: Vec<(PathBuf, usize, String, String)> = Vec::new();
    for path in &jsonl_files {
        for (seg_idx, messages) in super::enrich::unmined_segments(path, OBSERVED_PREFIX, &observed) {
            let text = format_segment(&messages, 8000);
            // Skip near-empty segments — not worth an LLM call.
            if text.len() > 500 {
                let session_id = path.file_stem()
                    .map(|s| s.to_string_lossy().to_string())
                    .unwrap_or_else(|| "unknown".into());
                let id = format!("{}.{}", session_id, seg_idx);
                candidates.push((path.clone(), seg_idx, id, text));
            }
        }
    }

    // Take up to n, mark them, and return the text
    let selected: Vec<_> = candidates.into_iter().take(n).collect();

    if !selected.is_empty() {
        if let Ok(mut store) = crate::store::Store::load() {
            for (path, seg_idx, _, _) in &selected {
                super::enrich::mark_segment(
                    &mut store,
                    &path.to_string_lossy(),
                    OBSERVED_PREFIX,
                    *seg_idx,
                    "agent:knowledge-observation",
                    "observed",
                );
            }
            let _ = store.save();
        }
    }

    selected.into_iter()
        .map(|(_, _, id, text)| (id, text))
        .collect()
}
|
|
|
|
/// Format a segment's messages into readable text for the observation agent.
|
|
fn format_segment(messages: &[(usize, String, String, String)], max_chars: usize) -> String {
|
|
let cfg = crate::config::get();
|
|
let mut fragments = Vec::new();
|
|
let mut total = 0;
|
|
|
|
for (_, role, text, _) in messages {
|
|
let min_len = if role == "user" { 5 } else { 10 };
|
|
if text.len() <= min_len { continue; }
|
|
|
|
let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
|
|
fragments.push(format!("**{}:** {}", name, text));
|
|
total += text.len();
|
|
if total > max_chars { break; }
|
|
}
|
|
fragments.join("\n\n")
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Convergence metrics
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Per-cycle record: action counts, weighted delta, and graph metrics
/// before/after — the raw material for convergence checking.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CycleResult {
    /// 1-based cycle number.
    pub cycle: usize,
    /// Compact timestamp taken at cycle start.
    pub timestamp: String,
    /// Total actions parsed across all agents this cycle.
    pub total_actions: usize,
    /// Actions actually applied to the store.
    pub total_applied: usize,
    /// Explicit no-op markers seen in agent output.
    pub total_no_ops: usize,
    /// Actions rejected by depth/confidence gating.
    pub depth_rejected: usize,
    /// Sum of confidence weights of applied actions.
    pub weighted_delta: f64,
    pub graph_metrics_before: GraphMetrics,
    pub graph_metrics_after: GraphMetrics,
}
|
|
|
|
/// Structural snapshot of the knowledge graph at a point in time.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct GraphMetrics {
    pub nodes: usize,
    pub edges: usize,
    /// Average clustering coefficient.
    pub cc: f64,
    /// Small-world sigma.
    pub sigma: f64,
    /// Number of detected communities.
    pub communities: usize,
}
|
|
|
|
impl GraphMetrics {
    /// Snapshot metrics from the current store/graph pair.
    pub fn from_graph(store: &Store, graph: &Graph) -> Self {
        Self {
            nodes: store.nodes.len(),
            edges: graph.edge_count(),
            cc: graph.avg_clustering_coefficient() as f64,
            sigma: graph.small_world_sigma() as f64,
            communities: graph.community_count(),
        }
    }
}
|
|
|
|
fn metric_stability(history: &[CycleResult], key: &str, window: usize) -> f64 {
|
|
if history.len() < window { return f64::INFINITY; }
|
|
|
|
let values: Vec<f64> = history[history.len() - window..].iter()
|
|
.map(|h| match key {
|
|
"sigma" => h.graph_metrics_after.sigma,
|
|
"cc" => h.graph_metrics_after.cc,
|
|
"communities" => h.graph_metrics_after.communities as f64,
|
|
_ => 0.0,
|
|
})
|
|
.collect();
|
|
|
|
if values.len() < 2 { return f64::INFINITY; }
|
|
let mean = values.iter().sum::<f64>() / values.len() as f64;
|
|
if mean == 0.0 { return 0.0; }
|
|
let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
|
|
variance.sqrt() / mean.abs()
|
|
}
|
|
|
|
pub fn check_convergence(history: &[CycleResult], window: usize) -> bool {
|
|
if history.len() < window { return false; }
|
|
|
|
let sigma_cv = metric_stability(history, "sigma", window);
|
|
let cc_cv = metric_stability(history, "cc", window);
|
|
let comm_cv = metric_stability(history, "communities", window);
|
|
|
|
let recent = &history[history.len() - window..];
|
|
let avg_delta = recent.iter().map(|r| r.weighted_delta).sum::<f64>() / recent.len() as f64;
|
|
|
|
eprintln!("\n Convergence check (last {} cycles):", window);
|
|
eprintln!(" sigma CV: {:.4} (< 0.05?)", sigma_cv);
|
|
eprintln!(" CC CV: {:.4} (< 0.05?)", cc_cv);
|
|
eprintln!(" community CV: {:.4} (< 0.10?)", comm_cv);
|
|
eprintln!(" avg delta: {:.2} (< 1.00?)", avg_delta);
|
|
|
|
let structural = sigma_cv < 0.05 && cc_cv < 0.05 && comm_cv < 0.10;
|
|
let behavioral = avg_delta < 1.0;
|
|
|
|
if structural && behavioral {
|
|
eprintln!(" → CONVERGED");
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// The knowledge loop
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Tunables for the knowledge convergence loop.
pub struct KnowledgeLoopConfig {
    /// Hard cap on cycles even if the graph never converges.
    pub max_cycles: usize,
    /// Batch size passed to each agent run.
    pub batch_size: usize,
    /// Number of recent cycles the convergence check looks at.
    pub window: usize,
    /// Maximum allowed inference depth for new abstractions.
    pub max_depth: i32,
    /// Base confidence used by `required_confidence` depth gating.
    pub confidence_base: f64,
}
|
|
|
|
impl Default for KnowledgeLoopConfig {
    /// Conservative defaults: up to 20 cycles of 5-node batches,
    /// convergence judged over a 5-cycle window, depth capped at 4.
    fn default() -> Self {
        Self {
            max_cycles: 20,
            batch_size: 5,
            window: 5,
            max_depth: 4,
            confidence_base: 0.3,
        }
    }
}
|
|
|
|
/// Drive agent cycles until the graph converges or `max_cycles` is hit.
///
/// Each cycle runs the full agent sequence (see `run_cycle`); convergence
/// is judged over a sliding window of recent cycles. The full cycle
/// history is saved as a `_knowledge-loop-<ts>` store node.
pub fn run_knowledge_loop(config: &KnowledgeLoopConfig) -> Result<Vec<CycleResult>, String> {
    let mut store = Store::load()?;
    let mut depth_db = DepthDb::load(&store);
    let mut history = Vec::new();

    eprintln!("Knowledge Loop — fixed-point iteration");
    eprintln!(" max_cycles={} batch_size={}", config.max_cycles, config.batch_size);
    eprintln!(" window={} max_depth={}", config.window, config.max_depth);

    for cycle in 1..=config.max_cycles {
        let result = run_cycle(cycle, config, &mut depth_db)?;
        history.push(result);

        if check_convergence(&history, config.window) {
            eprintln!("\n CONVERGED after {} cycles", cycle);
            break;
        }
    }

    // Save loop summary as a store node
    if let Some(first) = history.first() {
        let key = format!("_knowledge-loop-{}", first.timestamp);
        if let Ok(json) = serde_json::to_string_pretty(&history) {
            // Fresh load so the summary lands on top of whatever the
            // cycles persisted (each cycle uses its own Store instance).
            store = Store::load()?;
            store.upsert_provenance(&key, &json,
                "observation:write").ok();
            depth_db.save(&mut store);
            store.save()?;
        }
    }

    Ok(history)
}
|
|
|
|
fn run_cycle(
|
|
cycle_num: usize,
|
|
config: &KnowledgeLoopConfig,
|
|
depth_db: &mut DepthDb,
|
|
) -> Result<CycleResult, String> {
|
|
let timestamp = store::compact_timestamp();
|
|
eprintln!("\n{}", "=".repeat(60));
|
|
eprintln!("CYCLE {} — {}", cycle_num, timestamp);
|
|
eprintln!("{}", "=".repeat(60));
|
|
|
|
let mut store = Store::load()?;
|
|
let graph = store.build_graph();
|
|
let metrics_before = GraphMetrics::from_graph(&store, &graph);
|
|
eprintln!(" Before: nodes={} edges={} cc={:.3} sigma={:.3}",
|
|
metrics_before.nodes, metrics_before.edges, metrics_before.cc, metrics_before.sigma);
|
|
|
|
let mut all_actions = Vec::new();
|
|
let mut all_no_ops = 0;
|
|
let mut depth_rejected = 0;
|
|
let mut total_applied = 0;
|
|
|
|
// Run each agent via .agent file dispatch
|
|
let agent_names = ["observation", "extractor", "connector", "challenger"];
|
|
|
|
for agent_name in &agent_names {
|
|
eprintln!("\n --- {} (n={}) ---", agent_name, config.batch_size);
|
|
|
|
let result = match run_one_agent(&mut store, agent_name, config.batch_size, "knowledge", &|msg| eprintln!(" {}", msg)) {
|
|
Ok(r) => r,
|
|
Err(e) => {
|
|
eprintln!(" ERROR: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let mut actions = result.actions;
|
|
all_no_ops += result.no_ops;
|
|
|
|
eprintln!(" Actions: {} No-ops: {}", actions.len(), result.no_ops);
|
|
|
|
let mut applied = 0;
|
|
for action in &mut actions {
|
|
let depth = compute_action_depth(depth_db, action, agent_name);
|
|
action.depth = depth;
|
|
|
|
match &action.kind {
|
|
ActionKind::WriteNode { key, covers, .. } => {
|
|
let conf_val = action.confidence.gate_value();
|
|
let req = required_confidence(depth, config.confidence_base);
|
|
|
|
let source_uses: Vec<u32> = covers.iter()
|
|
.filter_map(|k| store.nodes.get(k).map(|n| n.uses))
|
|
.collect();
|
|
let avg_uses = if source_uses.is_empty() { 0 }
|
|
else { source_uses.iter().sum::<u32>() / source_uses.len() as u32 };
|
|
let eff_conf = (conf_val + use_bonus(avg_uses)).min(1.0);
|
|
|
|
if eff_conf < req {
|
|
action.applied = Some(false);
|
|
action.rejected_reason = Some("depth_threshold".into());
|
|
depth_rejected += 1;
|
|
continue;
|
|
}
|
|
if depth > config.max_depth {
|
|
action.applied = Some(false);
|
|
action.rejected_reason = Some("max_depth".into());
|
|
depth_rejected += 1;
|
|
continue;
|
|
}
|
|
eprintln!(" WRITE {} depth={} conf={:.2} eff={:.2} req={:.2}",
|
|
key, depth, conf_val, eff_conf, req);
|
|
}
|
|
ActionKind::Link { source, target } => {
|
|
eprintln!(" LINK {} → {}", source, target);
|
|
}
|
|
ActionKind::Refine { key, .. } => {
|
|
eprintln!(" REFINE {} depth={}", key, depth);
|
|
}
|
|
ActionKind::Demote { key } => {
|
|
eprintln!(" DEMOTE {}", key);
|
|
}
|
|
ActionKind::Delete { key } => {
|
|
eprintln!(" DELETE {}", key);
|
|
}
|
|
}
|
|
|
|
if apply_action(&mut store, action, agent_name, ×tamp, depth) {
|
|
applied += 1;
|
|
action.applied = Some(true);
|
|
if let ActionKind::WriteNode { key, .. } | ActionKind::Refine { key, .. } = &action.kind {
|
|
depth_db.set(key.clone(), depth);
|
|
}
|
|
} else {
|
|
action.applied = Some(false);
|
|
}
|
|
}
|
|
|
|
eprintln!(" Applied: {}/{}", applied, actions.len());
|
|
total_applied += applied;
|
|
all_actions.extend(actions);
|
|
}
|
|
|
|
depth_db.save(&mut store);
|
|
|
|
// Recompute spectral at most once per hour — O(n³) is expensive at 14k+ nodes
|
|
if total_applied > 0 {
|
|
let stale = spectral::embedding_path()
|
|
.metadata()
|
|
.and_then(|m| m.modified())
|
|
.map(|t| t.elapsed().unwrap_or_default() > std::time::Duration::from_secs(3600))
|
|
.unwrap_or(true);
|
|
if stale {
|
|
eprintln!("\n Recomputing spectral embedding (>1h stale)...");
|
|
let graph = store.build_graph();
|
|
let result = spectral::decompose(&graph, 8);
|
|
let emb = spectral::to_embedding(&result);
|
|
spectral::save_embedding(&emb).ok();
|
|
}
|
|
}
|
|
|
|
let graph = store.build_graph();
|
|
let metrics_after = GraphMetrics::from_graph(&store, &graph);
|
|
let weighted_delta: f64 = all_actions.iter()
|
|
.filter(|a| a.applied == Some(true))
|
|
.map(|a| a.weight)
|
|
.sum();
|
|
|
|
eprintln!("\n CYCLE {} SUMMARY", cycle_num);
|
|
eprintln!(" Applied: {}/{} depth-rejected: {} no-ops: {}",
|
|
total_applied, all_actions.len(), depth_rejected, all_no_ops);
|
|
eprintln!(" Weighted delta: {:.2}", weighted_delta);
|
|
|
|
Ok(CycleResult {
|
|
cycle: cycle_num,
|
|
timestamp,
|
|
total_actions: all_actions.len(),
|
|
total_applied,
|
|
total_no_ops: all_no_ops,
|
|
depth_rejected,
|
|
weighted_delta,
|
|
graph_metrics_before: metrics_before,
|
|
graph_metrics_after: metrics_after,
|
|
})
|
|
}
|