agents: deduplicate timestamps, plan expansion, rename agent
- Add compact_timestamp() to store — replaces 5 copies of format_datetime(now_epoch()).replace([':', '-', 'T'], ""). Also fixes missing seconds (format_datetime only had HH:MM). - Add ConsolidationPlan::to_agent_runs() — replaces identical plan-to-runs-list expansion in consolidate.rs and daemon.rs. - Port job_rename_agent to use run_one_agent — eliminates manual prompt building, LLM call, report storage, and visit recording that duplicated the shared pipeline. - Rename Confidence::weight()/value() to delta_weight()/gate_value() to clarify the distinction (delta metrics vs depth gating).
This commit is contained in:
parent
fe7f636ad3
commit
abab85d249
6 changed files with 49 additions and 82 deletions
|
|
@ -35,8 +35,7 @@ pub fn consolidate_full_with_progress(
|
|||
on_progress: &dyn Fn(&str),
|
||||
) -> Result<(), String> {
|
||||
let start = std::time::Instant::now();
|
||||
let log_key = format!("_consolidate-log-{}",
|
||||
store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], ""));
|
||||
let log_key = format!("_consolidate-log-{}", store::compact_timestamp());
|
||||
let mut log_buf = String::new();
|
||||
|
||||
log_line(&mut log_buf, "=== CONSOLIDATE FULL ===");
|
||||
|
|
@ -64,25 +63,8 @@ pub fn consolidate_full_with_progress(
|
|||
let mut total_applied = 0usize;
|
||||
let mut total_actions = 0usize;
|
||||
|
||||
// Build the list of (agent_type, batch_size) runs
|
||||
let mut runs: Vec<(&str, usize)> = Vec::new();
|
||||
if plan.run_health {
|
||||
runs.push(("health", 0));
|
||||
}
|
||||
let batch_size = 5;
|
||||
for (name, count) in [
|
||||
("replay", plan.replay_count),
|
||||
("linker", plan.linker_count),
|
||||
("separator", plan.separator_count),
|
||||
("transfer", plan.transfer_count),
|
||||
] {
|
||||
let mut remaining = count;
|
||||
while remaining > 0 {
|
||||
let batch = remaining.min(batch_size);
|
||||
runs.push((name, batch));
|
||||
remaining -= batch;
|
||||
}
|
||||
}
|
||||
let runs = plan.to_agent_runs(batch_size);
|
||||
|
||||
for (agent_type, count) in &runs {
|
||||
agent_num += 1;
|
||||
|
|
@ -112,8 +94,7 @@ pub fn consolidate_full_with_progress(
|
|||
}
|
||||
};
|
||||
|
||||
let ts = store::format_datetime(store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
let ts = store::compact_timestamp();
|
||||
let mut applied = 0;
|
||||
for action in &result.actions {
|
||||
if knowledge::apply_action(store, action, agent_type, &ts, 0) {
|
||||
|
|
|
|||
|
|
@ -133,8 +133,7 @@ fn job_consolidation_agent(
|
|||
ctx.log_line(&format!("running agent: {} (batch={})", agent, batch));
|
||||
let result = super::knowledge::run_one_agent(&mut store, &agent, batch, "consolidate")?;
|
||||
|
||||
let ts = crate::store::format_datetime(crate::store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
let ts = crate::store::compact_timestamp();
|
||||
let mut applied = 0;
|
||||
for action in &result.actions {
|
||||
if super::knowledge::apply_action(&mut store, action, &agent, &ts, 0) {
|
||||
|
|
@ -157,34 +156,24 @@ fn job_rename_agent(
|
|||
let mut store = crate::store::Store::load()?;
|
||||
|
||||
let batch = if batch_size == 0 { 10 } else { batch_size };
|
||||
ctx.log_line(&format!("building prompt: rename (batch={})", batch));
|
||||
ctx.log_line(&format!("running rename agent (batch={})", batch));
|
||||
|
||||
let agent_batch = super::prompts::agent_prompt(&store, "rename", batch)?;
|
||||
ctx.log_line(&format!("prompt: {} chars ({} nodes), calling Sonnet",
|
||||
agent_batch.prompt.len(), agent_batch.node_keys.len()));
|
||||
let result = super::knowledge::run_one_agent(&mut store, "rename", batch, "consolidate")?;
|
||||
|
||||
let response = super::llm::call_sonnet("consolidate", &agent_batch.prompt)?;
|
||||
|
||||
// Parse RENAME actions directly from response
|
||||
// Parse RENAME actions from response (rename uses its own format, not WRITE_NODE/LINK/REFINE)
|
||||
let mut applied = 0;
|
||||
let mut skipped = 0;
|
||||
let mut successfully_renamed: Vec<String> = Vec::new();
|
||||
for line in response.lines() {
|
||||
for line in result.output.lines() {
|
||||
let trimmed = line.trim();
|
||||
if !trimmed.starts_with("RENAME ") { continue; }
|
||||
|
||||
let rest = &trimmed[7..];
|
||||
// Split on first space after the old key — tricky because keys contain spaces? No, they don't.
|
||||
// Keys are single tokens with hyphens/underscores/hashes.
|
||||
let parts: Vec<&str> = rest.splitn(2, ' ').collect();
|
||||
let parts: Vec<&str> = trimmed[7..].splitn(2, ' ').collect();
|
||||
if parts.len() != 2 { skipped += 1; continue; }
|
||||
|
||||
let old_key = parts[0].trim();
|
||||
let new_key = parts[1].trim();
|
||||
|
||||
if old_key.is_empty() || new_key.is_empty() { skipped += 1; continue; }
|
||||
|
||||
// Resolve old key (handles partial matches)
|
||||
let resolved = match store.resolve_key(old_key) {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
|
|
@ -194,7 +183,6 @@ fn job_rename_agent(
|
|||
}
|
||||
};
|
||||
|
||||
// Don't rename to something that already exists
|
||||
if store.nodes.contains_key(new_key) {
|
||||
ctx.log_line(&format!("skip: {} already exists", new_key));
|
||||
skipped += 1;
|
||||
|
|
@ -204,7 +192,6 @@ fn job_rename_agent(
|
|||
match store.rename_node(&resolved, new_key) {
|
||||
Ok(()) => {
|
||||
ctx.log_line(&format!("renamed: {} → {}", resolved, new_key));
|
||||
successfully_renamed.push(new_key.to_string());
|
||||
applied += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
@ -218,20 +205,6 @@ fn job_rename_agent(
|
|||
store.save()?;
|
||||
}
|
||||
|
||||
// Record visits for successfully renamed nodes
|
||||
if !successfully_renamed.is_empty() {
|
||||
if let Err(e) = store.record_agent_visits(&successfully_renamed, "rename") {
|
||||
ctx.log_line(&format!("visit recording: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
// Also store the report for auditing
|
||||
let ts = crate::store::format_datetime(crate::store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
let report_key = format!("_consolidation-rename-{}", ts);
|
||||
store.upsert_provenance(&report_key, &response,
|
||||
crate::store::Provenance::AgentConsolidate).ok();
|
||||
|
||||
ctx.log_line(&format!("done: {} applied, {} skipped", applied, skipped));
|
||||
Ok(())
|
||||
})
|
||||
|
|
@ -1107,25 +1080,7 @@ pub fn run_daemon() -> Result<(), String> {
|
|||
};
|
||||
|
||||
let batch_size = 5;
|
||||
|
||||
// Build the list of (agent_type, batch_size) runs
|
||||
let mut runs: Vec<(&str, usize)> = Vec::new();
|
||||
if plan.run_health {
|
||||
runs.push(("health", 0));
|
||||
}
|
||||
for (name, count) in [
|
||||
("replay", plan.replay_count),
|
||||
("linker", plan.linker_count),
|
||||
("separator", plan.separator_count),
|
||||
("transfer", plan.transfer_count),
|
||||
] {
|
||||
let mut remaining = count;
|
||||
while remaining > 0 {
|
||||
let batch = remaining.min(batch_size);
|
||||
runs.push((name, batch));
|
||||
remaining -= batch;
|
||||
}
|
||||
}
|
||||
let runs = plan.to_agent_runs(batch_size);
|
||||
|
||||
log_event("scheduler", "consolidation-plan",
|
||||
&format!("{} agents ({}r {}l {}s {}t)",
|
||||
|
|
|
|||
|
|
@ -59,7 +59,8 @@ pub enum Confidence {
|
|||
}
|
||||
|
||||
impl Confidence {
|
||||
fn weight(self) -> f64 {
|
||||
/// Weight for delta metrics — how much this action contributes to change measurement.
|
||||
fn delta_weight(self) -> f64 {
|
||||
match self {
|
||||
Self::High => 1.0,
|
||||
Self::Medium => 0.6,
|
||||
|
|
@ -67,7 +68,8 @@ impl Confidence {
|
|||
}
|
||||
}
|
||||
|
||||
fn value(self) -> f64 {
|
||||
/// Confidence value for depth gating — capped below 1.0 so even "high" must clear thresholds.
|
||||
fn gate_value(self) -> f64 {
|
||||
match self {
|
||||
Self::High => 0.9,
|
||||
Self::Medium => 0.6,
|
||||
|
|
@ -111,7 +113,7 @@ pub fn parse_write_nodes(text: &str) -> Vec<Action> {
|
|||
content = covers_re.replace(&content, "").trim().to_string();
|
||||
|
||||
Action {
|
||||
weight: confidence.weight(),
|
||||
weight: confidence.delta_weight(),
|
||||
kind: ActionKind::WriteNode { key, content, covers },
|
||||
confidence,
|
||||
depth: 0,
|
||||
|
|
@ -347,8 +349,7 @@ pub fn run_one_agent(
|
|||
let output = llm::call_sonnet(llm_tag, &agent_batch.prompt)?;
|
||||
|
||||
// Store raw output for audit trail
|
||||
let ts = store::format_datetime(store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
let ts = store::compact_timestamp();
|
||||
let report_key = format!("_{}-{}-{}", llm_tag, agent_name, ts);
|
||||
let provenance = agent_provenance(agent_name);
|
||||
store.upsert_provenance(&report_key, &output, provenance).ok();
|
||||
|
|
@ -602,7 +603,7 @@ fn run_cycle(
|
|||
config: &KnowledgeLoopConfig,
|
||||
depth_db: &mut DepthDb,
|
||||
) -> Result<CycleResult, String> {
|
||||
let timestamp = chrono::Local::now().format("%Y%m%dT%H%M%S").to_string();
|
||||
let timestamp = store::compact_timestamp();
|
||||
eprintln!("\n{}", "=".repeat(60));
|
||||
eprintln!("CYCLE {} — {}", cycle_num, timestamp);
|
||||
eprintln!("{}", "=".repeat(60));
|
||||
|
|
@ -644,7 +645,7 @@ fn run_cycle(
|
|||
|
||||
match &action.kind {
|
||||
ActionKind::WriteNode { key, covers, .. } => {
|
||||
let conf_val = action.confidence.value();
|
||||
let conf_val = action.confidence.gate_value();
|
||||
let req = required_confidence(depth, config.confidence_base);
|
||||
|
||||
let source_uses: Vec<u32> = covers.iter()
|
||||
|
|
|
|||
|
|
@ -174,6 +174,30 @@ pub struct ConsolidationPlan {
|
|||
pub rationale: Vec<String>,
|
||||
}
|
||||
|
||||
impl ConsolidationPlan {
|
||||
/// Expand the plan into a flat list of (agent_name, batch_size) runs.
|
||||
pub fn to_agent_runs(&self, batch_size: usize) -> Vec<(&'static str, usize)> {
|
||||
let mut runs = Vec::new();
|
||||
if self.run_health {
|
||||
runs.push(("health", 0));
|
||||
}
|
||||
for (name, count) in [
|
||||
("replay", self.replay_count),
|
||||
("linker", self.linker_count),
|
||||
("separator", self.separator_count),
|
||||
("transfer", self.transfer_count),
|
||||
] {
|
||||
let mut remaining = count;
|
||||
while remaining > 0 {
|
||||
let batch = remaining.min(batch_size);
|
||||
runs.push((name, batch));
|
||||
remaining -= batch;
|
||||
}
|
||||
}
|
||||
runs
|
||||
}
|
||||
}
|
||||
|
||||
/// Analyze metrics and decide how much each agent needs to run.
|
||||
///
|
||||
/// This is the control loop: metrics → error signal → agent allocation.
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ mod ops;
|
|||
// Re-export everything callers need
|
||||
pub use types::{
|
||||
memory_dir, nodes_path,
|
||||
now_epoch, epoch_to_local, format_date, format_datetime, format_datetime_space, today,
|
||||
now_epoch, epoch_to_local, format_date, format_datetime, format_datetime_space, compact_timestamp, today,
|
||||
Node, Relation, NodeType, Provenance, RelationType,
|
||||
RetrievalEvent, Params, GapRecord, Store,
|
||||
new_node, new_relation,
|
||||
|
|
|
|||
|
|
@ -172,6 +172,12 @@ pub fn format_datetime_space(epoch: i64) -> String {
|
|||
format!("{:04}-{:02}-{:02} {:02}:{:02}", y, m, d, h, min)
|
||||
}
|
||||
|
||||
/// Compact timestamp for use in keys: "YYYYMMDDTHHMMSS" (local time).
///
/// Unlike `format_datetime`, this includes seconds and contains no
/// separator characters, so it is safe to embed directly in store keys.
pub fn compact_timestamp() -> String {
    let (y, m, d, h, min, s) = epoch_to_local(now_epoch());
    format!("{:04}{:02}{:02}T{:02}{:02}{:02}", y, m, d, h, min, s)
}
|
||||
|
||||
pub fn today() -> String {
|
||||
format_date(now_epoch())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue