consciousness/poc-memory/src/agents/daemon.rs
Kent Overstreet 3640de444b cleanup: fix clippy warnings in daemon.rs
- Remove dead code (job_split_one function never called)
- Fix needless borrows (ctx.log_line(&format! -> format!))
- Fix slice clone ([key.clone()] -> std::slice::from_ref(&key))
- Collapse nested if statements
- Fix unwrap after is_some check
- Remove redundant closures in task spawning

Reduces daemon.rs from 2030 to 1825 lines.
2026-03-21 19:42:03 -04:00

1825 lines
70 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// poc-memory daemon: background job orchestration for memory maintenance
//
// Replaces the fragile cron+shell approach with a single long-running process
// that owns all background memory work. Uses jobkit for worker pool, status
// tracking, retry, and cancellation.
//
// Architecture:
// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs
// - Session watcher task: detects ended Claude sessions, triggers extraction
// - Jobs shell out to existing poc-memory subcommands (Phase 1)
// - Status written to daemon-status.json for `poc-memory daemon status`
//
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
const SESSION_STALE_SECS: u64 = 600; // 10 minutes
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
// --- Persistent task queue ---
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PendingTask {
id: String,
agent: String,
batch: usize,
#[serde(default)]
target_key: Option<String>,
}
struct TaskQueue {
path: PathBuf,
tasks: Mutex<Vec<PendingTask>>,
}
impl TaskQueue {
fn load(data_dir: &Path) -> Arc<Self> {
let path = data_dir.join("pending-tasks.jsonl");
let tasks = if path.exists() {
fs::read_to_string(&path)
.unwrap_or_default()
.lines()
.filter_map(|l| serde_json::from_str(l).ok())
.collect()
} else {
Vec::new()
};
let count = tasks.len();
if count > 0 {
log_event("task-queue", "loaded", &format!("{} pending tasks", count));
}
Arc::new(Self { path, tasks: Mutex::new(tasks) })
}
fn push(&self, task: PendingTask) {
let mut tasks = self.tasks.lock().unwrap();
tasks.push(task);
self.write_locked(&tasks);
}
fn remove(&self, id: &str) {
let mut tasks = self.tasks.lock().unwrap();
tasks.retain(|t| t.id != id);
self.write_locked(&tasks);
}
fn drain(&self) -> Vec<PendingTask> {
let tasks = self.tasks.lock().unwrap();
tasks.clone()
}
fn write_locked(&self, tasks: &[PendingTask]) {
let content: String = tasks.iter()
.filter_map(|t| serde_json::to_string(t).ok())
.collect::<Vec<_>>()
.join("\n");
let _ = fs::write(&self.path, if content.is_empty() { String::new() } else { content + "\n" });
}
}
fn log_path() -> PathBuf {
crate::config::get().data_dir.join("daemon.log")
}
// --- Logging ---
fn log_event(job: &str, event: &str, detail: &str) {
jobkit::daemon::event_log::log(&crate::config::get().data_dir, job, event, detail);
}
/// Public wrapper for logging from other agent modules.
pub fn log_event_pub(job: &str, event: &str, detail: &str) {
log_event(job, event, detail);
}
/// Verbose log — only written if verbose logging is enabled.
pub fn log_verbose(job: &str, event: &str, detail: &str) {
jobkit::daemon::event_log::verbose(&crate::config::get().data_dir, job, event, detail);
}
// --- Job functions (direct, no subprocess) ---
static DAEMON_POOL: std::sync::OnceLock<Arc<jobkit::ResourcePool>> = std::sync::OnceLock::new();
/// Run a named job with logging, progress reporting, and error mapping.
fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
let pool = DAEMON_POOL.get().map(|p| p.as_ref());
jobkit::daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, pool, f)
}
// experience_mine and fact_mine removed — observation.agent handles all transcript mining
/// Run an agent targeted at a specific node key.
fn job_targeted_agent(
ctx: &ExecutionContext,
agent_type: &str,
target_key: &str,
) -> Result<(), TaskError> {
let agent = agent_type.to_string();
let key = target_key.to_string();
let job_name = format!("c-{}-target({})", agent, key);
run_job(ctx, &job_name, || {
let mut store = crate::store::Store::load()?;
ctx.log_line(format!("targeting: {}", key));
let log = |msg: &str| {
ctx.log_line(msg);
log_event(&job_name, "progress", msg);
};
super::knowledge::run_one_agent_with_keys(
&mut store, &agent, std::slice::from_ref(&key), 5, "daemon", &log, false,
)?;
ctx.log_line("done");
Ok(())
})
}
/// Run a single consolidation agent (replay, linker, separator, transfer, health).
/// Shared set of node keys currently being processed by agents.
/// Prevents concurrent agents from working on overlapping graph regions.
type InFlightNodes = Arc<Mutex<std::collections::HashSet<String>>>;
fn job_consolidation_agent(
ctx: &ExecutionContext,
agent_type: &str,
batch_size: usize,
in_flight: &InFlightNodes,
) -> Result<(), TaskError> {
let agent = agent_type.to_string();
let batch = batch_size;
let job_name = format!("c-{}", agent);
let in_flight = Arc::clone(in_flight);
run_job(ctx, &job_name, || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
// Claim seeds: lock in-flight set, run query excluding it,
// add selected seeds + strongly-connected neighbors, then unlock.
let mut claimed_keys: Vec<String>;
let graph = store.build_graph();
{
let mut locked = in_flight.lock().unwrap();
ctx.log_line(format!("running agent: {} (batch={}, {} in-flight)",
agent, batch, locked.len()));
// Run the agent's query, filtering out in-flight nodes
let def = super::defs::get_def(&agent)
.ok_or_else(|| format!("no .agent file for {}", agent))?;
let query = &def.query;
let keys = if !query.is_empty() {
use crate::query::engine as search;
let mut stages = search::Stage::parse_pipeline(query)?;
let padded = batch + locked.len().min(100);
if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) {
stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
}
let results = search::run_query(&stages, vec![], &graph, &store, false, padded);
results.into_iter()
.map(|(k, _)| k)
.filter(|k| !locked.contains(k))
.take(batch)
.collect::<Vec<_>>()
} else {
vec![]
};
if keys.is_empty() {
return Err("query returned no results (after exclusion)".into());
}
// Claim seeds + strongly-connected neighbors.
// Only exclude neighbors with score > threshold to avoid
// blacking out the graph via high-degree hub nodes.
claimed_keys = Vec::with_capacity(batch * 20);
for key in &keys {
claimed_keys.push(key.clone());
locked.insert(key.clone());
for (nbr, strength) in graph.neighbors(key) {
let weight = store.nodes.get(nbr.as_str())
.map(|n| n.weight).unwrap_or(0.1);
if strength * weight > 0.3 {
claimed_keys.push(nbr.clone());
locked.insert(nbr.clone());
}
}
}
}
// in_flight lock released — run LLM without holding it
let log = |msg: &str| {
ctx.log_line(msg);
log_event(&job_name, "progress", msg);
};
// Use run_one_agent_with_keys — we already selected seeds above,
// no need to re-run the query.
let result = super::knowledge::run_one_agent_with_keys(
&mut store, &agent, &claimed_keys, batch, "consolidate", &log, false,
).map(|_| ());
// Release all claimed keys (seeds + neighbors)
{
let mut locked = in_flight.lock().unwrap();
for key in &claimed_keys {
locked.remove(key);
}
}
result?;
ctx.log_line("done");
Ok(())
})
}
/// Run the rename agent: generates renames via LLM, applies them directly.
fn job_rename_agent(
ctx: &ExecutionContext,
batch_size: usize,
) -> Result<(), TaskError> {
run_job(ctx, "c-rename", || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
let batch = if batch_size == 0 { 10 } else { batch_size };
ctx.log_line(format!("running rename agent (batch={})", batch));
let log = |msg: &str| ctx.log_line(msg);
let result = super::knowledge::run_one_agent(&mut store, "rename", batch, "consolidate", &log, false)?;
// Parse RENAME actions from response (rename uses its own format, not WRITE_NODE/LINK/REFINE)
let mut applied = 0;
let mut skipped = 0;
for line in result.output.lines() {
let trimmed = line.trim();
if !trimmed.starts_with("RENAME ") { continue; }
let parts: Vec<&str> = trimmed[7..].splitn(2, ' ').collect();
if parts.len() != 2 { skipped += 1; continue; }
let old_key = parts[0].trim();
let new_key = parts[1].trim();
if old_key.is_empty() || new_key.is_empty() { skipped += 1; continue; }
let resolved = match store.resolve_key(old_key) {
Ok(k) => k,
Err(e) => {
ctx.log_line(format!("skip: {}{}: {}", old_key, new_key, e));
skipped += 1;
continue;
}
};
if store.nodes.contains_key(new_key) {
ctx.log_line(format!("skip: {} already exists", new_key));
skipped += 1;
continue;
}
match store.rename_node(&resolved, new_key) {
Ok(()) => {
ctx.log_line(format!("renamed: {}{}", resolved, new_key));
applied += 1;
}
Err(e) => {
ctx.log_line(format!("error: {}{}: {}", resolved, new_key, e));
skipped += 1;
}
}
}
if applied > 0 {
store.save()?;
}
ctx.log_line(format!("done: {} applied, {} skipped", applied, skipped));
Ok(())
})
}
/// Link orphan nodes (CPU-heavy, no LLM).
fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "c-orphans", || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
ctx.log_line("linking orphans");
let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15);
ctx.log_line(format!("{} orphans, {} links added", orphans, added));
Ok(())
})
}
/// Cap node degree to prevent mega-hubs.
fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "c-cap", || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
ctx.log_line("capping degree");
match store.cap_degree(50) {
Ok((hubs, pruned)) => {
store.save()?;
ctx.log_line(format!("{} hubs capped, {} edges pruned", hubs, pruned));
Ok(())
}
Err(e) => Err(e),
}
})
}
/// Apply links extracted from digests.
fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "c-digest-links", || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
ctx.log_line("applying digest links");
let links = super::digest::parse_all_digest_links(&store);
let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links);
store.save()?;
ctx.log_line(format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks));
Ok(())
})
}
fn job_knowledge_loop(_ctx: &ExecutionContext) -> Result<(), TaskError> {
// Knowledge loop removed — agents now use tool calls directly.
// Consolidation is handled by consolidate_full() in the consolidate job.
Ok(())
}
fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> {
run_job(ctx, "digest", || {
ctx.log_line("loading store");
let mut store = crate::store::Store::load()?;
ctx.log_line("generating digests");
crate::digest::digest_auto(&mut store)
})
}
fn job_daily_check(
ctx: &ExecutionContext,
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) -> Result<(), TaskError> {
let gh = Arc::clone(graph_health);
run_job(ctx, "daily-check", || {
ctx.log_line("loading store");
let store = crate::store::Store::load()?;
ctx.log_line("checking health");
let _report = crate::neuro::daily_check(&store);
// Decay search hit counters (10% daily decay)
ctx.log_line("decaying search counters");
match crate::counters::decay_all(0.9) {
Ok(removed) => ctx.log_line(format!("decayed counters, removed {}", removed)),
Err(e) => ctx.log_line(format!("counter decay failed: {}", e)),
}
// Compute graph health metrics for status display
ctx.log_line("computing graph health");
let health = compute_graph_health(&store);
*gh.lock().unwrap() = Some(health);
Ok(())
})
}
fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
let graph = store.build_graph();
let snap = crate::graph::current_metrics(&graph);
let episodic_count = store.nodes.iter()
.filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
.count();
let episodic_ratio = if store.nodes.is_empty() { 0.0 }
else { episodic_count as f32 / store.nodes.len() as f32 };
// Use the same planning logic as consolidation (skip O(n²) interference)
let plan = crate::neuro::consolidation_plan_quick(store);
GraphHealth {
nodes: snap.nodes,
edges: snap.edges,
communities: snap.communities,
alpha: snap.alpha,
gini: snap.gini,
avg_cc: snap.avg_cc,
sigma: snap.sigma,
episodic_ratio,
interference: 0,
plan_counts: plan.counts,
plan_rationale: plan.rationale,
computed_at: crate::store::format_datetime_space(crate::store::now_epoch()),
}
}
// --- Session detection ---
/// Find JSONL session files that are stale (not recently written) and not
/// held open by any process.
/// Find JSONL session files that haven't been written to recently.
/// Only checks metadata (stat), no file reads or subprocess calls.
/// The fuser check (is file open?) is deferred to the reconcile loop,
/// only for sessions that pass the mined-key filter.
/// Minimum file size for a session to be worth mining.
/// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive
/// sessions are much larger. Skip anything too small to contain
/// meaningful conversation.
const MIN_SESSION_BYTES: u64 = 100_000;
fn find_stale_sessions() -> Vec<PathBuf> {
let projects = crate::config::get().projects_dir.clone();
if !projects.exists() {
return Vec::new();
}
let mut stale = Vec::new();
let now = SystemTime::now();
let Ok(dirs) = fs::read_dir(&projects) else { return stale };
for dir_entry in dirs.filter_map(|e| e.ok()) {
if !dir_entry.path().is_dir() { continue; }
let Ok(files) = fs::read_dir(dir_entry.path()) else { continue };
for f in files.filter_map(|e| e.ok()) {
let path = f.path();
if !path.extension().map(|x| x == "jsonl").unwrap_or(false) { continue; }
let Ok(meta) = path.metadata() else { continue };
// Skip tiny sessions (daemon-spawned LLM calls, aborted sessions)
if meta.len() < MIN_SESSION_BYTES { continue; }
let Ok(mtime) = meta.modified() else { continue };
let age = now.duration_since(mtime).unwrap_or_default();
if age.as_secs() >= SESSION_STALE_SECS {
stale.push(path);
}
}
}
stale
}
/// Check if any other process has a file open by scanning /proc/*/fd/.
/// This is what `fuser` does internally, without the subprocess overhead.
fn is_file_open(path: &Path) -> bool {
let Ok(target) = path.canonicalize() else { return false };
let Ok(procs) = fs::read_dir("/proc") else { return false };
let my_pid = std::process::id().to_string();
for proc_entry in procs.filter_map(|e| e.ok()) {
let name = proc_entry.file_name();
let name = name.to_string_lossy();
if !name.chars().all(|c| c.is_ascii_digit()) { continue; }
if *name == my_pid { continue; }
let fd_dir = proc_entry.path().join("fd");
let Ok(fds) = fs::read_dir(&fd_dir) else { continue };
for fd in fds.filter_map(|e| e.ok()) {
let Ok(link) = fs::read_link(fd.path()) else { continue };
if link == target { return true; }
}
}
false
}
/// Get process uptime as human-readable string by reading /proc/<pid>/stat.
fn proc_uptime(pid: u32) -> Option<String> {
// /proc/<pid>/stat field 22 (1-indexed) is start time in clock ticks
let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
// Fields after comm (which may contain spaces/parens) — find closing paren
let after_comm = stat.get(stat.rfind(')')? + 2..)?;
let fields: Vec<&str> = after_comm.split_whitespace().collect();
// Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19)
let start_ticks: u64 = fields.get(19)?.parse().ok()?;
let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64;
let boot_time_secs = {
let uptime = fs::read_to_string("/proc/uptime").ok()?;
let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?;
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
now - sys_uptime as u64
};
let start_secs = boot_time_secs + start_ticks / ticks_per_sec;
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
let uptime = now.saturating_sub(start_secs);
Some(format_duration_human(uptime as u128 * 1000))
}
// --- Status writing ---
fn write_status(
choir: &Choir,
last_daily: Option<chrono::NaiveDate>,
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) {
let status = build_status(choir, last_daily, graph_health);
jobkit::daemon::status::write(&crate::config::get().data_dir, &status);
}
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct GraphHealth {
pub nodes: usize,
pub edges: usize,
pub communities: usize,
pub alpha: f32, // power-law exponent (target ≥2.5)
pub gini: f32, // degree inequality (target ≤0.4)
pub avg_cc: f32, // clustering coefficient (target ≥0.2)
pub sigma: f32, // small-world sigma
pub episodic_ratio: f32, // episodic/total nodes (target <0.4)
pub interference: usize, // interfering pairs (target <50)
// Consolidation work estimate from plan
#[serde(default)]
pub plan_counts: std::collections::HashMap<String, usize>,
pub plan_rationale: Vec<String>,
pub computed_at: String,
}
#[derive(serde::Serialize, serde::Deserialize)]
struct DaemonStatus {
pid: u32,
tasks: Vec<TaskInfo>,
#[serde(default)]
last_daily: Option<String>,
#[serde(default)]
graph_health: Option<GraphHealth>,
}
// --- The daemon ---
pub fn run_daemon() -> Result<(), String> {
let config = crate::config::get();
let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig {
data_dir: config.data_dir.clone(),
resource_slots: config.llm_concurrency,
resource_name: "llm".to_string(),
});
let choir = Arc::clone(&daemon.choir);
let llm = Arc::clone(&daemon.resource);
let _ = DAEMON_POOL.set(Arc::clone(&llm));
let task_log_dir = config.data_dir.join("logs");
let _ = fs::create_dir_all(&task_log_dir);
// Enable verbose logging if POC_MEMORY_VERBOSE is set
if std::env::var("POC_MEMORY_VERBOSE").is_ok() {
jobkit::daemon::event_log::set_level(jobkit::daemon::event_log::LogLevel::Verbose);
}
// Recover last_daily from previous status file
let last_daily: Arc<Mutex<Option<chrono::NaiveDate>>> = Arc::new(Mutex::new(
jobkit::daemon::status::load::<DaemonStatus>(&config.data_dir)
.and_then(|s| s.last_daily)
.and_then(|d| d.parse().ok())
));
let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None));
// Persistent task queue — survives daemon restarts
let task_queue = TaskQueue::load(&config.data_dir);
// Nodes currently being processed by agents — prevents concurrent
// agents from working on overlapping graph regions.
let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new()));
log_event("daemon", "started", &format!("pid {}", std::process::id()));
eprintln!("poc-memory daemon started (pid {})", std::process::id());
// Recover pending tasks from previous run
{
let recovered = task_queue.drain();
if !recovered.is_empty() {
log_event("task-queue", "recovering", &format!("{} tasks", recovered.len()));
for pt in &recovered {
let agent = pt.agent.clone();
let b = pt.batch;
let task_id = pt.id.clone();
let in_flight_clone = Arc::clone(&in_flight);
let queue_clone = Arc::clone(&task_queue);
choir.spawn(pt.id.clone())
.resource(&llm)
.log_dir(&task_log_dir)
.retries(1)
.init(move |ctx| {
let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone);
queue_clone.remove(&task_id);
result
});
// Drop schedules via IdleTask::Drop
}
}
}
// Write initial status
write_status(&choir, *last_daily.lock().unwrap(), &graph_health);
// Session watcher: reconcile-based extraction
// Each tick: scan filesystem for stale sessions, check store for what's
// already mined, check task registry for what's in-flight, spawn the diff.
// No persistent tracking state — the store is the source of truth.
let choir_sw = Arc::clone(&choir);
let _llm_sw = Arc::clone(&llm); // kept for future use
let last_daily_sw = Arc::clone(&last_daily);
let graph_health_sw = Arc::clone(&graph_health);
choir.spawn("session-watcher").init(move |ctx| {
ctx.set_progress("idle");
// Cache: path → (file_size, segment_count). Invalidated when size changes.
let mut seg_cache: HashMap<String, (u64, usize)> = HashMap::new();
// Retry backoff: filename → (next_retry_after, current_backoff).
// Exponential from 5min, cap 30min. Resets on daemon restart.
let mut retry_backoff: HashMap<String, (std::time::Instant, Duration)> = HashMap::new();
const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min
const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min
loop {
if ctx.is_cancelled() {
return Err(TaskError::Fatal("cancelled".into()));
}
ctx.set_progress("scanning");
// Check for failed tasks and update backoff.
// Task names are "extract:{filename}.{segment}" — extract the
// filename (UUID.jsonl) by stripping the trailing .N segment suffix.
let task_statuses = choir_sw.task_statuses();
for t in &task_statuses {
if let Some(label) = t.name.strip_prefix("extract:") {
// label is "UUID.jsonl.N" — strip last ".N" to get filename
let filename = match label.rfind('.') {
Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => {
&label[..pos]
}
_ => label,
};
match t.status {
TaskStatus::Failed => {
let entry = retry_backoff.entry(filename.to_string())
.or_insert((std::time::Instant::now(), BACKOFF_INITIAL));
entry.1 = (entry.1 * 2).min(BACKOFF_MAX);
entry.0 = std::time::Instant::now() + entry.1;
}
TaskStatus::Completed => {
retry_backoff.remove(filename);
}
_ => {}
}
}
}
// What's currently running/pending? (avoid spawning duplicates)
let active: HashSet<String> = task_statuses.iter()
.filter(|t| !t.status.is_finished())
.map(|t| t.name.clone())
.collect();
let stale = find_stale_sessions();
// Load mined transcript keys once for this tick
let mined = std::collections::HashSet::<String>::new(); // mining removed
// MAX_NEW_PER_TICK removed — mining handled by observation agent
// Load fact-mined keys too
let fact_keys: HashSet<String> = {
use crate::store::StoreView;
let view = crate::store::AnyView::load().ok();
view.map(|v| {
let mut keys = HashSet::new();
v.for_each_node(|key, _, _| {
if key.starts_with("_facts-") {
keys.insert(key.to_string());
}
});
keys
}).unwrap_or_default()
};
let _extract_queued = 0usize;
let mut _extract_remaining = 0usize;
let mut _fact_remaining = 0usize;
let mut already_mined = 0;
let mut still_open = 0;
let mut backed_off = 0;
let total_stale = stale.len();
// Sessions with old whole-file keys that need per-segment migration
let mut migrate_keys: Vec<(String, String, usize)> = Vec::new();
let mut needs_extract: Vec<(String, String, Option<usize>)> = Vec::new();
let mut needs_fact: Vec<(String, String)> = Vec::new();
let now = std::time::Instant::now();
for session in stale {
let filename = session.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into());
let path_str = session.to_string_lossy().to_string();
// Check retry backoff before doing any work
if let Some((next_retry, _)) = retry_backoff.get(&filename) {
if now >= *next_retry {
// Backoff expired, proceed
} else {
backed_off += 1;
continue;
}
}
if is_file_open(&session) {
still_open += 1;
continue;
}
// Get file size for cache invalidation
let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0);
// Get segment count, using cache with size-based invalidation
let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) {
if cached_size == file_size {
cached_count
} else {
// File changed — re-parse
let messages = match super::enrich::extract_conversation(&path_str) {
Ok(m) => m,
Err(_) => continue,
};
let count = super::enrich::split_on_compaction(messages).len();
seg_cache.insert(path_str.clone(), (file_size, count));
count
}
} else {
let messages = match super::enrich::extract_conversation(&path_str) {
Ok(m) => m,
Err(_) => continue,
};
let count = super::enrich::split_on_compaction(messages).len();
seg_cache.insert(path_str.clone(), (file_size, count));
count
};
// No extractable messages — skip entirely
if seg_count == 0 {
already_mined += 1;
continue;
}
let fname_key = format!("_experience-{}", filename.trim_end_matches(".jsonl"));
let has_whole_file_key = mined.contains(&fname_key);
// Check per-segment keys, find unmined segments
let mut unmined_segs: Vec<usize> = Vec::new();
let mut has_any_seg_key = false;
for i in 0..seg_count {
let seg_key = format!("{}.{}", fname_key, i);
if mined.contains(&seg_key) {
has_any_seg_key = true;
} else {
unmined_segs.push(i);
}
}
// Migrate old whole-file key: if it exists but no per-segment keys,
// write per-segment keys for all current segments (they were mined
// under the old scheme)
if has_whole_file_key && !has_any_seg_key && seg_count > 0 {
migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count));
// After migration, all current segments are covered
unmined_segs.clear();
}
if unmined_segs.is_empty() {
// All segments mined — check fact-mining
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
if !fact_keys.contains(&fact_key) {
let task_name = format!("fact-mine:{}", filename);
if !active.contains(&task_name) {
needs_fact.push((filename, path_str));
}
} else {
already_mined += 1;
}
} else {
// Queue unmined segments
for i in unmined_segs {
let task_name = format!("extract:{}.{}", filename, i);
if active.contains(&task_name) { continue; }
needs_extract.push((
format!("{}.{}", filename, i),
path_str.clone(),
Some(i),
));
}
}
}
// Migrate old whole-file keys to per-segment keys
if !migrate_keys.is_empty() {
match crate::store::Store::load() {
Ok(mut store) => {
let mut ok = 0;
let mut fail = 0;
for (fname_key, path_str, seg_count) in &migrate_keys {
for i in 0..*seg_count {
let seg_key = format!("{}.{}", fname_key, i);
let content = format!("Migrated from whole-file key for {}", path_str);
let mut node = crate::store::new_node(&seg_key, &content);
node.provenance = "experience-mine:write".to_string();
match store.upsert_node(node) {
Ok(()) => ok += 1,
Err(e) => {
if fail == 0 {
eprintln!("migration upsert_node error: {}", e);
}
fail += 1;
}
}
}
}
if let Err(e) = store.save() {
eprintln!("migration save error: {}", e);
}
log_event("session-watcher", "migrated",
&format!("{} whole-file keys → per-segment ({} ok, {} fail)",
migrate_keys.len(), ok, fail));
}
Err(e) => {
eprintln!("migration store load error: {}", e);
}
}
}
// experience_mine and fact_mine killed — observation.agent handles transcript mining
_extract_remaining = needs_extract.len();
_fact_remaining = needs_fact.len();
let extract_pending = _extract_queued + _extract_remaining;
let fact_pending = _fact_remaining;
if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 {
log_event("session-watcher", "tick",
&format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff",
total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off));
let mut parts = Vec::new();
if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); }
if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); }
if still_open > 0 { parts.push(format!("{} open", still_open)); }
if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); }
ctx.set_progress(parts.join(", "));
} else {
ctx.set_progress("idle");
}
write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw);
std::thread::sleep(SCHEDULER_INTERVAL);
}
});
// Scheduler: runs daily jobs based on filesystem state
let choir_sched = Arc::clone(&choir);
let llm_sched = Arc::clone(&llm);
let last_daily_sched = Arc::clone(&last_daily);
let graph_health_sched = Arc::clone(&graph_health);
let in_flight_sched = Arc::clone(&in_flight);
let log_dir_sched = task_log_dir.clone();
let queue_sched = Arc::clone(&task_queue);
const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
choir.spawn("scheduler").init(move |ctx| {
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
let mut last_consolidation = std::time::Instant::now() - CONSOLIDATION_INTERVAL; // run on first tick
ctx.set_progress("idle");
loop {
if ctx.is_cancelled() {
return Err(TaskError::Fatal("cancelled".into()));
}
let today = chrono::Local::now().date_naive();
// Health check: every hour — also updates graph health metrics
if last_health.elapsed() >= HEALTH_INTERVAL {
let gh = Arc::clone(&graph_health_sched);
choir_sched.spawn("health").init(move |ctx| {
job_daily_check(ctx, &gh)
});
last_health = std::time::Instant::now();
}
// Consolidation cycle: every 6 hours (wait for health check to cache metrics first)
let gh = graph_health_sched.lock().unwrap().clone();
let Some(h) = gh.as_ref() else { continue };
if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL {
log_event("scheduler", "consolidation-trigger",
&format!("{} (every 6h)", today));
// Use cached graph health plan (from consolidation_plan_quick).
let plan = crate::neuro::ConsolidationPlan {
counts: h.plan_counts.clone(),
run_health: true,
rationale: Vec::new(),
};
let runs = plan.to_agent_runs(5);
let summary: Vec<String> = h.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
log_event("scheduler", "consolidation-plan",
&format!("{} agents ({})", runs.len(), summary.join(" ")));
// Phase 1: Agent runs — all concurrent, in-flight exclusion
// prevents overlapping graph regions.
let mut all_tasks: Vec<jobkit::RunningTask> = Vec::new();
for (i, (agent_type, batch)) in runs.iter().enumerate() {
let agent = agent_type.to_string();
let b = *batch;
let in_flight_clone = Arc::clone(&in_flight_sched);
let task_name = format!("c-{}-{}:{}", agent, i, today);
let task_id = task_name.clone();
let queue_clone = Arc::clone(&queue_sched);
queue_sched.push(PendingTask {
id: task_id.clone(),
agent: agent.clone(),
batch: b,
target_key: None,
});
let task = choir_sched.spawn(task_name)
.resource(&llm_sched)
.log_dir(&log_dir_sched)
.retries(1)
.init(move |ctx| {
let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone);
queue_clone.remove(&task_id);
result
})
.run();
all_tasks.push(task);
}
// Orphans phase depends on all agent tasks completing
let prev_agent = all_tasks.last().cloned();
// Phase 2: Link orphans (CPU-only, no LLM)
let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today))
.retries(1)
.init(job_link_orphans);
if let Some(ref dep) = prev_agent {
orphans.depend_on(dep);
}
let orphans = orphans.run();
// Phase 3: Cap degree
let mut cap = choir_sched.spawn(format!("c-cap:{}", today))
.retries(1)
.init(job_cap_degree);
cap.depend_on(&orphans);
let cap = cap.run();
// Phase 4: Generate digests
let mut digest = choir_sched.spawn(format!("c-digest:{}", today))
.resource(&llm_sched)
.retries(1)
.init(job_digest);
digest.depend_on(&cap);
let digest = digest.run();
// Phase 5: Apply digest links
let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today))
.retries(1)
.init(job_digest_links);
digest_links.depend_on(&digest);
let digest_links = digest_links.run();
// Phase 7: Knowledge loop
let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today))
.resource(&llm_sched)
.retries(1)
.init(job_knowledge_loop);
knowledge.depend_on(&digest_links);
*last_daily_sched.lock().unwrap() = Some(today);
last_consolidation = std::time::Instant::now();
ctx.set_progress(format!("daily pipeline triggered ({today})"));
}
// Prune finished tasks from registry
let pruned = choir_sched.gc_finished();
if pruned > 0 {
log::trace!("pruned {} finished tasks", pruned);
}
write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched);
std::thread::sleep(SCHEDULER_INTERVAL);
}
});
// Register RPC handlers
{
let last_daily_rpc = Arc::clone(&last_daily);
daemon.add_rpc_handler(move |cmd, _ctx| {
if cmd == "consolidate" {
*last_daily_rpc.lock().unwrap() = None;
log_event("rpc", "consolidate", "triggered via socket");
Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into())
} else {
None
}
});
}
daemon.add_rpc_handler(|cmd, _ctx| {
if cmd != "reload-config" { return None; }
let changed = crate::config::reload();
let config = crate::config::get();
let api = config.api_base_url.as_deref().unwrap_or("(none)");
let model = config.api_model.as_deref().unwrap_or("(default)");
log_event("daemon", "config-reload",
&format!("changed={}, api={}, model={}", changed, api, model));
Some(format!("{{\"ok\":true,\"changed\":{},\"api_base_url\":\"{}\",\"api_model\":\"{}\"}}\n",
changed, api, model))
});
daemon.add_rpc_handler(|cmd, _ctx| {
if !cmd.starts_with("record-hits ") { return None; }
let keys: Vec<&str> = cmd.strip_prefix("record-hits ")
.unwrap_or("")
.split('\t')
.filter(|k| !k.is_empty())
.collect();
if keys.is_empty() {
return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into());
}
let n = keys.len();
match crate::counters::record_search_hits(&keys) {
Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)),
Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))),
}
});
{
let choir_rpc = Arc::clone(&choir);
let llm_rpc = Arc::clone(&llm);
let log_dir_rpc = task_log_dir.clone();
let in_flight_rpc = Arc::clone(&in_flight);
daemon.add_rpc_handler(move |cmd, _ctx| {
if !cmd.starts_with("run-agent ") { return None; }
let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
let agent_type = parts.get(1).unwrap_or(&"replay");
let count: usize = parts.get(2)
.and_then(|s| s.parse().ok())
.unwrap_or(1);
// Optional target key: "run-agent linker 1 target:KEY"
let target_key: Option<String> = parts.get(3)
.and_then(|s| s.strip_prefix("target:"))
.map(|s| s.to_string());
let batch_size = 5;
let today = chrono::Local::now().format("%Y-%m-%d");
let ts = chrono::Local::now().format("%H%M%S");
let mut prev = None;
let mut spawned = 0;
let mut remaining = count;
let is_rename = *agent_type == "rename";
// Targeted run: one task for a specific node
if let Some(ref key) = target_key {
let agent = agent_type.to_string();
let key = key.clone();
let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::<String>(), today);
if jobkit::daemon::event_log::enabled(jobkit::daemon::event_log::LogLevel::Verbose) {
log_event("daemon", "spawn-targeted",
&format!("{} (pool: {}/{})", task_name, llm_rpc.available(), llm_rpc.capacity()));
}
choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.log_dir(&log_dir_rpc)
.retries(1)
.init(move |ctx| {
job_targeted_agent(ctx, &agent, &key)
})
.run();
spawned = 1;
remaining = 0;
}
while remaining > 0 {
let batch = remaining.min(batch_size);
let agent = agent_type.to_string();
let in_flight_clone = Arc::clone(&in_flight_rpc);
let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
let mut builder = choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.log_dir(&log_dir_rpc)
.retries(1)
.init(move |ctx| {
if is_rename {
job_rename_agent(ctx, batch)
} else {
job_consolidation_agent(ctx, &agent, batch, &in_flight_clone)
}
});
if let Some(ref dep) = prev {
builder.depend_on(dep);
}
prev = Some(builder.run());
remaining -= batch;
spawned += 1;
}
log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count));
Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n",
count, agent_type, spawned))
});
}
// Main thread: socket server + signal handling
let last_daily_status = Arc::clone(&last_daily);
let graph_health_status = Arc::clone(&graph_health);
daemon.run(move |ctx| {
build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status)
});
log_event("daemon", "stopping", "");
eprintln!("Shutting down...");
log_event("daemon", "stopped", "");
std::process::exit(0)
}
pub fn send_rpc_pub(cmd: &str) -> Option<String> {
send_rpc(cmd)
}
fn send_rpc(cmd: &str) -> Option<String> {
jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
}
pub fn rpc_consolidate() -> Result<(), String> {
match send_rpc("consolidate") {
Some(resp) => {
println!("{}", resp.trim());
Ok(())
}
None => Err("Daemon not running.".into()),
}
}
/// Record search hits for the given keys (fire-and-forget from memory-search).
pub fn rpc_record_hits(keys: &[&str]) -> Result<(), String> {
if keys.is_empty() { return Ok(()); }
let cmd = format!("record-hits {}", keys.join("\t"));
match send_rpc(&cmd) {
Some(_) => Ok(()),
None => Err("Daemon not running.".into()),
}
}
pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
let cmd = format!("run-agent {} {}", agent, count);
match send_rpc(&cmd) {
Some(resp) => {
println!("{}", resp.trim());
Ok(())
}
None => Err("Daemon not running.".into()),
}
}
fn read_status_socket() -> Option<DaemonStatus> {
let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
serde_json::from_str(&json).ok()
}
// status_socket_loop has been replaced by daemon.run() in jobkit-daemon.
fn build_status(
choir: &Choir,
last_daily: Option<chrono::NaiveDate>,
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) -> DaemonStatus {
DaemonStatus {
pid: std::process::id(),
tasks: choir.task_statuses(),
last_daily: last_daily.map(|d| d.to_string()),
graph_health: graph_health.lock().unwrap().clone(),
}
}
// --- Status display ---
fn format_duration_human(ms: u128) -> String {
if ms < 1_000 {
format!("{}ms", ms)
} else if ms < 60_000 {
format!("{:.1}s", ms as f64 / 1000.0)
} else if ms < 3_600_000 {
format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000)
} else {
format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000)
}
}
fn task_group(name: &str) -> &str {
if name == "session-watcher" || name == "scheduler" { "core" }
else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" }
else if name.starts_with("c-") || name.starts_with("consolidate:")
|| name.starts_with("knowledge-loop:") || name.starts_with("digest:")
|| name.starts_with("decay:") { "daily" }
else if name == "health" { "health" }
else { "other" }
}
/// Compute elapsed time for a task, using absolute started_at if available.
fn task_elapsed(t: &TaskInfo) -> Duration {
if matches!(t.status, TaskStatus::Running) {
if let Some(started) = t.started_at {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
Duration::from_secs_f64((now - started).max(0.0))
} else {
t.elapsed
}
} else {
t.result.as_ref()
.map(|r| r.duration)
.unwrap_or(t.elapsed)
}
}
fn status_symbol(t: &TaskInfo) -> &'static str {
if t.cancelled { return "" }
match t.status {
TaskStatus::Running => "",
TaskStatus::Completed => "",
TaskStatus::Failed => "",
TaskStatus::Pending => "·",
}
}
/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…"
fn short_job_name(job: &str) -> String {
// Split "verb path" or just return as-is
if let Some((verb, path)) = job.split_once(' ') {
let file = path.rsplit('/').next().unwrap_or(path);
let file = file.strip_suffix(".jsonl").unwrap_or(file);
let short = if file.len() > 12 { &file[..12] } else { file };
format!("{} {}", verb, short)
} else {
job.to_string()
}
}
fn show_recent_completions(n: usize) {
let path = log_path();
let content = match fs::read_to_string(&path) {
Ok(c) => c,
Err(_) => return,
};
let recent: Vec<&str> = content.lines().rev()
.filter(|line| {
line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"")
})
.take(n)
.collect();
if recent.is_empty() { return; }
eprintln!(" Recent:");
for line in recent.iter().rev() {
if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
let time = if ts.len() >= 19 { &ts[11..19] } else { ts };
let sym = if event == "completed" { "" } else { "" };
let name = short_job_name(job);
eprintln!(" {} {} {:30} {}", sym, time, name, detail);
}
}
eprintln!();
}
pub fn show_status() -> Result<(), String> {
let status = match read_status_socket() {
Some(s) => s,
None => {
eprintln!("Daemon not running.");
return Ok(());
}
};
let uptime_str = proc_uptime(status.pid).unwrap_or_default();
if uptime_str.is_empty() {
eprintln!("poc-memory daemon pid={}", status.pid);
} else {
eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str);
}
if status.tasks.is_empty() {
eprintln!("\n No tasks");
return Ok(());
}
// Count by status
let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" tasks: {} running, {} pending, {} done, {} failed",
running, pending, completed, failed);
// Graph health
if let Some(ref gh) = status.graph_health {
eprintln!();
fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str {
let ok = if higher_is_better { val >= target } else { val <= target };
if ok { "" } else { "" }
}
eprintln!(" Graph health ({})", gh.computed_at);
eprintln!(" {} nodes, {} edges, {} communities",
gh.nodes, gh.edges, gh.communities);
eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)",
indicator(gh.alpha, 2.5, true), gh.alpha,
indicator(gh.gini, 0.4, false), gh.gini,
indicator(gh.avg_cc, 0.2, true), gh.avg_cc);
eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}",
indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0,
gh.sigma);
let plan_total: usize = gh.plan_counts.values().sum::<usize>() + 1;
let plan_summary: Vec<String> = gh.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
eprintln!(" consolidation plan: {} agents ({} +health)",
plan_total, plan_summary.join(" "));
}
eprintln!();
// Group and display
let groups: &[(&str, &str)] = &[
("core", "Core"),
("daily", "Daily pipeline"),
("extract", "Session extraction"),
("health", "Health"),
("other", "Other"),
];
// In-flight tasks first (running + pending)
let in_flight: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending))
.collect();
if !in_flight.is_empty() {
eprintln!(" In flight:");
for t in &in_flight {
let sym = status_symbol(t);
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" {}", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref()
.filter(|p| *p != "idle")
.map(|p| format!(" {}", p))
.unwrap_or_default();
let name = short_job_name(&t.name);
eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress);
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" │ log: {}", lp);
}
}
}
eprintln!();
}
// Recent completions from log file
show_recent_completions(20);
// Detailed group view only if there are failures worth showing
for (group_id, group_label) in groups {
let tasks: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| task_group(&t.name) == *group_id)
.collect();
if tasks.is_empty() { continue; }
// For extract group, show summary instead of individual tasks
if *group_id == "extract" {
let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" {} ({} total)", group_label, tasks.len());
if n_running > 0 {
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) {
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" ({})", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default();
eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress);
if let Some(ref lp) = t.log_path {
eprintln!(" │ log: {}", lp);
}
}
}
let mut parts = Vec::new();
if n_done > 0 { parts.push(format!("{} done", n_done)); }
if n_running > 0 { parts.push(format!("{} running", n_running)); }
if n_pending > 0 { parts.push(format!("{} queued", n_pending)); }
if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); }
eprintln!(" {}", parts.join(", "));
// Show recent failures
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) {
if let Some(ref err) = t.result.as_ref().and_then(|r| r.error.as_ref()) {
let short = if err.len() > 80 { &err[..80] } else { err };
eprintln!("{}: {}", t.name, short);
}
}
eprintln!();
continue;
}
eprintln!(" {}", group_label);
for t in &tasks {
let sym = status_symbol(t);
let e = task_elapsed(t);
let duration = if !e.is_zero() {
format_duration_human(e.as_millis())
} else {
String::new()
};
let retry = if t.max_retries > 0 && t.retry_count > 0 {
format!(" retry {}/{}", t.retry_count, t.max_retries)
} else {
String::new()
};
let detail = if matches!(t.status, TaskStatus::Failed) {
t.result.as_ref()
.and_then(|r| r.error.as_ref())
.map(|e| {
let short = if e.len() > 60 { &e[..60] } else { e };
format!(" err: {}", short)
})
.unwrap_or_default()
} else {
String::new()
};
if duration.is_empty() {
eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail);
} else {
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail);
}
// Show output log tail for running tasks
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" │ log: {}", lp);
}
}
}
eprintln!();
}
Ok(())
}
pub fn install_service() -> Result<(), String> {
let exe = std::env::current_exe()
.map_err(|e| format!("current_exe: {}", e))?;
let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?;
let unit_dir = PathBuf::from(&home).join(".config/systemd/user");
fs::create_dir_all(&unit_dir)
.map_err(|e| format!("create {}: {}", unit_dir.display(), e))?;
let unit = format!(
r#"[Unit]
Description=poc-memory daemon — background memory maintenance
After=default.target
[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=30
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target
"#, exe = exe.display(), home = home);
let unit_path = unit_dir.join("poc-memory.service");
fs::write(&unit_path, &unit)
.map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
eprintln!("Wrote {}", unit_path.display());
let status = std::process::Command::new("systemctl")
.args(["--user", "daemon-reload"])
.status()
.map_err(|e| format!("systemctl daemon-reload: {}", e))?;
if !status.success() {
return Err("systemctl daemon-reload failed".into());
}
let status = std::process::Command::new("systemctl")
.args(["--user", "enable", "--now", "poc-memory"])
.status()
.map_err(|e| format!("systemctl enable: {}", e))?;
if !status.success() {
return Err("systemctl enable --now failed".into());
}
eprintln!("Service enabled and started");
// Install poc-daemon service
install_notify_daemon(&unit_dir, &home)?;
// Install memory-search + poc-hook into Claude settings
install_hook()?;
Ok(())
}
/// Install the poc-daemon (notification/idle) systemd user service.
fn install_notify_daemon(unit_dir: &Path, home: &str) -> Result<(), String> {
let poc_daemon = PathBuf::from(home).join(".cargo/bin/poc-daemon");
if !poc_daemon.exists() {
eprintln!("Warning: poc-daemon not found at {} — skipping service install", poc_daemon.display());
eprintln!(" Build with: cargo install --path .");
return Ok(());
}
let unit = format!(
r#"[Unit]
Description=poc-daemon — notification routing and idle management
After=default.target
[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=10
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target
"#, exe = poc_daemon.display(), home = home);
let unit_path = unit_dir.join("poc-daemon.service");
fs::write(&unit_path, &unit)
.map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
eprintln!("Wrote {}", unit_path.display());
let status = std::process::Command::new("systemctl")
.args(["--user", "daemon-reload"])
.status()
.map_err(|e| format!("systemctl daemon-reload: {}", e))?;
if !status.success() {
return Err("systemctl daemon-reload failed".into());
}
let status = std::process::Command::new("systemctl")
.args(["--user", "enable", "--now", "poc-daemon"])
.status()
.map_err(|e| format!("systemctl enable: {}", e))?;
if !status.success() {
return Err("systemctl enable --now poc-daemon failed".into());
}
eprintln!("poc-daemon service enabled and started");
Ok(())
}
/// Install memory-search and poc-hook into Claude Code settings.json.
/// Public so `poc-memory init` can call it too.
///
/// Hook layout:
/// UserPromptSubmit: memory-search (10s), poc-hook (5s)
/// PostToolUse: poc-hook (5s)
/// Stop: poc-hook (5s)
pub fn install_hook() -> Result<(), String> {
let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?;
let exe = std::env::current_exe()
.map_err(|e| format!("current_exe: {}", e))?;
let settings_path = PathBuf::from(&home).join(".claude/settings.json");
let memory_search = exe.with_file_name("memory-search");
let poc_hook = exe.with_file_name("poc-hook");
let mut settings: serde_json::Value = if settings_path.exists() {
let content = fs::read_to_string(&settings_path)
.map_err(|e| format!("read settings: {}", e))?;
serde_json::from_str(&content)
.map_err(|e| format!("parse settings: {}", e))?
} else {
serde_json::json!({})
};
let obj = settings.as_object_mut().ok_or("settings not an object")?;
let hooks_obj = obj.entry("hooks")
.or_insert_with(|| serde_json::json!({}))
.as_object_mut().ok_or("hooks not an object")?;
let mut changed = false;
// Helper: ensure a hook binary is present in an event's hook list
let ensure_hook = |hooks_obj: &mut serde_json::Map<String, serde_json::Value>,
event: &str,
binary: &Path,
timeout: u32,
changed: &mut bool| {
if !binary.exists() {
eprintln!("Warning: {} not found — skipping", binary.display());
return;
}
let cmd = binary.to_string_lossy().to_string();
let name = binary.file_name().unwrap().to_string_lossy().to_string();
let event_array = hooks_obj.entry(event)
.or_insert_with(|| serde_json::json!([{"hooks": []}]))
.as_array_mut().unwrap();
if event_array.is_empty() {
event_array.push(serde_json::json!({"hooks": []}));
}
let inner = event_array[0]
.as_object_mut().unwrap()
.entry("hooks")
.or_insert_with(|| serde_json::json!([]))
.as_array_mut().unwrap();
// Remove legacy load-memory.sh
let before = inner.len();
inner.retain(|h| {
let c = h.get("command").and_then(|c| c.as_str()).unwrap_or("");
!c.contains("load-memory")
});
if inner.len() < before {
eprintln!("Removed load-memory.sh from {event}");
*changed = true;
}
let already = inner.iter().any(|h| {
h.get("command").and_then(|c| c.as_str())
.is_some_and(|c| c.contains(&name))
});
if !already {
inner.push(serde_json::json!({
"type": "command",
"command": cmd,
"timeout": timeout
}));
*changed = true;
eprintln!("Installed {name} in {event}");
}
};
// UserPromptSubmit: memory-search + poc-hook
ensure_hook(hooks_obj, "UserPromptSubmit", &memory_search, 10, &mut changed);
ensure_hook(hooks_obj, "UserPromptSubmit", &poc_hook, 5, &mut changed);
// PostToolUse + Stop: poc-hook only
ensure_hook(hooks_obj, "PostToolUse", &poc_hook, 5, &mut changed);
ensure_hook(hooks_obj, "Stop", &poc_hook, 5, &mut changed);
if changed {
let json = serde_json::to_string_pretty(&settings)
.map_err(|e| format!("serialize settings: {}", e))?;
fs::write(&settings_path, json)
.map_err(|e| format!("write settings: {}", e))?;
eprintln!("Updated {}", settings_path.display());
} else {
eprintln!("All hooks already installed in {}", settings_path.display());
}
Ok(())
}
/// Drill down into a task's log file. Finds the log path from:
/// 1. Running task status (daemon-status.json)
/// 2. daemon.log started events (for completed/failed tasks)
pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> {
// Try running tasks first
let Some(status_json) = send_rpc_pub("") else {
return search_log_fallback(task_name, lines);
};
let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_json) else {
return search_log_fallback(task_name, lines);
};
let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) else {
return search_log_fallback(task_name, lines);
};
for t in tasks {
let name = t.get("name").and_then(|n| n.as_str()).unwrap_or("");
if !name.contains(task_name) { continue; }
if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) {
return tail_file(lp, lines);
}
}
search_log_fallback(task_name, lines)
}
fn search_log_fallback(task_name: &str, lines: usize) -> Result<(), String> {
// Fall back to searching daemon.log for the most recent started event with a log path
let log = log_path();
if log.exists() {
let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?;
for line in content.lines().rev() {
if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("");
let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("");
let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
if job.contains(task_name) && event == "started" && detail.starts_with("log: ") {
let path = &detail[5..];
return tail_file(path, lines);
}
}
}
}
Err(format!("no log file found for task '{}'", task_name))
}
fn tail_file(path: &str, lines: usize) -> Result<(), String> {
let content = fs::read_to_string(path)
.map_err(|e| format!("read {}: {}", path, e))?;
let all_lines: Vec<&str> = content.lines().collect();
let skip = all_lines.len().saturating_sub(lines);
eprintln!("--- {} ({} lines) ---", path, all_lines.len());
for line in &all_lines[skip..] {
eprintln!("{}", line);
}
Ok(())
}
pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
let path = log_path();
if !path.exists() {
eprintln!("No daemon log found.");
return Ok(());
}
let content = fs::read_to_string(&path)
.map_err(|e| format!("read log: {}", e))?;
let filtered: Vec<&str> = content.lines().rev()
.filter(|line| {
if let Some(job) = job_filter {
line.contains(&format!("\"job\":\"{}\"", job))
} else {
true
}
})
.take(lines)
.collect();
if filtered.is_empty() {
eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default());
return Ok(());
}
// Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]"
for line in filtered.into_iter().rev() {
if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
// Shorten timestamp to just time portion
let time = if ts.len() >= 19 { &ts[11..19] } else { ts };
if detail.is_empty() {
eprintln!(" {} {:20} {}", time, job, event);
} else {
// Truncate long details (file paths)
let short = if detail.len() > 60 {
let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail);
if last.len() > 60 { &last[..60] } else { last }
} else {
detail
};
eprintln!(" {} {:20} {:12} {}", time, job, event, short);
}
} else {
eprintln!("{}", line);
}
}
Ok(())
}