// consciousness/src/subconscious/daemon.rs
// poc-memory daemon: background job orchestration for memory maintenance
//
// Replaces the fragile cron+shell approach with a single long-running process
// that owns all background memory work. Uses jobkit for worker pool, status
// tracking, retry, and cancellation.
//
// Architecture:
// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs
// - Session watcher task: detects ended Claude sessions, triggers extraction
// - Jobs shell out to existing poc-memory subcommands (Phase 1)
// - Status written to daemon-status.json for `poc-memory daemon status`
//
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
/// How often the scheduler loop wakes to check for due work.
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
/// How often the scheduler spawns a health-check job.
const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
// --- Persistent task queue ---
/// A consolidation-agent run persisted to pending-tasks.jsonl so it can be
/// replayed if the daemon restarts before the task completes.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PendingTask {
    // Task id — also used as the jobkit task name when respawning.
    id: String,
    // Agent type, e.g. "replay" / "linker" (resolved via super::defs).
    agent: String,
    // Seed batch size for the run.
    batch: usize,
    // Optional node key for targeted runs; default so older queue files
    // written without this field still parse.
    #[serde(default)]
    target_key: Option<String>,
}
/// Crash-safe queue of pending agent tasks, backed by a JSONL file that is
/// rewritten in full on every mutation.
struct TaskQueue {
    // Path to pending-tasks.jsonl under the data dir.
    path: PathBuf,
    // In-memory task list; holding the lock also serializes file rewrites.
    tasks: Mutex<Vec<PendingTask>>,
}
impl TaskQueue {
    /// Load the queue from `<data_dir>/pending-tasks.jsonl`.
    ///
    /// Malformed lines are silently dropped (parse failures are filtered
    /// out); a missing or unreadable file yields an empty queue.
    fn load(data_dir: &Path) -> Arc<Self> {
        let path = data_dir.join("pending-tasks.jsonl");
        let tasks = if path.exists() {
            fs::read_to_string(&path)
                .unwrap_or_default()
                .lines()
                .filter_map(|l| serde_json::from_str(l).ok())
                .collect()
        } else {
            Vec::new()
        };
        let count = tasks.len();
        if count > 0 {
            log_event("task-queue", "loaded", &format!("{} pending tasks", count));
        }
        Arc::new(Self { path, tasks: Mutex::new(tasks) })
    }
    /// Append a task and persist the whole queue to disk.
    fn push(&self, task: PendingTask) {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.push(task);
        self.write_locked(&tasks);
    }
    /// Remove the task with the given id (if present) and persist.
    fn remove(&self, id: &str) {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.retain(|t| t.id != id);
        self.write_locked(&tasks);
    }
    /// Snapshot the pending tasks. NOTE: despite the name this does NOT
    /// clear the queue — entries are removed individually via `remove()`
    /// when their respawned job finishes (see recovery in `run_daemon`).
    fn drain(&self) -> Vec<PendingTask> {
        let tasks = self.tasks.lock().unwrap();
        tasks.clone()
    }
    /// Rewrite the backing JSONL file from the (already locked) task list.
    /// Best-effort: write errors are ignored (`let _`) — only crash
    /// recovery is weakened, in-memory state stays correct.
    fn write_locked(&self, tasks: &[PendingTask]) {
        let content: String = tasks.iter()
            .filter_map(|t| serde_json::to_string(t).ok())
            .collect::<Vec<_>>()
            .join("\n");
        let _ = fs::write(&self.path, if content.is_empty() { String::new() } else { content + "\n" });
    }
}
/// Directory for daemon logs (`~/.consciousness/logs`), created on demand.
fn logs_dir() -> PathBuf {
    let home = dirs::home_dir().unwrap_or_default();
    let dir = home.join(".consciousness/logs");
    // Best-effort: failure to create the log dir must not abort the caller.
    let _ = fs::create_dir_all(&dir);
    dir
}
/// Path of the daemon's JSONL event log file (daemon.log in the logs dir).
fn log_path() -> PathBuf {
    logs_dir().join("daemon.log")
}
/// Append a structured event line to the daemon event log via jobkit.
fn log_event(job: &str, event: &str, detail: &str) {
    jobkit::daemon::event_log::log(&logs_dir(), job, event, detail);
}
// --- Job functions (direct, no subprocess) ---
// LLM resource pool shared with run_job for slot reporting; set once at
// daemon startup in run_daemon().
static DAEMON_POOL: std::sync::OnceLock<Arc<jobkit::ResourcePool>> = std::sync::OnceLock::new();
/// Run a named job with logging, progress reporting, and error mapping.
/// Delegates to jobkit's Daemon::run_job; the pool is None until the
/// daemon has started.
fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
    let pool = DAEMON_POOL.get().map(|p| p.as_ref());
    jobkit::daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, pool, f)
}
// experience_mine and fact_mine removed — observation.agent handles all transcript mining
/// Run an agent targeted at a specific node key.
///
/// Unlike `job_consolidation_agent`, no query runs and no in-flight
/// claiming happens: the single `target_key` goes straight to
/// `run_one_agent_with_keys` (the literal 5 is presumably a batch cap —
/// confirm against the callee's signature).
fn job_targeted_agent(
    ctx: &ExecutionContext,
    agent_type: &str,
    target_key: &str,
) -> Result<(), TaskError> {
    // Owned copies so the closure below doesn't borrow the arguments.
    let agent = agent_type.to_string();
    let key = target_key.to_string();
    let job_name = format!("c-{}-target({})", agent, key);
    run_job(ctx, &job_name, || {
        let mut store = crate::store::Store::load()?;
        ctx.log_line(format!("targeting: {}", key));
        // Mirror progress into both the task log and the daemon event log.
        let log = |msg: &str| {
            ctx.log_line(msg);
            log_event(&job_name, "progress", msg);
        };
        crate::agent::oneshot::run_one_agent_with_keys(
            &mut store, &agent, std::slice::from_ref(&key), 5, "daemon", &log,
        )?;
        ctx.log_line("done");
        Ok(())
    })
}
/// Shared set of node keys currently being processed by agents.
/// Prevents concurrent agents from working on overlapping graph regions.
type InFlightNodes = Arc<Mutex<std::collections::HashSet<String>>>;
/// Run a single consolidation agent (replay, linker, separator, transfer, health).
///
/// Seed selection happens under the `in_flight` lock: the agent's query is
/// run with in-flight nodes excluded, then the chosen seeds plus their
/// strongly-connected neighbors are claimed. The LLM work runs with the
/// lock released, and all claimed keys are released afterwards even if the
/// agent run failed.
fn job_consolidation_agent(
    ctx: &ExecutionContext,
    agent_type: &str,
    batch_size: usize,
    in_flight: &InFlightNodes,
) -> Result<(), TaskError> {
    let agent = agent_type.to_string();
    let batch = batch_size;
    let job_name = format!("c-{}", agent);
    let in_flight = Arc::clone(in_flight);
    run_job(ctx, &job_name, || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        // Claim seeds: lock in-flight set, run query excluding it,
        // add selected seeds + strongly-connected neighbors, then unlock.
        let mut claimed_keys: Vec<String>;
        let graph = store.build_graph();
        {
            let mut locked = in_flight.lock().unwrap();
            ctx.log_line(format!("running agent: {} (batch={}, {} in-flight)",
                agent, batch, locked.len()));
            // Run the agent's query, filtering out in-flight nodes
            let def = super::defs::get_def(&agent)
                .ok_or_else(|| format!("no .agent file for {}", agent))?;
            let query = &def.query;
            let keys = if !query.is_empty() {
                use crate::query::engine as search;
                let mut stages = search::Stage::parse_pipeline(query)?;
                // Over-fetch to compensate for results dropped by the
                // in-flight filter (padding capped at +100).
                let padded = batch + locked.len().min(100);
                if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) {
                    stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
                }
                let results = search::run_query(&stages, vec![], &graph, &store, false, padded);
                results.into_iter()
                    .map(|(k, _)| k)
                    .filter(|k| !locked.contains(k))
                    .take(batch)
                    .collect::<Vec<_>>()
            } else {
                vec![]
            };
            if keys.is_empty() {
                return Err("query returned no results (after exclusion)".into());
            }
            // Claim seeds + strongly-connected neighbors.
            // Only exclude neighbors with score > threshold to avoid
            // blacking out the graph via high-degree hub nodes.
            claimed_keys = Vec::with_capacity(batch * 20);
            for key in &keys {
                claimed_keys.push(key.clone());
                locked.insert(key.clone());
                for (nbr, strength) in graph.neighbors(key) {
                    // Nodes missing from the store fall back to a low weight.
                    let weight = store.nodes.get(nbr.as_str())
                        .map(|n| n.weight).unwrap_or(0.1);
                    if strength * weight > 0.3 {
                        claimed_keys.push(nbr.clone());
                        locked.insert(nbr.clone());
                    }
                }
            }
        }
        // in_flight lock released — run LLM without holding it
        let log = |msg: &str| {
            ctx.log_line(msg);
            log_event(&job_name, "progress", msg);
        };
        // Use run_one_agent_with_keys — we already selected seeds above,
        // no need to re-run the query.
        let result = crate::agent::oneshot::run_one_agent_with_keys(
            &mut store, &agent, &claimed_keys, batch, "consolidate", &log,
        ).map(|_| ());
        // Release all claimed keys (seeds + neighbors) before propagating
        // any error, so a failed run cannot leak claims.
        {
            let mut locked = in_flight.lock().unwrap();
            for key in &claimed_keys {
                locked.remove(key);
            }
        }
        result?;
        ctx.log_line("done");
        Ok(())
    })
}
/// Run the rename agent: generates renames via LLM, applies them directly.
///
/// The rename agent emits lines of the form `RENAME <old-key> <new-key>`
/// (its own format, not WRITE_NODE/LINK/REFINE). Each proposal is applied
/// only if the old key resolves and the new key is free; the store is
/// saved only when at least one rename succeeded.
fn job_rename_agent(
    ctx: &ExecutionContext,
    batch_size: usize,
) -> Result<(), TaskError> {
    run_job(ctx, "c-rename", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        // A zero batch means "use the default of 10".
        let batch = if batch_size == 0 { 10 } else { batch_size };
        ctx.log_line(format!("running rename agent (batch={})", batch));
        let log = |msg: &str| ctx.log_line(msg);
        let result = crate::agent::oneshot::run_one_agent(&mut store, "rename", batch, "consolidate", &log)?;
        let mut applied = 0;
        let mut skipped = 0;
        for line in result.output.lines() {
            let trimmed = line.trim();
            // strip_prefix replaces the old `trimmed[7..]` byte slice.
            let Some(rest) = trimmed.strip_prefix("RENAME ") else { continue };
            let parts: Vec<&str> = rest.splitn(2, ' ').collect();
            if parts.len() != 2 { skipped += 1; continue; }
            let old_key = parts[0].trim();
            let new_key = parts[1].trim();
            if old_key.is_empty() || new_key.is_empty() { skipped += 1; continue; }
            let resolved = match store.resolve_key(old_key) {
                Ok(k) => k,
                Err(e) => {
                    // " -> " separator restored: the keys were previously
                    // concatenated with no delimiter in these log lines.
                    ctx.log_line(format!("skip: {} -> {}: {}", old_key, new_key, e));
                    skipped += 1;
                    continue;
                }
            };
            // Never clobber an existing node.
            if store.nodes.contains_key(new_key) {
                ctx.log_line(format!("skip: {} already exists", new_key));
                skipped += 1;
                continue;
            }
            match store.rename_node(&resolved, new_key) {
                Ok(()) => {
                    ctx.log_line(format!("renamed: {} -> {}", resolved, new_key));
                    applied += 1;
                }
                Err(e) => {
                    ctx.log_line(format!("error: {} -> {}: {}", resolved, new_key, e));
                    skipped += 1;
                }
            }
        }
        if applied > 0 {
            store.save()?;
        }
        ctx.log_line(format!("done: {} applied, {} skipped", applied, skipped));
        Ok(())
    })
}
/// Link orphan nodes into the graph (CPU-only pass, no LLM calls).
fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "c-orphans", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        ctx.log_line("linking orphans");
        let (orphan_count, links_added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15);
        ctx.log_line(format!("{} orphans, {} links added", orphan_count, links_added));
        Ok(())
    })
}
/// Cap node degree to prevent mega-hubs.
///
/// Delegates to `Store::cap_degree(50)` and saves the store on success.
fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "c-cap", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        ctx.log_line("capping degree");
        // `?` replaces a match whose Err arm merely re-wrapped Err(e).
        let (hubs, pruned) = store.cap_degree(50)?;
        store.save()?;
        ctx.log_line(format!("{} hubs capped, {} edges pruned", hubs, pruned));
        Ok(())
    })
}
/// Apply links extracted from digests.
fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "c-digest-links", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        ctx.log_line("applying digest links");
        // Parse link proposals from all digests, then apply them in bulk
        // and persist the result.
        let links = super::digest::parse_all_digest_links(&store);
        let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links);
        store.save()?;
        ctx.log_line(format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks));
        Ok(())
    })
}
/// Retired knowledge-loop phase — intentionally a no-op.
///
/// The knowledge loop was removed: agents now use tool calls directly, and
/// consolidation is handled by consolidate_full() in the consolidate job.
/// The scheduler still spawns this as the pipeline's final phase, so the
/// function remains as a stub.
fn job_knowledge_loop(_ctx: &ExecutionContext) -> Result<(), TaskError> {
    Ok(())
}
/// Generate digests (delegates to `crate::digest::digest_auto`).
fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> {
    run_job(ctx, "digest", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;
        ctx.log_line("generating digests");
        crate::digest::digest_auto(&mut store)
    })
}
/// Health job (spawned hourly by the scheduler): runs the daily check,
/// decays search-hit counters, and refreshes the shared `GraphHealth`
/// snapshot that gates the consolidation pipeline.
fn job_daily_check(
    ctx: &ExecutionContext,
    graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) -> Result<(), TaskError> {
    let gh = Arc::clone(graph_health);
    run_job(ctx, "daily-check", || {
        ctx.log_line("loading store");
        let store = crate::store::Store::load()?;
        ctx.log_line("checking health");
        // Report is unused here; daily_check runs for its side effects.
        let _report = crate::neuro::daily_check(&store);
        // Decay search hit counters (10% daily decay)
        ctx.log_line("decaying search counters");
        // Best-effort: a decay failure is logged but does not fail the job.
        match crate::counters::decay_all(0.9) {
            Ok(removed) => ctx.log_line(format!("decayed counters, removed {}", removed)),
            Err(e) => ctx.log_line(format!("counter decay failed: {}", e)),
        }
        // Compute graph health metrics for status display
        ctx.log_line("computing graph health");
        let health = compute_graph_health(&store);
        *gh.lock().unwrap() = Some(health);
        Ok(())
    })
}
/// Build a `GraphHealth` snapshot from the current store: structural graph
/// metrics, the episodic-node ratio, and the quick consolidation plan.
fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
    let graph = store.build_graph();
    let snap = crate::graph::current_metrics(&graph);
    let episodic_count = store.nodes.iter()
        .filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
        .count();
    // Guard against division by zero on an empty store.
    let episodic_ratio = if store.nodes.is_empty() { 0.0 }
        else { episodic_count as f32 / store.nodes.len() as f32 };
    // Use the same planning logic as consolidation (skip O(n²) interference)
    let plan = crate::neuro::consolidation_plan_quick(store);
    GraphHealth {
        nodes: snap.nodes,
        edges: snap.edges,
        communities: snap.communities,
        alpha: snap.alpha,
        gini: snap.gini,
        avg_cc: snap.avg_cc,
        sigma: snap.sigma,
        episodic_ratio,
        // Interference pairs are not computed in the quick path.
        interference: 0,
        plan_counts: plan.counts,
        plan_rationale: plan.rationale,
        computed_at: crate::store::format_datetime_space(crate::store::now_epoch()),
    }
}
/// Get process uptime as human-readable string by reading /proc/<pid>/stat.
/// Linux-only; returns None on any read/parse failure.
fn proc_uptime(pid: u32) -> Option<String> {
    // starttime (field 22, 1-indexed) is in clock ticks since boot. The
    // comm field may contain spaces/parens, so scan from the *last* ')'.
    let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
    let tail = stat.get(stat.rfind(')')? + 2..)?;
    // After comm, fields[0] is state (field 3), so field 22 is index 19.
    let start_ticks: u64 = tail.split_whitespace().nth(19)?.parse().ok()?;
    // SAFETY: sysconf with a valid _SC_* constant is always safe to call.
    let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64;
    let epoch_now = || {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .map(|d| d.as_secs())
    };
    // Boot time = wall clock now minus system uptime from /proc/uptime.
    let sys_uptime: f64 = fs::read_to_string("/proc/uptime").ok()?
        .split_whitespace().next()?.parse().ok()?;
    let boot_time_secs = epoch_now()? - sys_uptime as u64;
    let start_secs = boot_time_secs + start_ticks / ticks_per_sec;
    let elapsed = epoch_now()?.saturating_sub(start_secs);
    Some(format_duration_human(elapsed as u128 * 1000))
}
// --- Status writing ---
/// Serialize the current daemon status to the status file (read by
/// `poc-memory daemon status` and by the next daemon start for recovery).
fn write_status(
    choir: &Choir,
    last_daily: Option<chrono::NaiveDate>,
    graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) {
    let status = build_status(choir, last_daily, graph_health);
    jobkit::daemon::status::write(&crate::config::get().data_dir, &status);
}
/// Point-in-time graph health metrics, embedded in the daemon status file.
///
/// Newer fields carry `#[serde(default)]` so status files written before a
/// field existed still deserialize — without it, loading an older file
/// fails entirely and `last_daily` recovery is lost on restart.
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct GraphHealth {
    pub nodes: usize,
    pub edges: usize,
    pub communities: usize,
    pub alpha: f32, // power-law exponent (target ≥2.5)
    pub gini: f32, // degree inequality (target ≤0.4)
    pub avg_cc: f32, // clustering coefficient (target ≥0.2)
    pub sigma: f32, // small-world sigma
    pub episodic_ratio: f32, // episodic/total nodes (target <0.4)
    pub interference: usize, // interfering pairs (target <50)
    // Consolidation work estimate from plan
    #[serde(default)]
    pub plan_counts: std::collections::HashMap<String, usize>,
    #[serde(default)]
    pub plan_rationale: Vec<String>,
    #[serde(default)]
    pub computed_at: String,
}
/// Shape of the daemon status file and the status RPC response.
#[derive(serde::Serialize, serde::Deserialize)]
struct DaemonStatus {
    // Daemon process id (used by show_status to derive uptime).
    pid: u32,
    // Snapshot of all jobkit task states.
    tasks: Vec<TaskInfo>,
    // ISO date the daily pipeline last ran, if known.
    #[serde(default)]
    last_daily: Option<String>,
    // Latest graph health snapshot; absent until the first health job runs.
    #[serde(default)]
    graph_health: Option<GraphHealth>,
}
// --- The daemon ---
/// Daemon entry point: builds the jobkit daemon, recovers persisted state,
/// starts the scheduler loop, registers RPC handlers, and runs the socket
/// server on the main thread until shutdown.
///
/// Does not return normally — exits the process with status 0 on shutdown.
pub fn run_daemon() -> Result<(), String> {
    let config = crate::config::get();
    let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig {
        data_dir: config.data_dir.clone(),
        resource_slots: config.llm_concurrency,
        resource_name: "llm".to_string(),
    });
    let choir = Arc::clone(&daemon.choir);
    let llm = Arc::clone(&daemon.resource);
    // Publish the LLM pool for run_job(); ignore the error if already set.
    let _ = DAEMON_POOL.set(Arc::clone(&llm));
    let task_log_dir = logs_dir().join("daemon");
    let _ = fs::create_dir_all(&task_log_dir);
    // Enable verbose logging if POC_MEMORY_VERBOSE is set
    if std::env::var("POC_MEMORY_VERBOSE").is_ok() {
        jobkit::daemon::event_log::set_level(jobkit::daemon::event_log::LogLevel::Verbose);
    }
    // Recover last_daily from the previous status file so a restart does
    // not lose track of when the daily pipeline last ran.
    let last_daily: Arc<Mutex<Option<chrono::NaiveDate>>> = Arc::new(Mutex::new(
        jobkit::daemon::status::load::<DaemonStatus>(&config.data_dir)
            .and_then(|s| s.last_daily)
            .and_then(|d| d.parse().ok())
    ));
    let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None));
    // Persistent task queue — survives daemon restarts
    let task_queue = TaskQueue::load(&config.data_dir);
    // Nodes currently being processed by agents — prevents concurrent
    // agents from working on overlapping graph regions.
    let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new()));
    log_event("daemon", "started", &format!("pid {}", std::process::id()));
    eprintln!("poc-memory daemon started (pid {})", std::process::id());
    // Recover pending tasks persisted by a previous run: respawn each as a
    // consolidation-agent job that removes itself from the queue when done.
    {
        let recovered = task_queue.drain();
        if !recovered.is_empty() {
            log_event("task-queue", "recovering", &format!("{} tasks", recovered.len()));
            for pt in &recovered {
                let agent = pt.agent.clone();
                let b = pt.batch;
                let task_id = pt.id.clone();
                let in_flight_clone = Arc::clone(&in_flight);
                let queue_clone = Arc::clone(&task_queue);
                choir.spawn(pt.id.clone())
                    .resource(&llm)
                    .log_dir(&task_log_dir)
                    .retries(1)
                    .init(move |ctx| {
                        let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone);
                        queue_clone.remove(&task_id);
                        result
                    });
                // No .run() here: drop schedules via IdleTask::Drop.
            }
        }
    }
    // Write initial status so `daemon status` works immediately.
    write_status(&choir, *last_daily.lock().unwrap(), &graph_health);
    // Scheduler task: wakes every SCHEDULER_INTERVAL, spawns hourly health
    // checks and the 6-hourly consolidation pipeline.
    let choir_sched = Arc::clone(&choir);
    let llm_sched = Arc::clone(&llm);
    let last_daily_sched = Arc::clone(&last_daily);
    let graph_health_sched = Arc::clone(&graph_health);
    let in_flight_sched = Arc::clone(&in_flight);
    let log_dir_sched = task_log_dir.clone();
    let queue_sched = Arc::clone(&task_queue);
    const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
    choir.spawn("scheduler").init(move |ctx| {
        // Back-date both timers so each fires on the first eligible tick.
        let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
        let mut last_consolidation = std::time::Instant::now() - CONSOLIDATION_INTERVAL;
        ctx.set_progress("idle");
        loop {
            if ctx.is_cancelled() {
                return Err(TaskError::Fatal("cancelled".into()));
            }
            let today = chrono::Local::now().date_naive();
            // Health check: every hour — also updates graph health metrics
            if last_health.elapsed() >= HEALTH_INTERVAL {
                let gh = Arc::clone(&graph_health_sched);
                choir_sched.spawn("health").init(move |ctx| {
                    job_daily_check(ctx, &gh)
                });
                last_health = std::time::Instant::now();
            }
            // Consolidation cycle: every 6 hours. Requires the cached graph
            // health produced by the health job above; until it exists we
            // fall through to the bookkeeping + sleep below. (A bare
            // `continue` here would busy-spin without sleeping and skip
            // status writes until the first health check lands.)
            let gh = graph_health_sched.lock().unwrap().clone();
            if let Some(h) = gh.as_ref() {
                if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL {
                    log_event("scheduler", "consolidation-trigger",
                        &format!("{} (every 6h)", today));
                    // Use cached graph health plan (from consolidation_plan_quick).
                    let plan = crate::neuro::ConsolidationPlan {
                        counts: h.plan_counts.clone(),
                        run_health: true,
                        rationale: Vec::new(),
                    };
                    let runs = plan.to_agent_runs(5);
                    // Compact summary: first letter of agent type + count.
                    let summary: Vec<String> = h.plan_counts.iter()
                        .filter(|(_, c)| **c > 0)
                        .map(|(a, c)| format!("{}{}", &a[..1], c))
                        .collect();
                    log_event("scheduler", "consolidation-plan",
                        &format!("{} agents ({})", runs.len(), summary.join(" ")));
                    // Phase 1: Agent runs — all concurrent; the in-flight
                    // set keeps them off overlapping graph regions. Each
                    // task is persisted to the queue first so a crash
                    // mid-run can replay it on the next start.
                    let mut all_tasks: Vec<jobkit::RunningTask> = Vec::new();
                    for (i, (agent_type, batch)) in runs.iter().enumerate() {
                        let agent = agent_type.to_string();
                        let b = *batch;
                        let in_flight_clone = Arc::clone(&in_flight_sched);
                        let task_name = format!("c-{}-{}:{}", agent, i, today);
                        let task_id = task_name.clone();
                        let queue_clone = Arc::clone(&queue_sched);
                        queue_sched.push(PendingTask {
                            id: task_id.clone(),
                            agent: agent.clone(),
                            batch: b,
                            target_key: None,
                        });
                        let task = choir_sched.spawn(task_name)
                            .resource(&llm_sched)
                            .log_dir(&log_dir_sched)
                            .retries(1)
                            .init(move |ctx| {
                                let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone);
                                queue_clone.remove(&task_id);
                                result
                            })
                            .run();
                        all_tasks.push(task);
                    }
                    // Later phases hang off the last agent task.
                    let prev_agent = all_tasks.last().cloned();
                    // Phase 2: Link orphans (CPU-only, no LLM)
                    let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today))
                        .retries(1)
                        .init(job_link_orphans);
                    if let Some(ref dep) = prev_agent {
                        orphans.depend_on(dep);
                    }
                    let orphans = orphans.run();
                    // Phase 3: Cap degree
                    let mut cap = choir_sched.spawn(format!("c-cap:{}", today))
                        .retries(1)
                        .init(job_cap_degree);
                    cap.depend_on(&orphans);
                    let cap = cap.run();
                    // Phase 4: Generate digests
                    let mut digest = choir_sched.spawn(format!("c-digest:{}", today))
                        .resource(&llm_sched)
                        .retries(1)
                        .init(job_digest);
                    digest.depend_on(&cap);
                    let digest = digest.run();
                    // Phase 5: Apply digest links
                    let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today))
                        .retries(1)
                        .init(job_digest_links);
                    digest_links.depend_on(&digest);
                    let digest_links = digest_links.run();
                    // Phase 7: Knowledge loop (no-op stub, terminal phase).
                    // NOTE(review): never `.run()` — presumably scheduled on
                    // drop like the recovery tasks above; confirm in jobkit.
                    let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today))
                        .resource(&llm_sched)
                        .retries(1)
                        .init(job_knowledge_loop);
                    knowledge.depend_on(&digest_links);
                    *last_daily_sched.lock().unwrap() = Some(today);
                    last_consolidation = std::time::Instant::now();
                    ctx.set_progress(format!("daily pipeline triggered ({today})"));
                }
            }
            // Prune finished tasks from registry
            let pruned = choir_sched.gc_finished();
            if pruned > 0 {
                log::trace!("pruned {} finished tasks", pruned);
            }
            write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched);
            std::thread::sleep(SCHEDULER_INTERVAL);
        }
    });
    // RPC: "consolidate" — clear last_daily so a daily run can re-trigger.
    {
        let last_daily_rpc = Arc::clone(&last_daily);
        daemon.add_rpc_handler(move |cmd, _ctx| {
            if cmd == "consolidate" {
                // NOTE(review): the scheduler gates on `last_consolidation`
                // (an Instant local to its closure), not `last_daily`, so
                // clearing this may no longer force a run — confirm.
                *last_daily_rpc.lock().unwrap() = None;
                log_event("rpc", "consolidate", "triggered via socket");
                Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into())
            } else {
                None
            }
        });
    }
    // RPC: "reload-config" — re-read config and report active API settings.
    daemon.add_rpc_handler(|cmd, _ctx| {
        if cmd != "reload-config" { return None; }
        let changed = crate::config::reload();
        let config = crate::config::get();
        let api = config.api_base_url.as_deref().unwrap_or("(none)");
        let model = config.api_model.as_deref().unwrap_or("(default)");
        log_event("daemon", "config-reload",
            &format!("changed={}, api={}, model={}", changed, api, model));
        Some(format!("{{\"ok\":true,\"changed\":{},\"api_base_url\":\"{}\",\"api_model\":\"{}\"}}\n",
            changed, api, model))
    });
    // RPC: "record-hits <key>\t<key>…" — bump search-hit counters.
    daemon.add_rpc_handler(|cmd, _ctx| {
        if !cmd.starts_with("record-hits ") { return None; }
        let keys: Vec<&str> = cmd.strip_prefix("record-hits ")
            .unwrap_or("")
            .split('\t')
            .filter(|k| !k.is_empty())
            .collect();
        if keys.is_empty() {
            return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into());
        }
        let n = keys.len();
        match crate::counters::record_search_hits(&keys) {
            Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)),
            Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))),
        }
    });
    // RPC: "run-agent <type> [count] [target:KEY]" — queue ad-hoc runs.
    {
        let choir_rpc = Arc::clone(&choir);
        let llm_rpc = Arc::clone(&llm);
        let log_dir_rpc = task_log_dir.clone();
        let in_flight_rpc = Arc::clone(&in_flight);
        daemon.add_rpc_handler(move |cmd, _ctx| {
            if !cmd.starts_with("run-agent ") { return None; }
            let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
            let agent_type = parts.get(1).unwrap_or(&"replay");
            let count: usize = parts.get(2)
                .and_then(|s| s.parse().ok())
                .unwrap_or(1);
            // Optional target key: "run-agent linker 1 target:KEY"
            let target_key: Option<String> = parts.get(3)
                .and_then(|s| s.strip_prefix("target:"))
                .map(|s| s.to_string());
            let batch_size = 5;
            let today = chrono::Local::now().format("%Y-%m-%d");
            let ts = chrono::Local::now().format("%H%M%S");
            let mut prev = None;
            let mut spawned = 0;
            let mut remaining = count;
            let is_rename = *agent_type == "rename";
            // Targeted run: a single task aimed at one node key.
            if let Some(ref key) = target_key {
                let agent = agent_type.to_string();
                let key = key.clone();
                let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::<String>(), today);
                if jobkit::daemon::event_log::enabled(jobkit::daemon::event_log::LogLevel::Verbose) {
                    log_event("daemon", "spawn-targeted",
                        &format!("{} (pool: {}/{})", task_name, llm_rpc.available(), llm_rpc.capacity()));
                }
                choir_rpc.spawn(task_name)
                    .resource(&llm_rpc)
                    .log_dir(&log_dir_rpc)
                    .retries(1)
                    .init(move |ctx| {
                        job_targeted_agent(ctx, &agent, &key)
                    })
                    .run();
                spawned = 1;
                remaining = 0;
            }
            // Untargeted: split `count` into batches of at most batch_size,
            // chained with depend_on so batches run sequentially.
            // NOTE(review): every batch gets the same task_name (same ts) —
            // confirm jobkit tolerates duplicate names.
            while remaining > 0 {
                let batch = remaining.min(batch_size);
                let agent = agent_type.to_string();
                let in_flight_clone = Arc::clone(&in_flight_rpc);
                let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
                let mut builder = choir_rpc.spawn(task_name)
                    .resource(&llm_rpc)
                    .log_dir(&log_dir_rpc)
                    .retries(1)
                    .init(move |ctx| {
                        if is_rename {
                            job_rename_agent(ctx, batch)
                        } else {
                            job_consolidation_agent(ctx, &agent, batch, &in_flight_clone)
                        }
                    });
                if let Some(ref dep) = prev {
                    builder.depend_on(dep);
                }
                prev = Some(builder.run());
                remaining -= batch;
                spawned += 1;
            }
            log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count));
            Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n",
                count, agent_type, spawned))
        });
    }
    // Main thread: socket server + signal handling
    let last_daily_status = Arc::clone(&last_daily);
    let graph_health_status = Arc::clone(&graph_health);
    daemon.run(move |ctx| {
        build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status)
    });
    log_event("daemon", "stopping", "");
    eprintln!("Shutting down...");
    log_event("daemon", "stopped", "");
    std::process::exit(0)
}
/// Public wrapper around `send_rpc` for callers outside this module.
pub fn send_rpc_pub(cmd: &str) -> Option<String> {
    send_rpc(cmd)
}
/// Send a command over the daemon control socket; `None` if no daemon is up.
fn send_rpc(cmd: &str) -> Option<String> {
    jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
}
/// Ask the running daemon to schedule a consolidation cycle.
/// Prints the daemon's response; errors if the daemon is not running.
pub fn rpc_consolidate() -> Result<(), String> {
    let resp = send_rpc("consolidate")
        .ok_or_else(|| String::from("Daemon not running."))?;
    println!("{}", resp.trim());
    Ok(())
}
/// Ask the running daemon to queue `count` runs of the given agent type.
/// Prints the daemon's response; errors if the daemon is not running.
pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
    let resp = send_rpc(&format!("run-agent {} {}", agent, count))
        .ok_or_else(|| String::from("Daemon not running."))?;
    println!("{}", resp.trim());
    Ok(())
}
/// Fetch and parse the daemon status over the control socket.
/// An empty command appears to be the status request — confirm in jobkit.
fn read_status_socket() -> Option<DaemonStatus> {
    let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
    serde_json::from_str(&json).ok()
}
// status_socket_loop has been replaced by daemon.run() in jobkit-daemon.
/// Assemble the `DaemonStatus` snapshot written to disk and served via RPC.
fn build_status(
    choir: &Choir,
    last_daily: Option<chrono::NaiveDate>,
    graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) -> DaemonStatus {
    DaemonStatus {
        pid: std::process::id(),
        tasks: choir.task_statuses(),
        last_daily: last_daily.map(|d| d.to_string()),
        graph_health: graph_health.lock().unwrap().clone(),
    }
}
// --- Status display ---
/// Render a millisecond count as a compact human string:
/// "500ms", "1.5s", "1m30s", "2h1m".
fn format_duration_human(ms: u128) -> String {
    match ms {
        0..=999 => format!("{}ms", ms),
        1_000..=59_999 => format!("{:.1}s", ms as f64 / 1000.0),
        // Integer division — `{}` on integers matches the old `{:.0}`.
        60_000..=3_599_999 => format!("{}m{}s", ms / 60_000, (ms % 60_000) / 1000),
        _ => format!("{}h{}m", ms / 3_600_000, (ms % 3_600_000) / 60_000),
    }
}
/// Bucket a task name into a display group for the status view.
fn task_group(name: &str) -> &str {
    const DAILY_PREFIXES: [&str; 5] =
        ["c-", "consolidate:", "knowledge-loop:", "digest:", "decay:"];
    match name {
        "scheduler" => "core",
        "health" => "health",
        n if DAILY_PREFIXES.iter().any(|p| n.starts_with(*p)) => "daily",
        _ => "other",
    }
}
/// Compute elapsed time for a task, using absolute started_at if available.
fn task_elapsed(t: &TaskInfo) -> Duration {
    // Finished (or never-started) tasks: prefer the recorded result duration.
    if !matches!(t.status, TaskStatus::Running) {
        return t.result.as_ref().map_or(t.elapsed, |r| r.duration);
    }
    // Running: derive from the absolute start timestamp when present.
    let Some(started) = t.started_at else { return t.elapsed };
    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64();
    // Clamp at zero in case of clock skew between writer and reader.
    Duration::from_secs_f64((now - started).max(0.0))
}
/// Single-character marker for a task's state in status output.
/// (Several arms previously returned "" — glyphs lost to a past
/// re-encoding — leaving states indistinguishable; restored here.)
fn status_symbol(t: &TaskInfo) -> &'static str {
    if t.cancelled { return "⊘" }
    match t.status {
        TaskStatus::Running => "▶",
        TaskStatus::Completed => "✓",
        TaskStatus::Failed => "✗",
        TaskStatus::Pending => "·",
    }
}
/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…"
///
/// Keeps the verb, reduces the argument to its basename (minus any .jsonl
/// suffix), and truncates to 12 characters with a trailing ellipsis.
/// Truncation is by chars, not bytes — the old `&file[..12]` could panic
/// on a multi-byte UTF-8 boundary.
fn short_job_name(job: &str) -> String {
    if let Some((verb, path)) = job.split_once(' ') {
        let file = path.rsplit('/').next().unwrap_or(path);
        let file = file.strip_suffix(".jsonl").unwrap_or(file);
        let short: String = if file.chars().count() > 12 {
            file.chars().take(12).chain(std::iter::once('…')).collect()
        } else {
            file.to_string()
        };
        format!("{} {}", verb, short)
    } else {
        job.to_string()
    }
}
/// Print the last `n` completed/failed job events from the daemon log.
///
/// Scans daemon.log backwards for completion events, then prints them in
/// chronological order. Silently does nothing if the log can't be read.
fn show_recent_completions(n: usize) {
    let path = log_path();
    let content = match fs::read_to_string(&path) {
        Ok(c) => c,
        Err(_) => return,
    };
    // Cheap substring filter before paying for full JSON parsing below.
    let recent: Vec<&str> = content.lines().rev()
        .filter(|line| {
            line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"")
        })
        .take(n)
        .collect();
    if recent.is_empty() { return; }
    eprintln!(" Recent:");
    for line in recent.iter().rev() {
        if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
            let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
            let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
            let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
            let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
            // HH:MM:SS slice of an ISO timestamp; `get` avoids the panic
            // the old byte slice risked on a non-char-boundary.
            let time = ts.get(11..19).unwrap_or(ts);
            // Symbols restored (both branches previously returned "").
            let sym = if event == "completed" { "✓" } else { "✗" };
            let name = short_job_name(job);
            eprintln!(" {} {} {:30} {}", sym, time, name, detail);
        }
    }
    eprintln!();
}
pub fn show_status() -> Result<(), String> {
let status = match read_status_socket() {
Some(s) => s,
None => {
eprintln!("Daemon not running.");
return Ok(());
}
};
let uptime_str = proc_uptime(status.pid).unwrap_or_default();
if uptime_str.is_empty() {
eprintln!("poc-memory daemon pid={}", status.pid);
} else {
eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str);
}
if status.tasks.is_empty() {
eprintln!("\n No tasks");
return Ok(());
}
// Count by status
let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" tasks: {} running, {} pending, {} done, {} failed",
running, pending, completed, failed);
// Graph health
if let Some(ref gh) = status.graph_health {
eprintln!();
fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str {
let ok = if higher_is_better { val >= target } else { val <= target };
if ok { "" } else { "" }
}
eprintln!(" Graph health ({})", gh.computed_at);
eprintln!(" {} nodes, {} edges, {} communities",
gh.nodes, gh.edges, gh.communities);
eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)",
indicator(gh.alpha, 2.5, true), gh.alpha,
indicator(gh.gini, 0.4, false), gh.gini,
indicator(gh.avg_cc, 0.2, true), gh.avg_cc);
eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}",
indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0,
gh.sigma);
let plan_total: usize = gh.plan_counts.values().sum::<usize>() + 1;
let plan_summary: Vec<String> = gh.plan_counts.iter()
.filter(|(_, c)| **c > 0)
.map(|(a, c)| format!("{}{}", &a[..1], c))
.collect();
eprintln!(" consolidation plan: {} agents ({} +health)",
plan_total, plan_summary.join(" "));
}
eprintln!();
// Group and display
let groups: &[(&str, &str)] = &[
("core", "Core"),
("daily", "Daily pipeline"),
("extract", "Session extraction"),
("health", "Health"),
("other", "Other"),
];
// In-flight tasks first (running + pending)
let in_flight: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending))
.collect();
if !in_flight.is_empty() {
eprintln!(" In flight:");
for t in &in_flight {
let sym = status_symbol(t);
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" {}", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref()
.filter(|p| *p != "idle")
.map(|p| format!(" {}", p))
.unwrap_or_default();
let name = short_job_name(&t.name);
eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress);
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" │ log: {}", lp);
}
}
}
eprintln!();
}
// Recent completions from log file
show_recent_completions(20);
// Detailed group view only if there are failures worth showing
for (group_id, group_label) in groups {
let tasks: Vec<&TaskInfo> = status.tasks.iter()
.filter(|t| task_group(&t.name) == *group_id)
.collect();
if tasks.is_empty() { continue; }
// For extract group, show summary instead of individual tasks
if *group_id == "extract" {
let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
eprintln!(" {} ({} total)", group_label, tasks.len());
if n_running > 0 {
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) {
let e = task_elapsed(t);
let elapsed = if !e.is_zero() {
format!(" ({})", format_duration_human(e.as_millis()))
} else {
String::new()
};
let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default();
eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress);
if let Some(ref lp) = t.log_path {
eprintln!(" │ log: {}", lp);
}
}
}
let mut parts = Vec::new();
if n_done > 0 { parts.push(format!("{} done", n_done)); }
if n_running > 0 { parts.push(format!("{} running", n_running)); }
if n_pending > 0 { parts.push(format!("{} queued", n_pending)); }
if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); }
eprintln!(" {}", parts.join(", "));
// Show recent failures
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) {
if let Some(ref err) = t.result.as_ref().and_then(|r| r.error.as_ref()) {
let short = if err.len() > 80 { &err[..80] } else { err };
eprintln!("{}: {}", t.name, short);
}
}
eprintln!();
continue;
}
eprintln!(" {}", group_label);
for t in &tasks {
let sym = status_symbol(t);
let e = task_elapsed(t);
let duration = if !e.is_zero() {
format_duration_human(e.as_millis())
} else {
String::new()
};
let retry = if t.max_retries > 0 && t.retry_count > 0 {
format!(" retry {}/{}", t.retry_count, t.max_retries)
} else {
String::new()
};
let detail = if matches!(t.status, TaskStatus::Failed) {
t.result.as_ref()
.and_then(|r| r.error.as_ref())
.map(|e| {
let short = if e.len() > 60 { &e[..60] } else { e };
format!(" err: {}", short)
})
.unwrap_or_default()
} else {
String::new()
};
if duration.is_empty() {
eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail);
} else {
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail);
}
// Show output log tail for running tasks
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" │ log: {}", lp);
}
}
}
eprintln!();
}
Ok(())
}
/// Install and start the poc-memory systemd user service, then chain
/// installation of the companion poc-daemon service and the Claude hooks.
///
/// Writes `~/.config/systemd/user/poc-memory.service` pointing at the
/// currently running executable, reloads the user systemd instance, and
/// enables + starts the unit. Returns a human-readable error string on
/// any failure.
pub fn install_service() -> Result<(), String> {
    let exe = std::env::current_exe()
        .map_err(|e| format!("current_exe: {}", e))?;
    let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?;

    // Per-user systemd units live under ~/.config/systemd/user.
    let unit_dir = PathBuf::from(&home).join(".config/systemd/user");
    fs::create_dir_all(&unit_dir)
        .map_err(|e| format!("create {}: {}", unit_dir.display(), e))?;

    // Render the unit file; PATH is pinned so jobs spawned by the daemon
    // can find cargo/local binaries even without a login shell.
    let unit_path = unit_dir.join("poc-memory.service");
    let unit = format!(
        r#"[Unit]
Description=poc-memory daemon background memory maintenance
After=default.target
[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=30
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target
"#, exe = exe.display(), home = home);
    fs::write(&unit_path, &unit)
        .map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
    eprintln!("Wrote {}", unit_path.display());

    // systemd must re-read unit files before the new unit can be enabled.
    let reload = std::process::Command::new("systemctl")
        .args(["--user", "daemon-reload"])
        .status()
        .map_err(|e| format!("systemctl daemon-reload: {}", e))?;
    if !reload.success() {
        return Err("systemctl daemon-reload failed".into());
    }

    // Enable at login and start immediately (--now).
    let enable = std::process::Command::new("systemctl")
        .args(["--user", "enable", "--now", "poc-memory"])
        .status()
        .map_err(|e| format!("systemctl enable: {}", e))?;
    if !enable.success() {
        return Err("systemctl enable --now failed".into());
    }
    eprintln!("Service enabled and started");

    // Companion notification daemon + Claude settings hooks.
    install_notify_daemon(&unit_dir, &home)?;
    crate::claude::hook::install_hook()?;
    Ok(())
}
/// Install the poc-daemon (notification/idle) systemd user service.
///
/// Writes a unit file into `unit_dir`, reloads the user systemd instance,
/// and enables + starts the `poc-daemon` unit. If the poc-daemon binary is
/// not present at `~/.cargo/bin/poc-daemon`, prints a warning and returns
/// Ok so the caller (`install_service`) still succeeds.
fn install_notify_daemon(unit_dir: &Path, home: &str) -> Result<(), String> {
    let poc_daemon = PathBuf::from(home).join(".cargo/bin/poc-daemon");
    // Best-effort: a missing binary is a warning, not an error.
    if !poc_daemon.exists() {
        eprintln!("Warning: poc-daemon not found at {} — skipping service install", poc_daemon.display());
        eprintln!("  Build with: cargo install --path .");
        return Ok(());
    }
    // NOTE(review): ExecStart passes "agent daemon" arguments to the
    // poc-daemon binary, mirroring the poc-memory unit in install_service —
    // confirm poc-daemon actually accepts these args (looks possibly
    // copy-pasted from the unit template above).
    let unit = format!(
        r#"[Unit]
Description=poc-daemon notification routing and idle management
After=default.target
[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=10
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target
"#, exe = poc_daemon.display(), home = home);
    let unit_path = unit_dir.join("poc-daemon.service");
    fs::write(&unit_path, &unit)
        .map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
    eprintln!("Wrote {}", unit_path.display());
    // systemd must re-read unit files before the new unit can be enabled.
    let status = std::process::Command::new("systemctl")
        .args(["--user", "daemon-reload"])
        .status()
        .map_err(|e| format!("systemctl daemon-reload: {}", e))?;
    if !status.success() {
        return Err("systemctl daemon-reload failed".into());
    }
    // Enable at login and start immediately (--now).
    let status = std::process::Command::new("systemctl")
        .args(["--user", "enable", "--now", "poc-daemon"])
        .status()
        .map_err(|e| format!("systemctl enable: {}", e))?;
    if !status.success() {
        return Err("systemctl enable --now poc-daemon failed".into());
    }
    eprintln!("poc-daemon service enabled and started");
    Ok(())
}
/// Drill down into a task's log file. Finds the log path from:
/// 1. Running task status (daemon-status.json)
/// 2. daemon.log started events (for completed/failed tasks)
pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> {
    // Prefer the live daemon status; any RPC or JSON-parse failure simply
    // drops us through to scanning daemon.log instead.
    let live = send_rpc_pub("")
        .and_then(|json| serde_json::from_str::<serde_json::Value>(&json).ok());
    let log_from_status = live
        .as_ref()
        .and_then(|status| status.get("tasks"))
        .and_then(|t| t.as_array())
        .and_then(|tasks| {
            // First task whose name matches AND that has a log_path wins.
            tasks.iter().find_map(|t| {
                let name = t.get("name").and_then(|n| n.as_str()).unwrap_or("");
                if !name.contains(task_name) {
                    return None;
                }
                t.get("log_path").and_then(|p| p.as_str())
            })
        });
    match log_from_status {
        Some(lp) => tail_file(lp, lines),
        None => search_log_fallback(task_name, lines),
    }
}
/// Fall back to searching daemon.log for the most recent "started" event of
/// a matching job that recorded a log path, and tail that file.
///
/// Scans the log newest-first so the most recent run wins. Returns an error
/// if the log is missing or contains no matching entry.
fn search_log_fallback(task_name: &str, lines: usize) -> Result<(), String> {
    let log = log_path();
    if log.exists() {
        let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?;
        // Entries are JSONL, one event per line; non-JSON lines are skipped.
        for line in content.lines().rev() {
            let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
                continue;
            };
            let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("");
            let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("");
            let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
            if job.contains(task_name) && event == "started" {
                // strip_prefix replaces the old starts_with + hard-coded
                // byte offset (&detail[5..]), which would silently break if
                // the "log: " prefix ever changed length.
                if let Some(path) = detail.strip_prefix("log: ") {
                    return tail_file(path, lines);
                }
            }
        }
    }
    Err(format!("no log file found for task '{}'", task_name))
}
/// Print the last `lines` lines of the file at `path` to stderr, preceded
/// by a header showing the path and the file's total line count.
///
/// Reads the whole file into memory (log files here are small). Returns an
/// error string if the file cannot be read.
fn tail_file(path: &str, lines: usize) -> Result<(), String> {
    let content = fs::read_to_string(path)
        .map_err(|e| format!("read {}: {}", path, e))?;
    let all_lines: Vec<&str> = content.lines().collect();
    let total = all_lines.len();
    eprintln!("--- {} ({} lines) ---", path, total);
    // saturating_sub keeps this correct when the file has fewer lines
    // than requested.
    let start = total.saturating_sub(lines);
    for line in all_lines.iter().skip(start) {
        eprintln!("{}", line);
    }
    Ok(())
}
/// Print the newest `lines` daemon log entries to stderr in chronological
/// order, optionally filtered to a single job name.
///
/// Entries are JSONL; parseable lines are pretty-printed as
/// "TIME JOB EVENT [DETAIL]", unparseable lines are echoed raw.
/// Returns an error string only if the log file exists but cannot be read.
pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
    let path = log_path();
    if !path.exists() {
        eprintln!("No daemon log found.");
        return Ok(());
    }
    let content = fs::read_to_string(&path)
        .map_err(|e| format!("read log: {}", e))?;
    // Build the match needle once; the old code re-allocated this format!
    // string for every log line inside the filter closure.
    let needle = job_filter.map(|job| format!("\"job\":\"{}\"", job));
    let filtered: Vec<&str> = content.lines().rev()
        .filter(|line| needle.as_deref().map_or(true, |n| line.contains(n)))
        .take(lines)
        .collect();
    if filtered.is_empty() {
        eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default());
        return Ok(());
    }
    // Char-boundary-safe prefix: the old raw byte slice (&s[..60]) panics if
    // a multi-byte UTF-8 character straddles the cut point.
    fn prefix(s: &str, max: usize) -> &str {
        if s.len() <= max {
            return s;
        }
        let mut end = max;
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }
    // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]"
    for line in filtered.into_iter().rev() {
        let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
            eprintln!("{}", line);
            continue;
        };
        let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
        let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
        let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
        let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
        // Keep just the HH:MM:SS portion of an ISO-8601 timestamp; get()
        // returns None (falling back to the raw value) instead of panicking
        // on short or oddly-encoded timestamps.
        let time = ts.get(11..19).unwrap_or(ts);
        if detail.is_empty() {
            eprintln!("  {} {:20} {}", time, job, event);
        } else {
            // Truncate long details (usually file paths): keep the final
            // path component, then cap at 60 chars safely.
            let short = if detail.len() > 60 {
                let last = detail.rsplit('/').next().unwrap_or(detail);
                prefix(last, 60)
            } else {
                detail
            };
            eprintln!("  {} {:20} {:12} {}", time, job, event, short);
        }
    }
    Ok(())
}