consciousness/poc-memory/src/agents/daemon.rs
Kent Overstreet 6069efb7fc agents: always use API backend, remove tools field from .agent files
- Remove is_split special case in daemon — split now goes through
  job_consolidation_agent like all other agents
- call_for_def uses API whenever api_base_url is configured, regardless
  of tools field (was requiring non-empty tools to use API)
- Remove "tools" field from all .agent files — memory tools are always
  provided by the API layer, not configured per-agent
- Add prompt size guard: reject prompts over 800KB (~200K tokens) with
  clear error instead of hitting the model's context limit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 14:26:39 -04:00

1935 lines
76 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// poc-memory daemon: background job orchestration for memory maintenance
//
// Replaces the fragile cron+shell approach with a single long-running process
// that owns all background memory work. Uses jobkit for worker pool, status
// tracking, retry, and cancellation.
//
// Architecture:
// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs
// - Session watcher task: detects ended Claude sessions, triggers extraction
// - Jobs call crate functions directly (Phase 1 shelled out to poc-memory
//   subcommands)
// - Status written to daemon-status.json for `poc-memory daemon status`
//
// Phase 2 (inlining job logic) is done; Phase 3 integrates into poc-agent.
use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
/// A session file untouched for at least this many seconds counts as ended.
const SESSION_STALE_SECS: u64 = 10 * 60;
/// Sleep between iterations of the background loops.
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
/// Minimum spacing between spawned health jobs.
const HEALTH_INTERVAL: Duration = Duration::from_secs(60 * 60);

/// Path of `daemon.log` inside the configured data directory.
fn log_path() -> PathBuf {
    let config = crate::config::get();
    config.data_dir.join("daemon.log")
}

// --- Logging ---

/// Record an event for `job` in the jobkit event log under the data directory.
fn log_event(job: &str, event: &str, detail: &str) {
    let config = crate::config::get();
    jobkit::daemon::event_log::log(&config.data_dir, job, event, detail);
}

/// Public wrapper for logging from other agent modules.
pub fn log_event_pub(job: &str, event: &str, detail: &str) {
    let config = crate::config::get();
    jobkit::daemon::event_log::log(&config.data_dir, job, event, detail);
}

/// Verbose log — only written if verbose logging is enabled.
pub fn log_verbose(job: &str, event: &str, detail: &str) {
    let config = crate::config::get();
    jobkit::daemon::event_log::verbose(&config.data_dir, job, event, detail);
}
// --- Job functions (direct, no subprocess) ---
/// LLM resource pool shared by all jobs; `run_daemon` sets it once at startup.
static DAEMON_POOL: std::sync::OnceLock<Arc<jobkit::ResourcePool>> = std::sync::OnceLock::new();

/// Run a named job with logging, progress reporting, and error mapping.
///
/// Delegates to jobkit's daemon runner. The shared pool is passed when it has
/// been initialized; otherwise no pool is passed.
fn run_job(
    ctx: &ExecutionContext,
    name: &str,
    f: impl FnOnce() -> Result<(), String>,
) -> Result<(), TaskError> {
    let shared_pool = DAEMON_POOL.get().map(|arc| &**arc);
    let config = crate::config::get();
    jobkit::daemon::Daemon::run_job(&config.data_dir, ctx, name, shared_pool, f)
}
// experience_mine and fact_mine removed — observation.agent handles all transcript mining
/// Run an agent targeted at a specific node key.
fn job_targeted_agent(
ctx: &ExecutionContext,
agent_type: &str,
target_key: &str,
) -> Result<(), TaskError> {
let agent = agent_type.to_string();
let key = target_key.to_string();
let job_name = format!("c-{}-target({})", agent, key);
run_job(ctx, &job_name, || {
let mut store = crate::store::Store::load()?;
ctx.log_line(&format!("targeting: {}", key));
let job = job_name.clone();
let log = |msg: &str| {
ctx.log_line(msg);
log_event(&job, "progress", msg);
};
super::knowledge::run_one_agent_with_keys(
&mut store, &agent, &[key.clone()], 5, "daemon", &log, false,
)?;
ctx.log_line("done");
Ok(())
})
}
/// Shared set of node keys currently being processed by agents.
/// Prevents concurrent agents from working on overlapping graph regions.
type InFlightNodes = Arc<Mutex<std::collections::HashSet<String>>>;

/// Run a single consolidation agent (replay, linker, separator, transfer, health, ...).
///
/// Seed selection happens while holding the `in_flight` lock.
/// - The agent's `.agent` query is run, and already-claimed nodes are filtered
///   out of its results. If the query has no `Limit` stage, one is appended,
///   padded by the number of claimed nodes (capped at 100).
/// - At most `batch_size` seeds are kept.
/// - Each seed, and each neighbor whose `strength * weight` exceeds 0.15, is
///   claimed.
///
/// The lock is released for the LLM run. Afterwards exactly the keys this job
/// newly claimed are released. Keys that some other job already held are
/// neither handed to this agent nor released by it.
///
/// Fails if the agent has no `.agent` definition, its query does not parse,
/// or no unclaimed seeds remain.
fn job_consolidation_agent(
    ctx: &ExecutionContext,
    agent_type: &str,
    batch_size: usize,
    in_flight: &InFlightNodes,
) -> Result<(), TaskError> {
    let agent = agent_type.to_string();
    let batch = batch_size;
    let job_name = format!("c-{}", agent);
    run_job(ctx, &job_name, || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        let graph = store.build_graph();
        // Claim seeds: lock in-flight set, run query excluding it,
        // claim selected seeds + strongly-connected neighbors, then unlock.
        let claimed_keys: Vec<String> = {
            let mut locked = in_flight.lock().unwrap();
            ctx.log_line(&format!("running agent: {} (batch={}, {} in-flight)",
                agent, batch, locked.len()));
            // Run the agent's query, filtering out in-flight nodes
            let def = super::defs::get_def(&agent)
                .ok_or_else(|| format!("no .agent file for {}", agent))?;
            let query = &def.query;
            let keys = if !query.is_empty() {
                use crate::query::engine as search;
                let mut stages = search::Stage::parse_pipeline(query)?;
                let padded = batch + locked.len().min(100);
                if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) {
                    stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
                }
                let results = search::run_query(&stages, vec![], &graph, &store, false, padded);
                results.into_iter()
                    .map(|(k, _)| k)
                    .filter(|k| !locked.contains(k))
                    .take(batch)
                    .collect::<Vec<_>>()
            } else {
                vec![]
            };
            if keys.is_empty() {
                return Err("query returned no results (after exclusion)".into());
            }
            // Claim seeds + strongly-connected neighbors.
            // Only exclude neighbors with score > threshold to avoid
            // blacking out the graph via high-degree hub nodes.
            //
            // `insert` returns false when the key is already in the set.
            // That happens either because another agent owns it, in which
            // case we must not release it later, or because it was claimed
            // earlier in this loop, in which case we don't pass it twice.
            let mut claimed = Vec::with_capacity(batch * 20);
            for key in &keys {
                if locked.insert(key.clone()) {
                    claimed.push(key.clone());
                }
                for (nbr, strength) in graph.neighbors(key) {
                    let weight = store.nodes.get(nbr.as_str())
                        .map(|n| n.weight).unwrap_or(0.1);
                    if strength * weight > 0.15 && locked.insert(nbr.clone()) {
                        claimed.push(nbr.clone());
                    }
                }
            }
            claimed
        };
        // in_flight lock released — run LLM without holding it
        let log = |msg: &str| {
            ctx.log_line(msg);
            log_event(&job_name, "progress", msg);
        };
        // Use run_one_agent_with_keys — we already selected seeds above,
        // no need to re-run the query.
        let result = super::knowledge::run_one_agent_with_keys(
            &mut store, &agent, &claimed_keys, batch, "consolidate", &log, false,
        ).map(|_| ());
        // Release exactly the keys this job claimed (seeds + neighbors)
        {
            let mut locked = in_flight.lock().unwrap();
            for key in &claimed_keys {
                locked.remove(key);
            }
        }
        result?;
        ctx.log_line("done");
        Ok(())
    })
}
/// Run the rename agent: generates renames via LLM, applies them directly.
///
/// The agent output is scanned for `RENAME <old-key> <new-key>` lines. Rename
/// uses its own format, not WRITE_NODE/LINK/REFINE. A rename is counted as
/// skipped when:
/// - the line is malformed,
/// - the old key cannot be resolved,
/// - the new key already exists, or
/// - `rename_node` fails.
///
/// The store is saved only if at least one rename was applied. A `batch_size`
/// of 0 selects the default batch of 10.
fn job_rename_agent(
    ctx: &ExecutionContext,
    batch_size: usize,
) -> Result<(), TaskError> {
    run_job(ctx, "c-rename", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        let batch = if batch_size == 0 { 10 } else { batch_size };
        ctx.log_line(&format!("running rename agent (batch={})", batch));
        let log = |msg: &str| ctx.log_line(msg);
        let result = super::knowledge::run_one_agent(&mut store, "rename", batch, "consolidate", &log, false)?;
        let mut applied = 0;
        let mut skipped = 0;
        for line in result.output.lines() {
            let Some(args) = line.trim().strip_prefix("RENAME ") else { continue };
            let Some((old_key, new_key)) = args.split_once(' ') else {
                skipped += 1;
                continue;
            };
            let (old_key, new_key) = (old_key.trim(), new_key.trim());
            if old_key.is_empty() || new_key.is_empty() {
                skipped += 1;
                continue;
            }
            let resolved = match store.resolve_key(old_key) {
                Ok(k) => k,
                Err(e) => {
                    ctx.log_line(&format!("skip: {} → {}: {}", old_key, new_key, e));
                    skipped += 1;
                    continue;
                }
            };
            if store.nodes.contains_key(new_key) {
                ctx.log_line(&format!("skip: {} already exists", new_key));
                skipped += 1;
                continue;
            }
            match store.rename_node(&resolved, new_key) {
                Ok(()) => {
                    ctx.log_line(&format!("renamed: {} → {}", resolved, new_key));
                    applied += 1;
                }
                Err(e) => {
                    ctx.log_line(&format!("error: {} → {}: {}", resolved, new_key, e));
                    skipped += 1;
                }
            }
        }
        if applied > 0 {
            store.save()?;
        }
        ctx.log_line(&format!("done: {} applied, {} skipped", applied, skipped));
        Ok(())
    })
}
/// Run the split agent: two-phase decomposition of large nodes.
///
/// Phase 1: Send node + neighbors to LLM, get back a JSON split plan
/// (child keys, descriptions, section hints).
/// Phase 2: For each child, send parent content + child description to LLM,
/// get back the extracted/reorganized content for that child.
///
/// This handles arbitrarily large nodes because the output of each phase 2
/// call is proportional to one child, not the whole parent.
/// Split a single node by key. Called as an independent task so multiple
/// splits can run in parallel. Each task loads the store fresh, checks the
/// node still exists and hasn't been split, does the LLM work, then saves.
fn job_split_one(
ctx: &ExecutionContext,
parent_key: String,
) -> Result<(), TaskError> {
run_job(ctx, "c-split", || {
ctx.log_line(&format!("loading store for {}", parent_key));
let mut store = crate::store::Store::load()?;
// Check node still exists and hasn't been deleted/split already
let content_len = match store.nodes.get(parent_key.as_str()) {
Some(n) if !n.deleted => n.content.len(),
_ => {
ctx.log_line(&format!("skip: {} no longer exists or deleted", parent_key));
return Ok(());
}
};
ctx.log_line(&format!("--- splitting: {} ({} chars)", parent_key, content_len));
// Phase 1: get split plan
let plan_prompt = super::prompts::split_plan_prompt(&store, &parent_key)?;
ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len()));
let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?;
let plan = match super::llm::parse_json_response(&plan_response) {
Ok(v) => v,
Err(e) => {
ctx.log_line(&format!("phase 1 parse error: {}", e));
return Ok(());
}
};
let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or("");
if action == "keep" {
let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!("keep: {} ({})", parent_key, reason));
return Ok(());
}
if action != "split" {
ctx.log_line(&format!("unexpected action: {}", action));
return Ok(());
}
let children_plan = match plan.get("children").and_then(|v| v.as_array()) {
Some(c) if c.len() >= 2 => c,
_ => {
ctx.log_line("plan has fewer than 2 children, skipping");
return Ok(());
}
};
ctx.log_line(&format!("phase 1: {} children planned", children_plan.len()));
for child in children_plan {
let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?");
let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!(" planned: {}{}", key, desc));
}
// Phase 2: extract content for each child
let mut children: Vec<(String, String)> = Vec::new();
// Collect neighbor assignments from plan: child_key -> [neighbor_keys]
let mut neighbor_map: HashMap<String, Vec<String>> = HashMap::new();
for child_plan in children_plan {
let child_key = match child_plan.get("key").and_then(|v| v.as_str()) {
Some(k) => k.to_string(),
None => continue,
};
let child_desc = child_plan.get("description")
.and_then(|v| v.as_str()).unwrap_or("");
let child_sections = child_plan.get("sections")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", "))
.unwrap_or_default();
let child_neighbors: Vec<String> = child_plan.get("neighbors")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect())
.unwrap_or_default();
neighbor_map.insert(child_key.clone(), child_neighbors);
ctx.log_line(&format!("phase 2: extracting {}", child_key));
let extract_prompt = super::prompts::split_extract_prompt(
&store, &parent_key, &child_key, child_desc, &child_sections)?;
ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len()));
let content = match super::llm::call_sonnet("split-extract", &extract_prompt) {
Ok(c) => c,
Err(e) => {
ctx.log_line(&format!(" extract error: {}", e));
continue;
}
};
ctx.log_line(&format!(" extracted: {} chars", content.len()));
children.push((child_key, content));
}
if children.len() < 2 {
ctx.log_line(&format!("only {} children extracted, skipping", children.len()));
return Ok(());
}
// Reload store before mutations — another split may have saved meanwhile
store = crate::store::Store::load()?;
// Re-check parent still exists after reload
if !store.nodes.contains_key(parent_key.as_str()) ||
store.nodes.get(parent_key.as_str()).map_or(true, |n| n.deleted) {
ctx.log_line(&format!("skip: {} was split by another task", parent_key));
return Ok(());
}
// Collect parent's edges before modifications
let parent_edges: Vec<_> = store.relations.iter()
.filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key))
.cloned()
.collect();
// Create child nodes
let mut child_uuids: Vec<([u8; 16], String)> = Vec::new();
for (child_key, content) in &children {
if store.nodes.contains_key(child_key.as_str()) {
ctx.log_line(&format!(" skip: {} already exists", child_key));
continue;
}
store.upsert_provenance(child_key, content,
"consolidate:write")?;
let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid;
child_uuids.push((uuid, child_key.clone()));
ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len()));
}
// Inherit edges using agent's neighbor assignments from the plan
for (child_uuid, child_key) in &child_uuids {
let neighbors = match neighbor_map.get(child_key) {
Some(n) => n,
None => continue,
};
for neighbor_key in neighbors {
// Find the parent edge for this neighbor to inherit its strength
let parent_edge = parent_edges.iter().find(|r| {
r.source_key == *neighbor_key || r.target_key == *neighbor_key
});
let strength = parent_edge.map(|e| e.strength).unwrap_or(0.3);
let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) {
Some(n) => n.uuid,
None => continue,
};
let rel = crate::store::new_relation(
*child_uuid, neighbor_uuid,
crate::store::RelationType::Auto, strength,
child_key, neighbor_key,
);
store.add_relation(rel).ok();
}
}
// Link siblings
for i in 0..child_uuids.len() {
for j in (i+1)..child_uuids.len() {
let rel = crate::store::new_relation(
child_uuids[i].0, child_uuids[j].0,
crate::store::RelationType::Auto, 0.5,
&child_uuids[i].1, &child_uuids[j].1,
);
store.add_relation(rel).ok();
}
}
// Tombstone parent
if let Some(parent) = store.nodes.get_mut(parent_key.as_str()) {
parent.deleted = true;
parent.version += 1;
let tombstone = parent.clone();
store.append_nodes(std::slice::from_ref(&tombstone)).ok();
}
store.nodes.remove(parent_key.as_str());
ctx.log_line(&format!("split complete: {}{} children", parent_key, child_uuids.len()));
store.save()?;
Ok(())
})
}
/// Link orphan nodes (CPU-heavy, no LLM).
///
/// Delegates to `crate::neuro::link_orphans(store, 2, 3, 0.15)` and logs the
/// orphan and added-link counts it reports.
///
/// NOTE(review): unlike the other store jobs this never calls `store.save()`.
/// Presumably `link_orphans` persists its own changes — confirm.
fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "c-orphans", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;

        ctx.log_line("linking orphans");
        let (orphan_count, links_added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15);
        let summary = format!("{} orphans, {} links added", orphan_count, links_added);
        ctx.log_line(&summary);
        Ok(())
    })
}
/// Cap node degree to prevent mega-hubs.
///
/// Calls `Store::cap_degree(50)`. On success the store is saved and the
/// capped-hub / pruned-edge counts are logged. A `cap_degree` error is
/// returned as-is, without saving.
fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "c-cap", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        ctx.log_line("capping degree");
        let (hubs, pruned) = store.cap_degree(50)?;
        store.save()?;
        ctx.log_line(&format!("{} hubs capped, {} edges pruned", hubs, pruned));
        Ok(())
    })
}
/// Apply links extracted from digests.
///
/// Collects links via `parse_all_digest_links`, applies them with
/// `apply_digest_links`, saves the store, then logs the applied / skipped /
/// fallback counts.
fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "c-digest-links", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;

        ctx.log_line("applying digest links");
        let parsed_links = super::digest::parse_all_digest_links(&store);
        let (applied, skipped, fallbacks) =
            super::digest::apply_digest_links(&mut store, &parsed_links);
        store.save()?;

        ctx.log_line(&format!(
            "{} applied, {} skipped, {} fallbacks",
            applied, skipped, fallbacks
        ));
        Ok(())
    })
}
/// Stub that always succeeds.
///
/// The knowledge loop was removed: agents now use tool calls directly, and
/// consolidation is handled by `consolidate_full()` in the consolidate job.
fn job_knowledge_loop(_ctx: &ExecutionContext) -> Result<(), TaskError> {
    Ok(())
}

/// Generate digests by running `crate::digest::digest_auto` on a freshly
/// loaded store. Its result (and any load error) is the job's result.
fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "digest", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;

        ctx.log_line("generating digests");
        crate::digest::digest_auto(&mut store)
    })
}
/// Health job: runs `neuro::daily_check`, decays search-hit counters, and
/// caches fresh graph-health metrics for status display.
///
/// The counters are decayed by factor 0.9 (10%). A counter-decay failure is
/// only logged; it does not fail the job.
fn job_daily_check(
    ctx: &ExecutionContext,
    graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) -> Result<(), TaskError> {
    let shared_health = Arc::clone(graph_health);
    run_job(ctx, "daily-check", || {
        ctx.log_line("loading store");
        let store = crate::store::Store::load()?;

        ctx.log_line("checking health");
        let _daily_report = crate::neuro::daily_check(&store);

        // Decay search hit counters (10% daily decay)
        ctx.log_line("decaying search counters");
        let decay_message = match crate::counters::decay_all(0.9) {
            Ok(removed) => format!("decayed counters, removed {}", removed),
            Err(e) => format!("counter decay failed: {}", e),
        };
        ctx.log_line(&decay_message);

        // Compute graph health metrics for status display
        ctx.log_line("computing graph health");
        let fresh_health = compute_graph_health(&store);
        let mut slot = shared_health.lock().unwrap();
        *slot = Some(fresh_health);
        Ok(())
    })
}
/// Build the graph-health snapshot shown in the daemon status.
///
/// The snapshot combines:
/// - graph metrics from `crate::graph::current_metrics`,
/// - the share of `EpisodicSession` nodes (0.0 for an empty store), and
/// - the quick consolidation plan.
///
/// The O(n²) interference computation is skipped, so `interference` is
/// always 0. `computed_at` is stamped with the current time.
fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
    let graph = store.build_graph();
    let metrics = crate::graph::current_metrics(&graph);

    let episodic_nodes = store
        .nodes
        .iter()
        .filter(|(_, node)| matches!(node.node_type, crate::store::NodeType::EpisodicSession))
        .count();
    let episodic_ratio = match store.nodes.len() {
        0 => 0.0,
        total => episodic_nodes as f32 / total as f32,
    };

    // Same planning logic as consolidation, minus the O(n²) interference pass.
    let plan = crate::neuro::consolidation_plan_quick(store);

    GraphHealth {
        nodes: metrics.nodes,
        edges: metrics.edges,
        communities: metrics.communities,
        alpha: metrics.alpha,
        gini: metrics.gini,
        avg_cc: metrics.avg_cc,
        sigma: metrics.sigma,
        episodic_ratio,
        interference: 0,
        plan_counts: plan.counts,
        plan_rationale: plan.rationale,
        computed_at: crate::store::format_datetime_space(crate::store::now_epoch()),
    }
}
// --- Session detection ---

/// Minimum file size for a session to be worth mining.
/// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive
/// sessions are much larger. Skip anything too small to contain
/// meaningful conversation.
const MIN_SESSION_BYTES: u64 = 100_000;

/// Find JSONL session files that haven't been written to recently.
///
/// Scans each immediate subdirectory of the configured `projects_dir` for
/// `*.jsonl` files that meet both conditions:
/// - at least [`MIN_SESSION_BYTES`] long, and
/// - an mtime at least [`SESSION_STALE_SECS`] in the past.
///
/// Only metadata is checked (stat); there are no file reads or subprocess
/// calls. Whether a file is still held open is deliberately *not* checked
/// here; that more expensive check is left to the caller.
///
/// A missing or unreadable projects directory yields an empty list.
/// Unreadable subdirectories and entries are skipped. Files with an mtime in
/// the future are treated as fresh.
fn find_stale_sessions() -> Vec<PathBuf> {
    let projects = crate::config::get().projects_dir.clone();
    let Ok(dirs) = fs::read_dir(&projects) else { return Vec::new() };

    let now = SystemTime::now();
    let mut stale = Vec::new();
    for dir_entry in dirs.filter_map(Result::ok) {
        let dir_path = dir_entry.path();
        if !dir_path.is_dir() { continue; }
        let Ok(files) = fs::read_dir(&dir_path) else { continue };
        for file_entry in files.filter_map(Result::ok) {
            let path = file_entry.path();
            if path.extension().map_or(true, |ext| ext != "jsonl") { continue; }
            let Ok(meta) = path.metadata() else { continue };
            // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions)
            if meta.len() < MIN_SESSION_BYTES { continue; }
            let Ok(mtime) = meta.modified() else { continue };
            let age = now.duration_since(mtime).unwrap_or_default();
            if age.as_secs() >= SESSION_STALE_SECS {
                stale.push(path);
            }
        }
    }
    stale
}
/// Return true if some *other* process currently has `path` open.
///
/// Walks `/proc/<pid>/fd/*` for every numeric `/proc` entry except this
/// process, and compares each fd's link target with the canonicalized
/// `path`. This is what `fuser` does internally, without the subprocess
/// overhead.
///
/// If `path` cannot be canonicalized or `/proc` cannot be read, the result is
/// "not open". Processes whose fd directory is unreadable are skipped.
fn is_file_open(path: &Path) -> bool {
    let Ok(target) = path.canonicalize() else { return false };
    let Ok(procs) = fs::read_dir("/proc") else { return false };
    let own_pid = std::process::id().to_string();

    procs
        .filter_map(Result::ok)
        .filter(|entry| {
            let raw_name = entry.file_name();
            let name = raw_name.to_string_lossy();
            name.chars().all(|c| c.is_ascii_digit()) && *name != own_pid
        })
        .filter_map(|entry| fs::read_dir(entry.path().join("fd")).ok())
        .flatten()
        .filter_map(Result::ok)
        .any(|fd| fs::read_link(fd.path()).map_or(false, |link| link == target))
}
/// Get process uptime as a human-readable string by reading `/proc/<pid>/stat`.
///
/// Returns `None` in any of these cases:
/// - the process or `/proc/uptime` cannot be read or parsed,
/// - the clock is before the Unix epoch, or
/// - the clock-tick rate is unavailable.
///
/// Resolution is whole seconds.
fn proc_uptime(pid: u32) -> Option<String> {
    let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    // comm (field 2) is parenthesized and may itself contain spaces or ')',
    // so split after the *last* ')'. fields[0] is then field 3 (state), which
    // puts field 22 (starttime, in clock ticks since boot) at index 19.
    let after_comm = stat.get(stat.rfind(')')? + 2..)?;
    let start_ticks: u64 = after_comm.split_whitespace().nth(19)?.parse().ok()?;

    // SAFETY: sysconf has no preconditions; it only queries a system constant.
    let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
    // sysconf returns -1 on failure. Reject that (and 0) instead of casting
    // it to a huge u64 or dividing by zero below.
    if ticks_per_sec <= 0 {
        return None;
    }
    let ticks_per_sec = ticks_per_sec as u64;

    let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
    let sys_uptime: f64 = fs::read_to_string("/proc/uptime").ok()?
        .split_whitespace()
        .next()?
        .parse()
        .ok()?;
    let boot_time_secs = now.saturating_sub(sys_uptime as u64);
    let start_secs = boot_time_secs + start_ticks / ticks_per_sec;
    let uptime = now.saturating_sub(start_secs);
    Some(format_duration_human(uptime as u128 * 1000))
}
// --- Status writing ---

/// Build a status snapshot from the scheduler state and persist it in the
/// data directory through `jobkit::daemon::status::write`.
fn write_status(
    choir: &Choir,
    last_daily: Option<chrono::NaiveDate>,
    graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) {
    let snapshot = build_status(choir, last_daily, graph_health);
    let config = crate::config::get();
    jobkit::daemon::status::write(&config.data_dir, &snapshot);
}
/// Graph-health metrics computed by the health job and shown in the daemon
/// status.
///
/// The container-level `#[serde(default)]` makes deserialization lenient. A
/// status file that lacks any of these fields still loads, and the missing
/// fields take their `Default` values. Without it, one missing field makes
/// the whole status file fail to parse.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct GraphHealth {
    pub nodes: usize,
    pub edges: usize,
    pub communities: usize,
    pub alpha: f32,            // power-law exponent (target ≥2.5)
    pub gini: f32,             // degree inequality (target ≤0.4)
    pub avg_cc: f32,           // clustering coefficient (target ≥0.2)
    pub sigma: f32,            // small-world sigma
    pub episodic_ratio: f32,   // episodic/total nodes (target <0.4)
    pub interference: usize,   // interfering pairs (target <50)
    /// Consolidation work estimate from the plan.
    pub plan_counts: std::collections::HashMap<String, usize>,
    /// Rationale lines accompanying the plan.
    pub plan_rationale: Vec<String>,
    /// Timestamp string of when these metrics were computed.
    pub computed_at: String,
}
/// Daemon status file contents.
///
/// `run_daemon` loads this back at startup via
/// `jobkit::daemon::status::load` to recover `last_daily`. With serde
/// derive, field order here is also the serialized key order.
#[derive(serde::Serialize, serde::Deserialize)]
struct DaemonStatus {
// NOTE(review): presumably the pid of the daemon that wrote the file — confirm in build_status.
pid: u32,
// Snapshot of jobkit task states.
tasks: Vec<TaskInfo>,
// Date of the last daily run; run_daemon parses it as chrono::NaiveDate. Absent → None.
#[serde(default)]
last_daily: Option<String>,
// Cached graph-health metrics, if computed yet. Absent → None.
#[serde(default)]
graph_health: Option<GraphHealth>,
}
// --- The daemon ---
pub fn run_daemon() -> Result<(), String> {
let config = crate::config::get();
let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig {
data_dir: config.data_dir.clone(),
resource_slots: config.llm_concurrency,
resource_name: "llm".to_string(),
extra_workers: 3,
});
let choir = Arc::clone(&daemon.choir);
let llm = Arc::clone(&daemon.resource);
let _ = DAEMON_POOL.set(Arc::clone(&llm));
let task_log_dir = config.data_dir.join("logs");
let _ = fs::create_dir_all(&task_log_dir);
// Enable verbose logging if POC_MEMORY_VERBOSE is set
if std::env::var("POC_MEMORY_VERBOSE").is_ok() {
jobkit::daemon::event_log::set_level(jobkit::daemon::event_log::LogLevel::Verbose);
}
// Recover last_daily from previous status file
let last_daily: Arc<Mutex<Option<chrono::NaiveDate>>> = Arc::new(Mutex::new(
jobkit::daemon::status::load::<DaemonStatus>(&config.data_dir)
.and_then(|s| s.last_daily)
.and_then(|d| d.parse().ok())
));
let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None));
// Nodes currently being processed by agents — prevents concurrent
// agents from working on overlapping graph regions.
let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new()));
log_event("daemon", "started", &format!("pid {}", std::process::id()));
eprintln!("poc-memory daemon started (pid {})", std::process::id());
// Write initial status
write_status(&choir, *last_daily.lock().unwrap(), &graph_health);
// Session watcher: reconcile-based extraction
// Each tick: scan filesystem for stale sessions, check store for what's
// already mined, check task registry for what's in-flight, spawn the diff.
// No persistent tracking state — the store is the source of truth.
let choir_sw = Arc::clone(&choir);
let _llm_sw = Arc::clone(&llm); // kept for future use
let last_daily_sw = Arc::clone(&last_daily);
let graph_health_sw = Arc::clone(&graph_health);
choir.spawn("session-watcher").init(move |ctx| {
ctx.set_progress("idle");
// Cache: path → (file_size, segment_count). Invalidated when size changes.
let mut seg_cache: HashMap<String, (u64, usize)> = HashMap::new();
// Retry backoff: filename → (next_retry_after, current_backoff).
// Exponential from 5min, cap 30min. Resets on daemon restart.
let mut retry_backoff: HashMap<String, (std::time::Instant, Duration)> = HashMap::new();
const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min
const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min
loop {
if ctx.is_cancelled() {
return Err(TaskError::Fatal("cancelled".into()));
}
ctx.set_progress("scanning");
// Check for failed tasks and update backoff.
// Task names are "extract:{filename}.{segment}" — extract the
// filename (UUID.jsonl) by stripping the trailing .N segment suffix.
let task_statuses = choir_sw.task_statuses();
for t in &task_statuses {
if let Some(label) = t.name.strip_prefix("extract:") {
// label is "UUID.jsonl.N" — strip last ".N" to get filename
let filename = match label.rfind('.') {
Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => {
&label[..pos]
}
_ => label,
};
match t.status {
TaskStatus::Failed => {
let entry = retry_backoff.entry(filename.to_string())
.or_insert((std::time::Instant::now(), BACKOFF_INITIAL));
entry.1 = (entry.1 * 2).min(BACKOFF_MAX);
entry.0 = std::time::Instant::now() + entry.1;
}
TaskStatus::Completed => {
retry_backoff.remove(filename);
}
_ => {}
}
}
}
// What's currently running/pending? (avoid spawning duplicates)
let active: HashSet<String> = task_statuses.iter()
.filter(|t| !t.status.is_finished())
.map(|t| t.name.clone())
.collect();
let stale = find_stale_sessions();
// Load mined transcript keys once for this tick
let mined = std::collections::HashSet::<String>::new(); // mining removed
// MAX_NEW_PER_TICK removed — mining handled by observation agent
// Load fact-mined keys too
let fact_keys: HashSet<String> = {
use crate::store::StoreView;
let view = crate::store::AnyView::load().ok();
view.map(|v| {
let mut keys = HashSet::new();
v.for_each_node(|key, _, _| {
if key.starts_with("_facts-") {
keys.insert(key.to_string());
}
});
keys
}).unwrap_or_default()
};
let _extract_queued = 0usize;
let mut _extract_remaining = 0usize;
let mut _fact_remaining = 0usize;
let mut already_mined = 0;
let mut still_open = 0;
let mut backed_off = 0;
let total_stale = stale.len();
// Sessions with old whole-file keys that need per-segment migration
let mut migrate_keys: Vec<(String, String, usize)> = Vec::new();
let mut needs_extract: Vec<(String, String, Option<usize>)> = Vec::new();
let mut needs_fact: Vec<(String, String)> = Vec::new();
let now = std::time::Instant::now();
for session in stale {
let filename = session.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into());
let path_str = session.to_string_lossy().to_string();
// Check retry backoff before doing any work
if let Some((next_retry, _)) = retry_backoff.get(&filename) {
if now < *next_retry {
backed_off += 1;
continue;
}
}
if is_file_open(&session) {
still_open += 1;
continue;
}
// Get file size for cache invalidation
let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0);
// Get segment count, using cache with size-based invalidation
let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) {
if cached_size == file_size {
cached_count
} else {
// File changed — re-parse
let messages = match super::enrich::extract_conversation(&path_str) {
Ok(m) => m,
Err(_) => continue,
};
let count = super::enrich::split_on_compaction(messages).len();
seg_cache.insert(path_str.clone(), (file_size, count));
count
}
} else {
let messages = match super::enrich::extract_conversation(&path_str) {
Ok(m) => m,
Err(_) => continue,
};
let count = super::enrich::split_on_compaction(messages).len();
seg_cache.insert(path_str.clone(), (file_size, count));
count
};
// No extractable messages — skip entirely
if seg_count == 0 {
already_mined += 1;
continue;
}
let fname_key = format!("_experience-{}", filename.trim_end_matches(".jsonl"));
let has_whole_file_key = mined.contains(&fname_key);
// Check per-segment keys, find unmined segments
let mut unmined_segs: Vec<usize> = Vec::new();
let mut has_any_seg_key = false;
for i in 0..seg_count {
let seg_key = format!("{}.{}", fname_key, i);
if mined.contains(&seg_key) {
has_any_seg_key = true;
} else {
unmined_segs.push(i);
}
}
// Migrate old whole-file key: if it exists but no per-segment keys,
// write per-segment keys for all current segments (they were mined
// under the old scheme)
if has_whole_file_key && !has_any_seg_key && seg_count > 0 {
migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count));
// After migration, all current segments are covered
unmined_segs.clear();
}
if unmined_segs.is_empty() {
// All segments mined — check fact-mining
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
if !fact_keys.contains(&fact_key) {
let task_name = format!("fact-mine:{}", filename);
if !active.contains(&task_name) {
needs_fact.push((filename, path_str));
}
} else {
already_mined += 1;
}
} else {
// Queue unmined segments
for i in unmined_segs {
let task_name = format!("extract:{}.{}", filename, i);
if active.contains(&task_name) { continue; }
needs_extract.push((
format!("{}.{}", filename, i),
path_str.clone(),
Some(i),
));
}
}
}
// Migrate old whole-file keys to per-segment keys
if !migrate_keys.is_empty() {
match crate::store::Store::load() {
Ok(mut store) => {
let mut ok = 0;
let mut fail = 0;
for (fname_key, path_str, seg_count) in &migrate_keys {
for i in 0..*seg_count {
let seg_key = format!("{}.{}", fname_key, i);
let content = format!("Migrated from whole-file key for {}", path_str);
let mut node = crate::store::new_node(&seg_key, &content);
node.provenance = "experience-mine:write".to_string();
match store.upsert_node(node) {
Ok(()) => ok += 1,
Err(e) => {
if fail == 0 {
eprintln!("migration upsert_node error: {}", e);
}
fail += 1;
}
}
}
}
if let Err(e) = store.save() {
eprintln!("migration save error: {}", e);
}
log_event("session-watcher", "migrated",
&format!("{} whole-file keys → per-segment ({} ok, {} fail)",
migrate_keys.len(), ok, fail));
}
Err(e) => {
eprintln!("migration store load error: {}", e);
}
}
}
// experience_mine and fact_mine killed — observation.agent handles transcript mining
_extract_remaining = needs_extract.len();
_fact_remaining = needs_fact.len();
let extract_pending = _extract_queued + _extract_remaining;
let fact_pending = _fact_remaining;
if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 {
log_event("session-watcher", "tick",
&format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff",
total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off));
let mut parts = Vec::new();
if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); }
if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); }
if still_open > 0 { parts.push(format!("{} open", still_open)); }
if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); }
ctx.set_progress(parts.join(", "));
} else {
ctx.set_progress("idle");
}
write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw);
std::thread::sleep(SCHEDULER_INTERVAL);
}
});
// Scheduler: runs daily jobs based on filesystem state
let choir_sched = Arc::clone(&choir);
let llm_sched = Arc::clone(&llm);
let last_daily_sched = Arc::clone(&last_daily);
let graph_health_sched = Arc::clone(&graph_health);
let in_flight_sched = Arc::clone(&in_flight);
let log_dir_sched = task_log_dir.clone();
const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
choir.spawn("scheduler").init(move |ctx| {
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
let mut last_consolidation = std::time::Instant::now() - CONSOLIDATION_INTERVAL; // run on first tick
ctx.set_progress("idle");
loop {
if ctx.is_cancelled() {
return Err(TaskError::Fatal("cancelled".into()));
}
let today = chrono::Local::now().date_naive();
// Health check: every hour — also updates graph health metrics
if last_health.elapsed() >= HEALTH_INTERVAL {
let gh = Arc::clone(&graph_health_sched);
choir_sched.spawn("health").init(move |ctx| {
job_daily_check(ctx, &gh)
});
last_health = std::time::Instant::now();
}
// Consolidation cycle: every 6 hours (wait for health check to cache metrics first)
let gh = graph_health_sched.lock().unwrap().clone();
if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL && gh.is_some() {
log_event("scheduler", "consolidation-trigger",
&format!("{} (every 6h)", today));
// Use cached graph health plan (from consolidation_plan_quick).
let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above
let plan = crate::neuro::ConsolidationPlan {
counts: h.plan_counts.clone(),
run_health: true,
rationale: Vec::new(),
};
let runs = plan.to_agent_runs(5);
let summary: Vec<String> = h.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
log_event("scheduler", "consolidation-plan",
&format!("{} agents ({})", runs.len(), summary.join(" ")));
// Phase 1: Agent runs — sequential within type, parallel across types.
// Same-type agents chain (they may touch overlapping graph regions),
// but different types run concurrently (different seed nodes).
let mut prev_by_type: std::collections::HashMap<String, jobkit::RunningTask> =
std::collections::HashMap::new();
for (i, (agent_type, batch)) in runs.iter().enumerate() {
let agent = agent_type.to_string();
let b = *batch;
let in_flight_clone = Arc::clone(&in_flight_sched);
let task_name = format!("c-{}-{}:{}", agent, i, today);
let mut builder = choir_sched.spawn(task_name)
.resource(&llm_sched)
.log_dir(&log_dir_sched)
.retries(1)
.init(move |ctx| {
job_consolidation_agent(ctx, &agent, b, &in_flight_clone)
});
if let Some(dep) = prev_by_type.get(agent_type.as_str()) {
builder.depend_on(dep);
}
prev_by_type.insert(agent_type.clone(), builder.run());
}
// Orphans phase depends on all agent type chains completing
let prev_agent = prev_by_type.into_values().last();
// Phase 2: Link orphans (CPU-only, no LLM)
let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today))
.retries(1)
.init(move |ctx| job_link_orphans(ctx));
if let Some(ref dep) = prev_agent {
orphans.depend_on(dep);
}
let orphans = orphans.run();
// Phase 3: Cap degree
let mut cap = choir_sched.spawn(format!("c-cap:{}", today))
.retries(1)
.init(move |ctx| job_cap_degree(ctx));
cap.depend_on(&orphans);
let cap = cap.run();
// Phase 4: Generate digests
let mut digest = choir_sched.spawn(format!("c-digest:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |ctx| job_digest(ctx));
digest.depend_on(&cap);
let digest = digest.run();
// Phase 5: Apply digest links
let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today))
.retries(1)
.init(move |ctx| job_digest_links(ctx));
digest_links.depend_on(&digest);
let digest_links = digest_links.run();
// Phase 7: Knowledge loop
let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |ctx| job_knowledge_loop(ctx));
knowledge.depend_on(&digest_links);
*last_daily_sched.lock().unwrap() = Some(today);
last_consolidation = std::time::Instant::now();
ctx.set_progress(format!("daily pipeline triggered ({today})"));
}
// Prune finished tasks from registry
let pruned = choir_sched.gc_finished();
if pruned > 0 {
log::trace!("pruned {} finished tasks", pruned);
}
write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched);
std::thread::sleep(SCHEDULER_INTERVAL);
}
});
// Register RPC handlers
{
let last_daily_rpc = Arc::clone(&last_daily);
daemon.add_rpc_handler(move |cmd, _ctx| {
if cmd == "consolidate" {
*last_daily_rpc.lock().unwrap() = None;
log_event("rpc", "consolidate", "triggered via socket");
Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into())
} else {
None
}
});
}
daemon.add_rpc_handler(|cmd, _ctx| {
if cmd != "reload-config" { return None; }
let changed = crate::config::reload();
let config = crate::config::get();
let api = config.api_base_url.as_deref().unwrap_or("(none)");
let model = config.api_model.as_deref().unwrap_or("(default)");
log_event("daemon", "config-reload",
&format!("changed={}, api={}, model={}", changed, api, model));
Some(format!("{{\"ok\":true,\"changed\":{},\"api_base_url\":\"{}\",\"api_model\":\"{}\"}}\n",
changed, api, model))
});
daemon.add_rpc_handler(|cmd, _ctx| {
if !cmd.starts_with("record-hits ") { return None; }
let keys: Vec<&str> = cmd.strip_prefix("record-hits ")
.unwrap_or("")
.split('\t')
.filter(|k| !k.is_empty())
.collect();
if keys.is_empty() {
return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into());
}
let n = keys.len();
match crate::counters::record_search_hits(&keys) {
Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)),
Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))),
}
});
{
let choir_rpc = Arc::clone(&choir);
let llm_rpc = Arc::clone(&llm);
let log_dir_rpc = task_log_dir.clone();
let in_flight_rpc = Arc::clone(&in_flight);
daemon.add_rpc_handler(move |cmd, _ctx| {
if !cmd.starts_with("run-agent ") { return None; }
let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
let agent_type = parts.get(1).unwrap_or(&"replay");
let count: usize = parts.get(2)
.and_then(|s| s.parse().ok())
.unwrap_or(1);
// Optional target key: "run-agent linker 1 target:KEY"
let target_key: Option<String> = parts.get(3)
.and_then(|s| s.strip_prefix("target:"))
.map(|s| s.to_string());
let batch_size = 5;
let today = chrono::Local::now().format("%Y-%m-%d");
let ts = chrono::Local::now().format("%H%M%S");
let mut prev = None;
let mut spawned = 0;
let mut remaining = count;
let is_rename = *agent_type == "rename";
// Targeted run: one task for a specific node
if let Some(ref key) = target_key {
let agent = agent_type.to_string();
let key = key.clone();
let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::<String>(), today);
if jobkit::daemon::event_log::enabled(jobkit::daemon::event_log::LogLevel::Verbose) {
log_event("daemon", "spawn-targeted",
&format!("{} (pool: {}/{})", task_name, llm_rpc.available(), llm_rpc.capacity()));
}
choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.log_dir(&log_dir_rpc)
.retries(1)
.init(move |ctx| {
job_targeted_agent(ctx, &agent, &key)
})
.run();
spawned = 1;
remaining = 0;
}
while remaining > 0 {
let batch = remaining.min(batch_size);
let agent = agent_type.to_string();
let in_flight_clone = Arc::clone(&in_flight_rpc);
let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
let mut builder = choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.log_dir(&log_dir_rpc)
.retries(1)
.init(move |ctx| {
if is_rename {
job_rename_agent(ctx, batch)
} else {
job_consolidation_agent(ctx, &agent, batch, &in_flight_clone)
}
});
if let Some(ref dep) = prev {
builder.depend_on(dep);
}
prev = Some(builder.run());
remaining -= batch;
spawned += 1;
}
log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count));
Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n",
count, agent_type, spawned))
});
}
// Main thread: socket server + signal handling
let last_daily_status = Arc::clone(&last_daily);
let graph_health_status = Arc::clone(&graph_health);
daemon.run(move |ctx| {
build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status)
});
log_event("daemon", "stopping", "");
eprintln!("Shutting down...");
log_event("daemon", "stopped", "");
std::process::exit(0)
}
pub fn send_rpc_pub(cmd: &str) -> Option<String> {
send_rpc(cmd)
}
/// Send `cmd` to the daemon socket located under the configured `data_dir`.
///
/// `None` means no reply was obtained; callers treat that as the daemon
/// not running.
fn send_rpc(cmd: &str) -> Option<String> {
    let config = crate::config::get();
    let data_dir = &config.data_dir;
    jobkit::daemon::socket::send_rpc(data_dir, cmd)
}
/// Send the `consolidate` command to the daemon and print its reply to stdout.
///
/// NOTE(review): the daemon-side handler for this command only resets
/// `last_daily`. The scheduler loop decides when to consolidate from its own
/// `last_consolidation` timer, so this may not actually start a run. Confirm.
///
/// # Errors
/// Returns `"Daemon not running."` when no reply is received.
pub fn rpc_consolidate() -> Result<(), String> {
    let reply = send_rpc("consolidate")
        .ok_or_else(|| String::from("Daemon not running."))?;
    println!("{}", reply.trim());
    Ok(())
}
/// Record search hits for the given keys (fire-and-forget from memory-search).
///
/// The keys are sent tab-separated in a single `record-hits` command. An empty
/// slice is a no-op that never contacts the daemon. The daemon's reply content
/// is ignored; only the absence of a reply is reported.
///
/// # Errors
/// Returns `"Daemon not running."` when no reply is received.
pub fn rpc_record_hits(keys: &[&str]) -> Result<(), String> {
    if keys.is_empty() {
        return Ok(());
    }
    let joined = keys.join("\t");
    let cmd = format!("record-hits {}", joined);
    send_rpc(&cmd)
        .map(|_reply| ())
        .ok_or_else(|| "Daemon not running.".to_string())
}
/// Ask the daemon to queue `count` runs of agent type `agent` and print
/// its reply to stdout.
///
/// # Errors
/// Returns `"Daemon not running."` when no reply is received.
pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
    let reply = send_rpc(&format!("run-agent {} {}", agent, count))
        .ok_or_else(|| "Daemon not running.".to_string())?;
    println!("{}", reply.trim());
    Ok(())
}
fn read_status_socket() -> Option<DaemonStatus> {
let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
serde_json::from_str(&json).ok()
}
// status_socket_loop has been replaced by daemon.run() in jobkit-daemon.
/// Assemble a `DaemonStatus` snapshot of this process.
///
/// Includes the pid, every task's status from `choir`, the last daily-run
/// date (formatted with `to_string`), and a clone of the cached graph-health
/// metrics.
///
/// # Panics
/// Panics if the `graph_health` mutex is poisoned.
fn build_status(
    choir: &Choir,
    last_daily: Option<chrono::NaiveDate>,
    graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) -> DaemonStatus {
    let health_snapshot = graph_health.lock().unwrap().clone();
    let daily_str = last_daily.map(|date| date.to_string());
    DaemonStatus {
        pid: std::process::id(),
        tasks: choir.task_statuses(),
        last_daily: daily_str,
        graph_health: health_snapshot,
    }
}
// --- Status display ---
/// Render a millisecond count compactly for status output.
///
/// - Under one second: `"Nms"`.
/// - Under one minute: seconds with one decimal, e.g. `"1.5s"`.
/// - Under one hour: `"MmSs"`.
/// - Otherwise: `"HhMm"`.
///
/// Remainders in the minute and hour forms are truncated, not rounded.
fn format_duration_human(ms: u128) -> String {
    const SEC: u128 = 1_000;
    const MIN: u128 = 60 * SEC;
    const HOUR: u128 = 60 * MIN;
    match ms {
        m if m < SEC => format!("{}ms", m),
        m if m < MIN => format!("{:.1}s", m as f64 / 1000.0),
        m if m < HOUR => format!("{:.0}m{:.0}s", m / MIN, (m % MIN) / SEC),
        m => format!("{:.0}h{:.0}m", m / HOUR, (m % HOUR) / MIN),
    }
}
/// Classify a task name into a display group.
///
/// Returns one of `"core"`, `"extract"`, `"daily"`, `"health"`, or `"other"`.
fn task_group(name: &str) -> &str {
    // Name prefixes of tasks spawned by the periodic consolidation pipeline.
    const DAILY_PREFIXES: [&str; 5] =
        ["c-", "consolidate:", "knowledge-loop:", "digest:", "decay:"];
    match name {
        "session-watcher" | "scheduler" => "core",
        "health" => "health",
        n if n.starts_with("extract:") || n.starts_with("fact-mine:") => "extract",
        n if DAILY_PREFIXES.iter().any(|prefix| n.starts_with(prefix)) => "daily",
        _ => "other",
    }
}
/// Compute elapsed time for a task, using absolute started_at if available.
///
/// - Running tasks with a `started_at` (seconds since the Unix epoch, as
///   `f64`): wall-clock time since then, clamped at zero.
/// - Running tasks without one: the reported `elapsed`.
/// - All other tasks: the result's recorded `duration`, or `elapsed` when
///   there is no result.
fn task_elapsed(t: &TaskInfo) -> Duration {
    if !matches!(t.status, TaskStatus::Running) {
        return match t.result.as_ref() {
            Some(res) => res.duration,
            None => t.elapsed,
        };
    }
    let Some(started_secs) = t.started_at else {
        return t.elapsed;
    };
    let now_secs = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64();
    // Clamp so clock skew can never produce a negative duration.
    Duration::from_secs_f64((now_secs - started_secs).max(0.0))
}
/// Marker string shown before a task line.
///
/// Cancellation takes precedence over the task's status.
fn status_symbol(t: &TaskInfo) -> &'static str {
    match (t.cancelled, &t.status) {
        (true, _) => "",
        (false, TaskStatus::Running) => "",
        (false, TaskStatus::Completed) => "",
        (false, TaskStatus::Failed) => "",
        (false, TaskStatus::Pending) => "·",
    }
}
/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…"
///
/// Names without a space are returned unchanged. Otherwise the part after
/// the first space is reduced to:
/// - its last `/`-separated component,
/// - with a trailing ".jsonl" removed,
/// - capped at 12 bytes.
///
/// The cap backs off to a UTF-8 character boundary, so non-ASCII file names
/// no longer cause a slicing panic. ASCII names are cut exactly as before.
fn short_job_name(job: &str) -> String {
    const MAX_FILE_BYTES: usize = 12;
    let Some((verb, path)) = job.split_once(' ') else {
        return job.to_string();
    };
    let file = path.rsplit('/').next().unwrap_or(path);
    let file = file.strip_suffix(".jsonl").unwrap_or(file);
    // `&file[..12]` would panic if byte 12 fell inside a multi-byte char.
    let mut end = file.len().min(MAX_FILE_BYTES);
    while !file.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} {}", verb, &file[..end])
}
/// Print the last `n` `completed`/`failed` events from the daemon event log
/// to stderr, oldest first, under a " Recent:" heading.
///
/// Does nothing if the log cannot be read or has no matching events. Lines
/// that do not parse as JSON are skipped. Timestamps are shown as their
/// byte range 11..19 (the time portion). If the string is shorter than that,
/// or the range is not on UTF-8 character boundaries, the full string is
/// shown. The previous direct slice could panic in that case.
fn show_recent_completions(n: usize) {
    let path = log_path();
    let Ok(content) = fs::read_to_string(&path) else {
        return;
    };
    let recent: Vec<&str> = content.lines().rev()
        .filter(|line| {
            line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"")
        })
        .take(n)
        .collect();
    if recent.is_empty() {
        return;
    }
    eprintln!(" Recent:");
    for line in recent.iter().rev() {
        let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
            continue;
        };
        let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
        let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
        let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
        let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
        // `get` yields None (instead of panicking) for short or non-ASCII input.
        let time = ts.get(11..19).unwrap_or(ts);
        let sym = if event == "completed" { "" } else { "" };
        let name = short_job_name(job);
        eprintln!(" {} {} {:30} {}", sym, time, name, detail);
    }
    eprintln!();
}
pub fn show_status() -> Result<(), String> {
let status = match read_status_socket() {
Some(s) => s,
None => {
eprintln!("Daemon not running.");
return Ok(());
}
};
let uptime_str = proc_uptime(status.pid).unwrap_or_default();
if uptime_str.is_empty() {
eprintln!("poc-memory daemon pid={}", status.pid);
} else {
eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str);
}
if status.tasks.is_empty() {
eprintln!("\n No tasks");
return Ok(());
}
// Count by status
let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" tasks: {} running, {} pending, {} done, {} failed",
running, pending, completed, failed);
// Graph health
if let Some(ref gh) = status.graph_health {
eprintln!();
fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str {
let ok = if higher_is_better { val >= target } else { val <= target };
if ok { "" } else { "" }
}
eprintln!(" Graph health ({})", gh.computed_at);
eprintln!(" {} nodes, {} edges, {} communities",
gh.nodes, gh.edges, gh.communities);
eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)",
indicator(gh.alpha, 2.5, true), gh.alpha,
indicator(gh.gini, 0.4, false), gh.gini,
indicator(gh.avg_cc, 0.2, true), gh.avg_cc);
eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}",
indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0,
gh.sigma);
let plan_total: usize = gh.plan_counts.values().sum::<usize>() + 1;
let plan_summary: Vec<String> = gh.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
eprintln!(" consolidation plan: {} agents ({} +health)",
plan_total, plan_summary.join(" "));
}
eprintln!();
// Group and display
let groups: &[(&str, &str)] = &[
("core", "Core"),
("daily", "Daily pipeline"),
("extract", "Session extraction"),
("health", "Health"),
("other", "Other"),
];
// In-flight tasks first (running + pending)
let in_flight: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending))
.collect();
if !in_flight.is_empty() {
eprintln!(" In flight:");
for t in &in_flight {
let sym = status_symbol(t);
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" {}", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref()
.filter(|p| *p != "idle")
.map(|p| format!(" {}", p))
.unwrap_or_default();
let name = short_job_name(&t.name);
eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress);
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" │ log: {}", lp);
}
}
}
eprintln!();
}
// Recent completions from log file
show_recent_completions(20);
// Detailed group view only if there are failures worth showing
for (group_id, group_label) in groups {
let tasks: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| task_group(&t.name) == *group_id)
.collect();
if tasks.is_empty() { continue; }
// For extract group, show summary instead of individual tasks
if *group_id == "extract" {
let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" {} ({} total)", group_label, tasks.len());
if n_running > 0 {
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) {
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" ({})", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default();
eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress);
if let Some(ref lp) = t.log_path {
eprintln!(" │ log: {}", lp);
}
}
}
let mut parts = Vec::new();
if n_done > 0 { parts.push(format!("{} done", n_done)); }
if n_running > 0 { parts.push(format!("{} running", n_running)); }
if n_pending > 0 { parts.push(format!("{} queued", n_pending)); }
if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); }
eprintln!(" {}", parts.join(", "));
// Show recent failures
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) {
if let Some(ref r) = t.result {
if let Some(ref err) = r.error {
let short = if err.len() > 80 { &err[..80] } else { err };
eprintln!("{}: {}", t.name, short);
}
}
}
eprintln!();
continue;
}
eprintln!(" {}", group_label);
for t in &tasks {
let sym = status_symbol(t);
let e = task_elapsed(t);
let duration = if !e.is_zero() {
format_duration_human(e.as_millis())
} else {
String::new()
};
let retry = if t.max_retries > 0 && t.retry_count > 0 {
format!(" retry {}/{}", t.retry_count, t.max_retries)
} else {
String::new()
};
let detail = if matches!(t.status, TaskStatus::Failed) {
t.result.as_ref()
.and_then(|r| r.error.as_ref())
.map(|e| {
let short = if e.len() > 60 { &e[..60] } else { e };
format!(" err: {}", short)
})
.unwrap_or_default()
} else {
String::new()
};
if duration.is_empty() {
eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail);
} else {
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail);
}
// Show output log tail for running tasks
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" │ log: {}", lp);
}
}
}
eprintln!();
}
Ok(())
}
/// Install and start the poc-memory systemd *user* service.
///
/// Steps:
/// 1. Write `~/.config/systemd/user/poc-memory.service`, which runs the
///    current executable as `<exe> agent daemon`.
/// 2. Reload the user systemd manager.
/// 3. Enable and start the unit.
/// 4. Install the poc-daemon service and the Claude Code hooks.
///
/// # Errors
/// Returns a descriptive message if:
/// - `current_exe` or `$HOME` cannot be determined;
/// - the unit directory or file cannot be created or written;
/// - a `systemctl` invocation fails to spawn or exits unsuccessfully;
/// - either follow-up installer fails.
pub fn install_service() -> Result<(), String> {
    /// Run `systemctl --user <args>`. A spawn failure becomes
    /// "<spawn_ctx>: <err>"; a non-zero exit becomes `fail_msg`.
    fn systemctl_user(args: &[&str], spawn_ctx: &str, fail_msg: &str) -> Result<(), String> {
        let exit = std::process::Command::new("systemctl")
            .arg("--user")
            .args(args)
            .status()
            .map_err(|e| format!("{}: {}", spawn_ctx, e))?;
        if exit.success() {
            Ok(())
        } else {
            Err(fail_msg.to_string())
        }
    }

    let exe = std::env::current_exe().map_err(|e| format!("current_exe: {}", e))?;
    let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?;
    let unit_dir = PathBuf::from(&home).join(".config/systemd/user");
    fs::create_dir_all(&unit_dir)
        .map_err(|e| format!("create {}: {}", unit_dir.display(), e))?;

    let unit = format!(
        r#"[Unit]
Description=poc-memory daemon — background memory maintenance
After=default.target
[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=30
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target
"#, exe = exe.display(), home = home);
    let unit_path = unit_dir.join("poc-memory.service");
    fs::write(&unit_path, &unit)
        .map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
    eprintln!("Wrote {}", unit_path.display());

    systemctl_user(&["daemon-reload"], "systemctl daemon-reload", "systemctl daemon-reload failed")?;
    systemctl_user(&["enable", "--now", "poc-memory"], "systemctl enable", "systemctl enable --now failed")?;
    eprintln!("Service enabled and started");

    // Companion notification/idle daemon, then the Claude Code hooks.
    install_notify_daemon(&unit_dir, &home)?;
    install_hook()
}
/// Install the poc-daemon (notification/idle) systemd user service.
///
/// Expects the binary at `<home>/.cargo/bin/poc-daemon`. If it is missing,
/// prints a warning with build instructions and returns `Ok(())` without
/// touching systemd. Otherwise:
/// 1. Write `poc-daemon.service` into `unit_dir`.
/// 2. Reload the user systemd manager.
/// 3. Enable and start the unit.
///
/// # Errors
/// Fails if the unit file cannot be written, or if a `systemctl` call fails
/// to spawn or exits unsuccessfully.
fn install_notify_daemon(unit_dir: &Path, home: &str) -> Result<(), String> {
    /// Run `systemctl --user <args>`. A spawn failure becomes
    /// "<spawn_ctx>: <err>"; a non-zero exit becomes `fail_msg`.
    fn systemctl_user(args: &[&str], spawn_ctx: &str, fail_msg: &str) -> Result<(), String> {
        let exit = std::process::Command::new("systemctl")
            .arg("--user")
            .args(args)
            .status()
            .map_err(|e| format!("{}: {}", spawn_ctx, e))?;
        if exit.success() {
            Ok(())
        } else {
            Err(fail_msg.to_string())
        }
    }

    let poc_daemon = PathBuf::from(home).join(".cargo/bin/poc-daemon");
    if !poc_daemon.exists() {
        eprintln!("Warning: poc-daemon not found at {} — skipping service install", poc_daemon.display());
        eprintln!(" Build with: cargo install --path .");
        return Ok(());
    }

    // NOTE(review): ExecStart passes `agent daemon`, the same arguments used
    // for the poc-memory unit. Confirm that poc-daemon accepts these.
    let unit = format!(
        r#"[Unit]
Description=poc-daemon — notification routing and idle management
After=default.target
[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=10
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target
"#, exe = poc_daemon.display(), home = home);
    let unit_path = unit_dir.join("poc-daemon.service");
    fs::write(&unit_path, &unit)
        .map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
    eprintln!("Wrote {}", unit_path.display());

    systemctl_user(&["daemon-reload"], "systemctl daemon-reload", "systemctl daemon-reload failed")?;
    systemctl_user(
        &["enable", "--now", "poc-daemon"],
        "systemctl enable",
        "systemctl enable --now poc-daemon failed",
    )?;
    eprintln!("poc-daemon service enabled and started");
    Ok(())
}
/// Install memory-search and poc-hook into Claude Code settings.json.
/// Public so `poc-memory init` can call it too.
///
/// Hook layout:
/// UserPromptSubmit: memory-search (10s), poc-hook (5s)
/// PostToolUse: poc-hook (5s)
/// Stop: poc-hook (5s)
///
/// Both binaries are looked up next to the current executable. A missing
/// binary is reported as a warning and skipped. Legacy `load-memory` hook
/// commands are removed from every event touched. `~/.claude/settings.json`
/// is rewritten only if something changed.
///
/// # Errors
/// Fails if:
/// - `$HOME` or `current_exe` cannot be determined;
/// - the settings file cannot be read, parsed, serialized, or written;
/// - the `hooks` section does not have the expected object/array shape.
///
/// The shape error replaces what used to be a panic on a hand-edited file.
pub fn install_hook() -> Result<(), String> {
    /// Ensure `binary` is registered as a command hook in the first hook
    /// group of `event`. Also drops legacy `load-memory` commands from that
    /// group. Sets `*changed` when the group was modified.
    fn ensure_hook(
        hooks_obj: &mut serde_json::Map<String, serde_json::Value>,
        event: &str,
        binary: &Path,
        timeout: u32,
        changed: &mut bool,
    ) -> Result<(), String> {
        if !binary.exists() {
            eprintln!("Warning: {} not found — skipping", binary.display());
            return Ok(());
        }
        let cmd = binary.to_string_lossy().to_string();
        let name = binary.file_name()
            .ok_or_else(|| format!("{}: no file name", binary.display()))?
            .to_string_lossy()
            .to_string();
        let event_array = hooks_obj.entry(event)
            .or_insert_with(|| serde_json::json!([{"hooks": []}]))
            .as_array_mut()
            .ok_or_else(|| format!("hooks.{} not an array", event))?;
        if event_array.is_empty() {
            event_array.push(serde_json::json!({"hooks": []}));
        }
        let inner = event_array[0]
            .as_object_mut()
            .ok_or_else(|| format!("hooks.{}[0] not an object", event))?
            .entry("hooks")
            .or_insert_with(|| serde_json::json!([]))
            .as_array_mut()
            .ok_or_else(|| format!("hooks.{}[0].hooks not an array", event))?;
        // Remove legacy load-memory.sh
        let before = inner.len();
        inner.retain(|h| {
            let c = h.get("command").and_then(|c| c.as_str()).unwrap_or("");
            !c.contains("load-memory")
        });
        if inner.len() < before {
            eprintln!("Removed load-memory.sh from {event}");
            *changed = true;
        }
        // Matching is by binary file name, so an install from a different
        // directory is still recognized as already present.
        let already = inner.iter().any(|h| {
            h.get("command").and_then(|c| c.as_str())
                .is_some_and(|c| c.contains(&name))
        });
        if !already {
            inner.push(serde_json::json!({
                "type": "command",
                "command": cmd,
                "timeout": timeout
            }));
            *changed = true;
            eprintln!("Installed {name} in {event}");
        }
        Ok(())
    }

    let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?;
    let exe = std::env::current_exe()
        .map_err(|e| format!("current_exe: {}", e))?;
    let settings_path = PathBuf::from(&home).join(".claude/settings.json");
    let memory_search = exe.with_file_name("memory-search");
    let poc_hook = exe.with_file_name("poc-hook");
    let mut settings: serde_json::Value = if settings_path.exists() {
        let content = fs::read_to_string(&settings_path)
            .map_err(|e| format!("read settings: {}", e))?;
        serde_json::from_str(&content)
            .map_err(|e| format!("parse settings: {}", e))?
    } else {
        serde_json::json!({})
    };
    let obj = settings.as_object_mut().ok_or("settings not an object")?;
    let hooks_obj = obj.entry("hooks")
        .or_insert_with(|| serde_json::json!({}))
        .as_object_mut().ok_or("hooks not an object")?;
    let mut changed = false;
    // UserPromptSubmit: memory-search + poc-hook
    ensure_hook(hooks_obj, "UserPromptSubmit", &memory_search, 10, &mut changed)?;
    ensure_hook(hooks_obj, "UserPromptSubmit", &poc_hook, 5, &mut changed)?;
    // PostToolUse + Stop: poc-hook only
    ensure_hook(hooks_obj, "PostToolUse", &poc_hook, 5, &mut changed)?;
    ensure_hook(hooks_obj, "Stop", &poc_hook, 5, &mut changed)?;
    if changed {
        let json = serde_json::to_string_pretty(&settings)
            .map_err(|e| format!("serialize settings: {}", e))?;
        fs::write(&settings_path, json)
            .map_err(|e| format!("write settings: {}", e))?;
        eprintln!("Updated {}", settings_path.display());
    } else {
        eprintln!("All hooks already installed in {}", settings_path.display());
    }
    Ok(())
}
/// Drill down into a task's log file, printing its last `lines` lines.
///
/// `task_name` is matched as a substring of task/job names. The log path is
/// looked up in order:
/// 1. The daemon's live status snapshot (empty RPC command): the first task
///    whose name matches and that has a `log_path`.
/// 2. The daemon event log, scanned newest-first, for a `started` event of a
///    matching job whose detail is `"log: <path>"` (covers finished tasks).
///
/// # Errors
/// Fails if neither source yields a path, or if the event log or the
/// selected log file cannot be read.
pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> {
    let live_path = send_rpc_pub("")
        .and_then(|json| serde_json::from_str::<serde_json::Value>(&json).ok())
        .and_then(|status| {
            status.get("tasks")?.as_array()?.iter().find_map(|task| {
                let name = task.get("name").and_then(|n| n.as_str()).unwrap_or("");
                if !name.contains(task_name) {
                    return None;
                }
                task.get("log_path").and_then(|p| p.as_str()).map(str::to_owned)
            })
        });
    if let Some(path) = live_path {
        return tail_file(&path, lines);
    }

    let log = log_path();
    if log.exists() {
        let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?;
        let logged_path = content.lines().rev()
            .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
            .find_map(|entry| {
                let job = entry.get("job").and_then(|v| v.as_str()).unwrap_or("");
                let event = entry.get("event").and_then(|v| v.as_str()).unwrap_or("");
                let detail = entry.get("detail").and_then(|v| v.as_str()).unwrap_or("");
                if job.contains(task_name) && event == "started" {
                    detail.strip_prefix("log: ").map(str::to_owned)
                } else {
                    None
                }
            });
        if let Some(path) = logged_path {
            return tail_file(&path, lines);
        }
    }
    Err(format!("no log file found for task '{}'", task_name))
}
/// Print the last `lines` lines of the text file at `path` to stderr.
///
/// The lines are preceded by a header with the path and the file's total
/// line count.
///
/// # Errors
/// Returns `"read <path>: <io error>"` if the file cannot be read as UTF-8.
fn tail_file(path: &str, lines: usize) -> Result<(), String> {
    let text = match fs::read_to_string(path) {
        Ok(t) => t,
        Err(e) => return Err(format!("read {}: {}", path, e)),
    };
    let every_line: Vec<&str> = text.lines().collect();
    eprintln!("--- {} ({} lines) ---", path, every_line.len());
    let first_shown = every_line.len().saturating_sub(lines);
    every_line[first_shown..]
        .iter()
        .for_each(|line| eprintln!("{}", line));
    Ok(())
}
/// Pretty-print the most recent daemon event-log entries to stderr.
///
/// `job_filter`: when `Some(job)`, only lines containing the exact text
/// `"job":"<job>"` are considered. `lines`: the maximum number of entries;
/// the newest matching entries are selected and printed oldest-first.
///
/// Entries are printed as "TIME JOB EVENT [DETAIL]". Lines that are not
/// valid JSON are echoed verbatim. Long details are reduced to their last
/// `/` component, capped at 60 bytes. Both the timestamp slice and the cap
/// now respect UTF-8 character boundaries; previously they could panic on
/// non-ASCII text.
pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
    /// Longest prefix of `s` that is at most `max` bytes and ends on a
    /// UTF-8 character boundary (`&s[..max]` panics mid-character).
    fn clip(s: &str, max: usize) -> &str {
        if s.len() <= max {
            return s;
        }
        let mut end = max;
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }

    let path = log_path();
    if !path.exists() {
        eprintln!("No daemon log found.");
        return Ok(());
    }
    let content = fs::read_to_string(&path)
        .map_err(|e| format!("read log: {}", e))?;
    // Build the match needle once instead of re-formatting it for every line.
    let needle = job_filter.map(|job| format!("\"job\":\"{}\"", job));
    let filtered: Vec<&str> = content.lines().rev()
        .filter(|line| match &needle {
            Some(n) => line.contains(n.as_str()),
            None => true,
        })
        .take(lines)
        .collect();
    if filtered.is_empty() {
        eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default());
        return Ok(());
    }
    // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]"
    for line in filtered.into_iter().rev() {
        if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
            let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
            let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
            let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
            let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
            // Shorten timestamp to just the time portion; `get` avoids a
            // panic on short or non-ASCII timestamps.
            let time = ts.get(11..19).unwrap_or(ts);
            if detail.is_empty() {
                eprintln!(" {} {:20} {}", time, job, event);
            } else {
                // Truncate long details (file paths)
                let short = if detail.len() > 60 {
                    // '/' is ASCII, so i + 1 is always a char boundary.
                    let last = detail.rfind('/').map(|i| &detail[i + 1..]).unwrap_or(detail);
                    clip(last, 60)
                } else {
                    detail
                };
                eprintln!(" {} {:20} {:12} {}", time, job, event, short);
            }
        } else {
            eprintln!("{}", line);
        }
    }
    Ok(())
}