diff --git a/Cargo.lock b/Cargo.lock index 882f939..60a55a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -538,6 +538,7 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", + "jobkit", "json5", "libc", "log", @@ -1568,6 +1569,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "jobkit" +version = "0.3.0" +source = "git+https://evilpiepirate.org/git/jobkit.git#4aacaac22c5f59a7fbc6ce3a65708fc370e55754" +dependencies = [ + "chrono", + "libc", + "log", + "profiling", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "jobserver" version = "0.1.34" @@ -2194,6 +2209,25 @@ dependencies = [ "yansi", ] +[[package]] +name = "profiling" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3eb8486b569e12e2c32ad3e204dbaba5e4b5b216e9367044f25f1dba42341773" +dependencies = [ + "profiling-procmacros", +] + +[[package]] +name = "profiling-procmacros" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52717f9a02b6965224f95ca2a81e2e0c5c43baacd28ca057577988930b6c3d5b" +dependencies = [ + "quote", + "syn 2.0.117", +] + [[package]] name = "ptr_meta" version = "0.1.4" diff --git a/Cargo.toml b/Cargo.toml index 096c390..20df8d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ redb = "4" rkyv = { version = "0.7", features = ["validation", "std"] } rayon = "1" +jobkit = { git = "https://evilpiepirate.org/git/jobkit.git", features = ["daemon"] } tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["compat"] } diff --git a/README.md b/README.md index b210c86..3c67540 100644 --- a/README.md +++ b/README.md @@ -160,19 +160,33 @@ a text input at the bottom and a status bar. ```json5 { - your_host: { + // Backend credentials + anthropic: { + api_key: "sk-...", + }, + deepinfra: { api_key: "...", base_url: "http://localhost:8000/v1", // vLLM endpoint }, + openrouter: { + api_key: "sk-or-...", + base_url: "https://openrouter.ai/api/v1", + }, // Named models — switch with /model models: { "27b": { - backend: "your_host", + backend: "deepinfra", model_id: "Qwen/Qwen3.5-27B", prompt_file: "POC.md", // system prompt file context_window: 262144, }, + opus: { + backend: "anthropic", + model_id: "claude-opus-4-6", + prompt_file: "CLAUDE.md", + context_window: 200000, + }, }, default_model: "27b", @@ -207,6 +221,14 @@ a text input at the bottom and a status bar. } ``` +### Backends + +- **deepinfra** — any OpenAI-compatible completions API (vLLM, llama.cpp, etc.) +- **anthropic** — Anthropic's API +- **openrouter** — OpenRouter + +The `deepinfra` name is historical; it works with any base URL. + ### Context groups Context groups define what gets loaded into the context window at session start. diff --git a/src/cli/agent.rs b/src/cli/agent.rs index c6ae426..652aa13 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -2,7 +2,7 @@ use crate::store; -pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> { +pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, local: bool, state_dir: Option<&str>) -> Result<(), String> { // Mark as agent so tool calls (e.g. poc-memory render) don't // pollute the user's seen set as a side effect // SAFETY: single-threaded at this point (CLI startup, before any agent work) @@ -18,6 +18,18 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option unsafe { std::env::set_var("POC_MEMORY_DRY_RUN", "1"); } } + let needs_local = local || dry_run; + let has_targets = !target.is_empty() || query.is_some(); + + // Fast path: no explicit targets, daemon available — just queue via RPC + if !needs_local && !has_targets { + if crate::agents::daemon::send_rpc_pub("ping").is_some() { + return crate::agents::daemon::rpc_run_agent(agent, count); + } + println!("Daemon not running — falling back to local execution"); + } + + // Slow path: need the store for local execution or target resolution let mut store = store::Store::load()?; // Resolve targets: explicit --target, --query, or agent's default query @@ -38,15 +50,32 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option }; if !resolved_targets.is_empty() { - for (i, key) in resolved_targets.iter().enumerate() { - println!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key); - if i > 0 { store = store::Store::load()?; } - if let Err(e) = crate::agent::oneshot::run_one_agent( - &mut store, agent, count, Some(&[key.clone()]), - ) { - println!("[{}] ERROR on {}: {}", agent, key, e); + // --local or daemon unavailable: run directly + if needs_local || crate::agents::daemon::send_rpc_pub("ping").is_none() { + if !needs_local { + println!("Daemon not running — falling back to local execution"); + } + for (i, key) in resolved_targets.iter().enumerate() { + println!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key); + if i > 0 { store = store::Store::load()?; } + if let Err(e) = crate::agent::oneshot::run_one_agent( + &mut store, agent, count, Some(&[key.clone()]), + ) { + println!("[{}] ERROR on {}: {}", agent, key, e); + } + } + return Ok(()); + } + + // Queue to daemon + let mut queued = 0; + for key in &resolved_targets { + let cmd = format!("run-agent {} 1 target:{}", agent, key); + if crate::agents::daemon::send_rpc_pub(&cmd).is_some() { + queued += 1; } } + println!("[{}] queued {} tasks to daemon", agent, queued); } else { // Local execution (--local, --debug, dry-run, or daemon unavailable) crate::agent::oneshot::run_one_agent( diff --git a/src/lib.rs b/src/lib.rs index d18dac4..1dc10f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,5 +80,5 @@ pub use hippocampus::query::parser as query_parser; pub use subconscious as agents; pub use subconscious::{ audit, consolidate, - digest, + digest, daemon, }; diff --git a/src/main.rs b/src/main.rs index 0fde522..d228a74 100644 --- a/src/main.rs +++ b/src/main.rs @@ -439,8 +439,45 @@ enum GraphCmd { }, } +#[derive(Subcommand)] +enum DaemonCmd { + /// Start the daemon (default) + Start, + /// Show daemon status + Status, + /// Show daemon log + Log { + /// Job name to filter by + job: Option, + /// Tail a task's log file (drill down from daemon log) + #[arg(long)] + task: Option, + /// Number of lines to show + #[arg(long, default_value_t = 20)] + lines: usize, + }, + /// Trigger consolidation via daemon + Consolidate, + /// Run an agent via the daemon + Run { + /// Agent name (e.g. organize, replay, linker) + #[arg(default_value = "replay")] + agent: String, + /// Batch size + #[arg(default_value_t = 1)] + count: usize, + }, + /// Interactive TUI + Tui, + /// Reload config file without restarting + ReloadConfig, +} + #[derive(Subcommand)] enum AgentCmd { + /// Background job daemon + #[command(subcommand)] + Daemon(DaemonCmd), /// Run knowledge agents to convergence #[command(name = "knowledge-loop")] KnowledgeLoop { @@ -822,9 +859,35 @@ impl Run for CursorCmd { } } +impl Run for DaemonCmd { + fn run(self) -> Result<(), String> { + match self { + Self::Start => daemon::run_daemon(), + Self::Status => daemon::show_status(), + Self::Log { job, task, lines } => { + if let Some(ref task_name) = task { + daemon::show_task_log(task_name, lines) + } else { + daemon::show_log(job.as_deref(), lines) + } + } + Self::Consolidate => daemon::rpc_consolidate(), + Self::Run { agent, count } => daemon::rpc_run_agent(&agent, count), + Self::Tui => Err("TUI moved to consciousness binary (F4/F5)".into()), + Self::ReloadConfig => { + match daemon::send_rpc_pub("reload-config") { + Some(resp) => { eprintln!("{}", resp.trim()); Ok(()) } + None => Err("daemon not running".into()), + } + } + } + } +} + impl Run for AgentCmd { fn run(self) -> Result<(), String> { match self { + Self::Daemon(sub) => sub.run(), Self::KnowledgeLoop { max_cycles, batch_size, window, max_depth } => cli::agent::cmd_knowledge_loop(max_cycles, batch_size, window, max_depth), Self::ConsolidateBatch { count, auto, agent } diff --git a/src/subconscious/daemon.rs b/src/subconscious/daemon.rs index 791cd64..9e70581 100644 --- a/src/subconscious/daemon.rs +++ b/src/subconscious/daemon.rs @@ -1,26 +1,370 @@ -// daemon.rs — graph health metrics +// poc-memory daemon: background job orchestration for memory maintenance // -// Compute graph health statistics for the TUI (F4 hippocampus screen). -// The background daemon and RPC infrastructure have been removed; -// graph maintenance agents now run within the consciousness binary. +// Replaces the fragile cron+shell approach with a single long-running process +// that owns all background memory work. Uses jobkit for worker pool, status +// tracking, retry, and cancellation. +// +// Architecture: +// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs +// - Session watcher task: detects ended Claude sessions, triggers extraction +// - Jobs shell out to existing poc-memory subcommands (Phase 1) +// - Status written to daemon-status.json for `poc-memory daemon status` +// +// Phase 2 will inline job logic; Phase 3 integrates into poc-agent. -/// Graph health snapshot for the hippocampus TUI screen. -#[derive(Clone, Default, serde::Serialize, serde::Deserialize)] -pub struct GraphHealth { - pub nodes: usize, - pub edges: usize, - pub communities: usize, - pub alpha: f32, // power-law exponent (target ≥2.5) - pub gini: f32, // degree inequality (target ≤0.4) - pub avg_cc: f32, // clustering coefficient (target ≥0.2) - pub sigma: f32, // small-world sigma - pub episodic_ratio: f32, // episodic/total nodes (target <0.4) - pub interference: usize, // interfering pairs (target <50) - // Consolidation work estimate from plan +use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus}; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); +const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); + +// --- Persistent task queue --- + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct PendingTask { + id: String, + agent: String, + batch: usize, #[serde(default)] - pub plan_counts: std::collections::HashMap, - pub plan_rationale: Vec, - pub computed_at: String, + target_key: Option, +} + +struct TaskQueue { + path: PathBuf, + tasks: Mutex>, +} + +impl TaskQueue { + fn load(data_dir: &Path) -> Arc { + let path = data_dir.join("pending-tasks.jsonl"); + let tasks = if path.exists() { + fs::read_to_string(&path) + .unwrap_or_default() + .lines() + .filter_map(|l| serde_json::from_str(l).ok()) + .collect() + } else { + Vec::new() + }; + let count = tasks.len(); + if count > 0 { + log_event("task-queue", "loaded", &format!("{} pending tasks", count)); + } + Arc::new(Self { path, tasks: Mutex::new(tasks) }) + } + + fn push(&self, task: PendingTask) { + let mut tasks = self.tasks.lock().unwrap(); + tasks.push(task); + self.write_locked(&tasks); + } + + fn remove(&self, id: &str) { + let mut tasks = self.tasks.lock().unwrap(); + tasks.retain(|t| t.id != id); + self.write_locked(&tasks); + } + + fn drain(&self) -> Vec { + let tasks = self.tasks.lock().unwrap(); + tasks.clone() + } + + fn write_locked(&self, tasks: &[PendingTask]) { + let content: String = tasks.iter() + .filter_map(|t| serde_json::to_string(t).ok()) + .collect::>() + .join("\n"); + let _ = fs::write(&self.path, if content.is_empty() { String::new() } else { content + "\n" }); + } +} +fn logs_dir() -> PathBuf { + let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); + let _ = fs::create_dir_all(&dir); + dir +} + +fn log_path() -> PathBuf { + logs_dir().join("daemon.log") +} + +fn log_event(job: &str, event: &str, detail: &str) { + jobkit::daemon::event_log::log(&logs_dir(), job, event, detail); +} + +// --- Job functions (direct, no subprocess) --- + +static DAEMON_POOL: std::sync::OnceLock> = std::sync::OnceLock::new(); + +/// Run a named job with logging, progress reporting, and error mapping. +fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { + let pool = DAEMON_POOL.get().map(|p| p.as_ref()); + jobkit::daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, pool, f) +} + +// experience_mine and fact_mine removed — observation.agent handles all transcript mining + +/// Run an agent targeted at a specific node key. +fn job_targeted_agent( + ctx: &ExecutionContext, + agent_type: &str, + target_key: &str, +) -> Result<(), TaskError> { + let agent = agent_type.to_string(); + let key = target_key.to_string(); + let job_name = format!("c-{}-target({})", agent, key); + run_job(ctx, &job_name, || { + let mut store = crate::store::Store::load()?; + ctx.log_line(format!("targeting: {}", key)); + crate::agent::oneshot::run_one_agent( + &mut store, &agent, 5, Some(std::slice::from_ref(&key)), + )?; + ctx.log_line("done"); + Ok(()) + }) +} + +/// Run a single consolidation agent (replay, linker, separator, transfer, health). +/// Shared set of node keys currently being processed by agents. +/// Prevents concurrent agents from working on overlapping graph regions. +type InFlightNodes = Arc>>; + +fn job_consolidation_agent( + ctx: &ExecutionContext, + agent_type: &str, + batch_size: usize, + in_flight: &InFlightNodes, +) -> Result<(), TaskError> { + let agent = agent_type.to_string(); + let batch = batch_size; + let job_name = format!("c-{}", agent); + let in_flight = Arc::clone(in_flight); + run_job(ctx, &job_name, || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + + // Claim seeds: lock in-flight set, run query excluding it, + // add selected seeds + strongly-connected neighbors, then unlock. + let mut claimed_keys: Vec; + let graph = store.build_graph(); + { + let mut locked = in_flight.lock().unwrap(); + ctx.log_line(format!("running agent: {} (batch={}, {} in-flight)", + agent, batch, locked.len())); + + // Run the agent's query, filtering out in-flight nodes + let def = super::defs::get_def(&agent) + .ok_or_else(|| format!("no .agent file for {}", agent))?; + let query = &def.query; + let keys = if !query.is_empty() { + use crate::query::engine as search; + let mut stages = search::Stage::parse_pipeline(query)?; + let padded = batch + locked.len().min(100); + if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) { + stages.push(search::Stage::Transform(search::Transform::Limit(padded))); + } + let results = search::run_query(&stages, vec![], &graph, &store, false, padded); + results.into_iter() + .map(|(k, _)| k) + .filter(|k| !locked.contains(k)) + .take(batch) + .collect::>() + } else { + vec![] + }; + if keys.is_empty() { + return Err("query returned no results (after exclusion)".into()); + } + + // Claim seeds + strongly-connected neighbors. + // Only exclude neighbors with score > threshold to avoid + // blacking out the graph via high-degree hub nodes. + claimed_keys = Vec::with_capacity(batch * 20); + for key in &keys { + claimed_keys.push(key.clone()); + locked.insert(key.clone()); + for (nbr, strength) in graph.neighbors(key) { + let weight = store.nodes.get(nbr.as_str()) + .map(|n| n.weight).unwrap_or(0.1); + if strength * weight > 0.3 { + claimed_keys.push(nbr.clone()); + locked.insert(nbr.clone()); + } + } + } + } + // in_flight lock released — run LLM without holding it + + // Use run_one_agent_with_keys — we already selected seeds above, + // no need to re-run the query. + let result = crate::agent::oneshot::run_one_agent( + &mut store, &agent, batch, Some(&claimed_keys), + ).map(|_| ()); + + // Release all claimed keys (seeds + neighbors) + { + let mut locked = in_flight.lock().unwrap(); + for key in &claimed_keys { + locked.remove(key); + } + } + + result?; + ctx.log_line("done"); + Ok(()) + }) +} + +/// Run the rename agent: generates renames via LLM, applies them directly. +fn job_rename_agent( + ctx: &ExecutionContext, + batch_size: usize, +) -> Result<(), TaskError> { + run_job(ctx, "c-rename", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + + let batch = if batch_size == 0 { 10 } else { batch_size }; + ctx.log_line(format!("running rename agent (batch={})", batch)); + + let result = crate::agent::oneshot::run_one_agent(&mut store, "rename", batch, None)?; + + // Parse RENAME actions from response (rename uses its own format, not WRITE_NODE/LINK/REFINE) + let mut applied = 0; + let mut skipped = 0; + for line in result.output.lines() { + let trimmed = line.trim(); + if !trimmed.starts_with("RENAME ") { continue; } + + let parts: Vec<&str> = trimmed[7..].splitn(2, ' ').collect(); + if parts.len() != 2 { skipped += 1; continue; } + + let old_key = parts[0].trim(); + let new_key = parts[1].trim(); + if old_key.is_empty() || new_key.is_empty() { skipped += 1; continue; } + + let resolved = match store.resolve_key(old_key) { + Ok(k) => k, + Err(e) => { + ctx.log_line(format!("skip: {} → {}: {}", old_key, new_key, e)); + skipped += 1; + continue; + } + }; + + if store.nodes.contains_key(new_key) { + ctx.log_line(format!("skip: {} already exists", new_key)); + skipped += 1; + continue; + } + + match store.rename_node(&resolved, new_key) { + Ok(()) => { + ctx.log_line(format!("renamed: {} → {}", resolved, new_key)); + applied += 1; + } + Err(e) => { + ctx.log_line(format!("error: {} → {}: {}", resolved, new_key, e)); + skipped += 1; + } + } + } + + if applied > 0 { + store.save()?; + } + + ctx.log_line(format!("done: {} applied, {} skipped", applied, skipped)); + Ok(()) + }) +} + +/// Link orphan nodes (CPU-heavy, no LLM). +fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-orphans", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("linking orphans"); + let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15); + ctx.log_line(format!("{} orphans, {} links added", orphans, added)); + Ok(()) + }) +} + +/// Cap node degree to prevent mega-hubs. +fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-cap", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("capping degree"); + match store.cap_degree(50) { + Ok((hubs, pruned)) => { + store.save()?; + ctx.log_line(format!("{} hubs capped, {} edges pruned", hubs, pruned)); + Ok(()) + } + Err(e) => Err(e), + } + }) +} + +/// Apply links extracted from digests. +fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "c-digest-links", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("applying digest links"); + let links = super::digest::parse_all_digest_links(&store); + let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links); + store.save()?; + ctx.log_line(format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); + Ok(()) + }) +} + +fn job_knowledge_loop(_ctx: &ExecutionContext) -> Result<(), TaskError> { + // Knowledge loop removed — agents now use tool calls directly. + // Consolidation is handled by consolidate_full() in the consolidate job. + Ok(()) +} + +fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { + run_job(ctx, "digest", || { + ctx.log_line("loading store"); + let mut store = crate::store::Store::load()?; + ctx.log_line("generating digests"); + crate::digest::digest_auto(&mut store) + }) +} + +fn job_daily_check( + ctx: &ExecutionContext, + graph_health: &Arc>>, +) -> Result<(), TaskError> { + let gh = Arc::clone(graph_health); + run_job(ctx, "daily-check", || { + ctx.log_line("loading store"); + let store = crate::store::Store::load()?; + ctx.log_line("checking health"); + let _report = crate::neuro::daily_check(&store); + + // Decay search hit counters (10% daily decay) + ctx.log_line("decaying search counters"); + match crate::counters::decay_all(0.9) { + Ok(removed) => ctx.log_line(format!("decayed counters, removed {}", removed)), + Err(e) => ctx.log_line(format!("counter decay failed: {}", e)), + } + + // Compute graph health metrics for status display + ctx.log_line("computing graph health"); + let health = compute_graph_health(&store); + *gh.lock().unwrap() = Some(health); + + Ok(()) + }) } pub fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { @@ -51,3 +395,862 @@ pub fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { computed_at: crate::store::format_datetime_space(crate::store::now_epoch()), } } + +/// Get process uptime as human-readable string by reading /proc//stat. +fn proc_uptime(pid: u32) -> Option { + // /proc//stat field 22 (1-indexed) is start time in clock ticks + let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?; + // Fields after comm (which may contain spaces/parens) — find closing paren + let after_comm = stat.get(stat.rfind(')')? + 2..)?; + let fields: Vec<&str> = after_comm.split_whitespace().collect(); + // Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19) + let start_ticks: u64 = fields.get(19)?.parse().ok()?; + let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64; + let boot_time_secs = { + let uptime = fs::read_to_string("/proc/uptime").ok()?; + let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?; + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); + now - sys_uptime as u64 + }; + let start_secs = boot_time_secs + start_ticks / ticks_per_sec; + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); + let uptime = now.saturating_sub(start_secs); + + Some(format_duration_human(uptime as u128 * 1000)) +} + +// --- Status writing --- + +fn write_status( + choir: &Choir, + last_daily: Option, + graph_health: &Arc>>, +) { + let status = build_status(choir, last_daily, graph_health); + jobkit::daemon::status::write(&crate::config::get().data_dir, &status); +} + +#[derive(Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct GraphHealth { + pub nodes: usize, + pub edges: usize, + pub communities: usize, + pub alpha: f32, // power-law exponent (target ≥2.5) + pub gini: f32, // degree inequality (target ≤0.4) + pub avg_cc: f32, // clustering coefficient (target ≥0.2) + pub sigma: f32, // small-world sigma + pub episodic_ratio: f32, // episodic/total nodes (target <0.4) + pub interference: usize, // interfering pairs (target <50) + // Consolidation work estimate from plan + #[serde(default)] + pub plan_counts: std::collections::HashMap, + pub plan_rationale: Vec, + pub computed_at: String, +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct DaemonStatus { + pid: u32, + tasks: Vec, + #[serde(default)] + last_daily: Option, + #[serde(default)] + graph_health: Option, +} + +// --- The daemon --- + +pub fn run_daemon() -> Result<(), String> { + let config = crate::config::get(); + let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig { + data_dir: config.data_dir.clone(), + resource_slots: config.llm_concurrency, + resource_name: "llm".to_string(), + }); + + let choir = Arc::clone(&daemon.choir); + let llm = Arc::clone(&daemon.resource); + let _ = DAEMON_POOL.set(Arc::clone(&llm)); + let task_log_dir = logs_dir().join("daemon"); + let _ = fs::create_dir_all(&task_log_dir); + + // Enable verbose logging if POC_MEMORY_VERBOSE is set + if std::env::var("POC_MEMORY_VERBOSE").is_ok() { + jobkit::daemon::event_log::set_level(jobkit::daemon::event_log::LogLevel::Verbose); + } + + // Recover last_daily from previous status file + let last_daily: Arc>> = Arc::new(Mutex::new( + jobkit::daemon::status::load::(&config.data_dir) + .and_then(|s| s.last_daily) + .and_then(|d| d.parse().ok()) + )); + + let graph_health: Arc>> = Arc::new(Mutex::new(None)); + + // Persistent task queue — survives daemon restarts + let task_queue = TaskQueue::load(&config.data_dir); + + // Nodes currently being processed by agents — prevents concurrent + // agents from working on overlapping graph regions. + let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new())); + + log_event("daemon", "started", &format!("pid {}", std::process::id())); + eprintln!("poc-memory daemon started (pid {})", std::process::id()); + + // Recover pending tasks from previous run + { + let recovered = task_queue.drain(); + if !recovered.is_empty() { + log_event("task-queue", "recovering", &format!("{} tasks", recovered.len())); + for pt in &recovered { + let agent = pt.agent.clone(); + let b = pt.batch; + let task_id = pt.id.clone(); + let in_flight_clone = Arc::clone(&in_flight); + let queue_clone = Arc::clone(&task_queue); + choir.spawn(pt.id.clone()) + .resource(&llm) + .log_dir(&task_log_dir) + .retries(1) + .init(move |ctx| { + let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); + queue_clone.remove(&task_id); + result + }); + // Drop schedules via IdleTask::Drop + } + } + } + + // Write initial status + write_status(&choir, *last_daily.lock().unwrap(), &graph_health); + + + // Scheduler: runs daily jobs based on filesystem state + let choir_sched = Arc::clone(&choir); + let llm_sched = Arc::clone(&llm); + let last_daily_sched = Arc::clone(&last_daily); + let graph_health_sched = Arc::clone(&graph_health); + let in_flight_sched = Arc::clone(&in_flight); + let log_dir_sched = task_log_dir.clone(); + let queue_sched = Arc::clone(&task_queue); + const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours + + choir.spawn("scheduler").init(move |ctx| { + let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; + let mut last_consolidation = std::time::Instant::now() - CONSOLIDATION_INTERVAL; // run on first tick + ctx.set_progress("idle"); + + loop { + if ctx.is_cancelled() { + return Err(TaskError::Fatal("cancelled".into())); + } + + let today = chrono::Local::now().date_naive(); + + // Health check: every hour — also updates graph health metrics + if last_health.elapsed() >= HEALTH_INTERVAL { + let gh = Arc::clone(&graph_health_sched); + choir_sched.spawn("health").init(move |ctx| { + job_daily_check(ctx, &gh) + }); + last_health = std::time::Instant::now(); + } + + // Consolidation cycle: every 6 hours (wait for health check to cache metrics first) + let gh = graph_health_sched.lock().unwrap().clone(); + let Some(h) = gh.as_ref() else { continue }; + if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL { + log_event("scheduler", "consolidation-trigger", + &format!("{} (every 6h)", today)); + + // Use cached graph health plan (from consolidation_plan_quick). + let plan = crate::neuro::ConsolidationPlan { + counts: h.plan_counts.clone(), + run_health: true, + rationale: Vec::new(), + }; + let runs = plan.to_agent_runs(5); + + let summary: Vec = h.plan_counts.iter() + .filter(|(_, c)| **c > 0) + .map(|(a, c)| format!("{}{}", &a[..1], c)) + .collect(); + log_event("scheduler", "consolidation-plan", + &format!("{} agents ({})", runs.len(), summary.join(" "))); + + // Phase 1: Agent runs — all concurrent, in-flight exclusion + // prevents overlapping graph regions. + let mut all_tasks: Vec = Vec::new(); + for (i, (agent_type, batch)) in runs.iter().enumerate() { + let agent = agent_type.to_string(); + let b = *batch; + let in_flight_clone = Arc::clone(&in_flight_sched); + let task_name = format!("c-{}-{}:{}", agent, i, today); + let task_id = task_name.clone(); + let queue_clone = Arc::clone(&queue_sched); + queue_sched.push(PendingTask { + id: task_id.clone(), + agent: agent.clone(), + batch: b, + target_key: None, + }); + let task = choir_sched.spawn(task_name) + .resource(&llm_sched) + .log_dir(&log_dir_sched) + .retries(1) + .init(move |ctx| { + let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); + queue_clone.remove(&task_id); + result + }) + .run(); + all_tasks.push(task); + } + // Orphans phase depends on all agent tasks completing + let prev_agent = all_tasks.last().cloned(); + + // Phase 2: Link orphans (CPU-only, no LLM) + let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today)) + .retries(1) + .init(job_link_orphans); + if let Some(ref dep) = prev_agent { + orphans.depend_on(dep); + } + let orphans = orphans.run(); + + // Phase 3: Cap degree + let mut cap = choir_sched.spawn(format!("c-cap:{}", today)) + .retries(1) + .init(job_cap_degree); + cap.depend_on(&orphans); + let cap = cap.run(); + + // Phase 4: Generate digests + let mut digest = choir_sched.spawn(format!("c-digest:{}", today)) + .resource(&llm_sched) + .retries(1) + .init(job_digest); + digest.depend_on(&cap); + let digest = digest.run(); + + // Phase 5: Apply digest links + let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today)) + .retries(1) + .init(job_digest_links); + digest_links.depend_on(&digest); + let digest_links = digest_links.run(); + + // Phase 7: Knowledge loop + let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today)) + .resource(&llm_sched) + .retries(1) + .init(job_knowledge_loop); + knowledge.depend_on(&digest_links); + + *last_daily_sched.lock().unwrap() = Some(today); + last_consolidation = std::time::Instant::now(); + ctx.set_progress(format!("daily pipeline triggered ({today})")); + } + + // Prune finished tasks from registry + let pruned = choir_sched.gc_finished(); + if pruned > 0 { + log::trace!("pruned {} finished tasks", pruned); + } + + write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched); + std::thread::sleep(SCHEDULER_INTERVAL); + } + }); + + // Register RPC handlers + { + let last_daily_rpc = Arc::clone(&last_daily); + daemon.add_rpc_handler(move |cmd, _ctx| { + if cmd == "consolidate" { + *last_daily_rpc.lock().unwrap() = None; + log_event("rpc", "consolidate", "triggered via socket"); + Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into()) + } else { + None + } + }); + } + + daemon.add_rpc_handler(|cmd, _ctx| { + if cmd != "reload-config" { return None; } + let changed = crate::config::reload(); + let config = crate::config::get(); + let api = config.api_base_url.as_deref().unwrap_or("(none)"); + let model = config.api_model.as_deref().unwrap_or("(default)"); + log_event("daemon", "config-reload", + &format!("changed={}, api={}, model={}", changed, api, model)); + Some(format!("{{\"ok\":true,\"changed\":{},\"api_base_url\":\"{}\",\"api_model\":\"{}\"}}\n", + changed, api, model)) + }); + + daemon.add_rpc_handler(|cmd, _ctx| { + if !cmd.starts_with("record-hits ") { return None; } + let keys: Vec<&str> = cmd.strip_prefix("record-hits ") + .unwrap_or("") + .split('\t') + .filter(|k| !k.is_empty()) + .collect(); + if keys.is_empty() { + return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into()); + } + let n = keys.len(); + match crate::counters::record_search_hits(&keys) { + Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)), + Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))), + } + }); + + { + let choir_rpc = Arc::clone(&choir); + let llm_rpc = Arc::clone(&llm); + let log_dir_rpc = task_log_dir.clone(); + let in_flight_rpc = Arc::clone(&in_flight); + daemon.add_rpc_handler(move |cmd, _ctx| { + if !cmd.starts_with("run-agent ") { return None; } + let parts: Vec<&str> = cmd.splitn(4, ' ').collect(); + let agent_type = parts.get(1).unwrap_or(&"replay"); + let count: usize = parts.get(2) + .and_then(|s| s.parse().ok()) + .unwrap_or(1); + // Optional target key: "run-agent linker 1 target:KEY" + let target_key: Option = parts.get(3) + .and_then(|s| s.strip_prefix("target:")) + .map(|s| s.to_string()); + let batch_size = 5; + let today = chrono::Local::now().format("%Y-%m-%d"); + let ts = chrono::Local::now().format("%H%M%S"); + let mut prev = None; + let mut spawned = 0; + let mut remaining = count; + + let is_rename = *agent_type == "rename"; + + // Targeted run: one task for a specific node + if let Some(ref key) = target_key { + let agent = agent_type.to_string(); + let key = key.clone(); + let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::(), today); + if jobkit::daemon::event_log::enabled(jobkit::daemon::event_log::LogLevel::Verbose) { + log_event("daemon", "spawn-targeted", + &format!("{} (pool: {}/{})", task_name, llm_rpc.available(), llm_rpc.capacity())); + } + choir_rpc.spawn(task_name) + .resource(&llm_rpc) + .log_dir(&log_dir_rpc) + .retries(1) + .init(move |ctx| { + job_targeted_agent(ctx, &agent, &key) + }) + .run(); + spawned = 1; + remaining = 0; + } + + while remaining > 0 { + let batch = remaining.min(batch_size); + let agent = agent_type.to_string(); + let in_flight_clone = Arc::clone(&in_flight_rpc); + let task_name = format!("c-{}-rpc{}:{}", agent, ts, today); + let mut builder = choir_rpc.spawn(task_name) + .resource(&llm_rpc) + .log_dir(&log_dir_rpc) + .retries(1) + .init(move |ctx| { + if is_rename { + job_rename_agent(ctx, batch) + } else { + job_consolidation_agent(ctx, &agent, batch, &in_flight_clone) + } + }); + if let Some(ref dep) = prev { + builder.depend_on(dep); + } + prev = Some(builder.run()); + remaining -= batch; + spawned += 1; + } + + log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count)); + Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n", + count, agent_type, spawned)) + }); + } + + // Main thread: socket server + signal handling + let last_daily_status = Arc::clone(&last_daily); + let graph_health_status = Arc::clone(&graph_health); + daemon.run(move |ctx| { + build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status) + }); + + log_event("daemon", "stopping", ""); + eprintln!("Shutting down..."); + + log_event("daemon", "stopped", ""); + std::process::exit(0) +} + +pub fn send_rpc_pub(cmd: &str) -> Option { + send_rpc(cmd) +} + +fn send_rpc(cmd: &str) -> Option { + jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) +} + +pub fn rpc_consolidate() -> Result<(), String> { + match send_rpc("consolidate") { + Some(resp) => { + println!("{}", resp.trim()); + Ok(()) + } + None => Err("Daemon not running.".into()), + } +} + +pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> { + let cmd = format!("run-agent {} {}", agent, count); + match send_rpc(&cmd) { + Some(resp) => { + println!("{}", resp.trim()); + Ok(()) + } + None => Err("Daemon not running.".into()), + } +} + +fn read_status_socket() -> Option { + let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?; + serde_json::from_str(&json).ok() +} + +// status_socket_loop has been replaced by daemon.run() in jobkit-daemon. + +fn build_status( + choir: &Choir, + last_daily: Option, + graph_health: &Arc>>, +) -> DaemonStatus { + DaemonStatus { + pid: std::process::id(), + tasks: choir.task_statuses(), + last_daily: last_daily.map(|d| d.to_string()), + graph_health: graph_health.lock().unwrap().clone(), + } +} + +// --- Status display --- + +fn format_duration_human(ms: u128) -> String { + if ms < 1_000 { + format!("{}ms", ms) + } else if ms < 60_000 { + format!("{:.1}s", ms as f64 / 1000.0) + } else if ms < 3_600_000 { + format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000) + } else { + format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000) + } +} + +fn task_group(name: &str) -> &str { + if name == "scheduler" { "core" } + else if name.starts_with("c-") || name.starts_with("consolidate:") + || name.starts_with("knowledge-loop:") || name.starts_with("digest:") + || name.starts_with("decay:") { "daily" } + else if name == "health" { "health" } + else { "other" } +} + +/// Compute elapsed time for a task, using absolute started_at if available. +fn task_elapsed(t: &TaskInfo) -> Duration { + if matches!(t.status, TaskStatus::Running) { + if let Some(started) = t.started_at { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + Duration::from_secs_f64((now - started).max(0.0)) + } else { + t.elapsed + } + } else { + t.result.as_ref() + .map(|r| r.duration) + .unwrap_or(t.elapsed) + } +} + +fn status_symbol(t: &TaskInfo) -> &'static str { + if t.cancelled { return "✗" } + match t.status { + TaskStatus::Running => "▶", + TaskStatus::Completed => "✓", + TaskStatus::Failed => "✗", + TaskStatus::Pending => "·", + } +} + +/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…" +fn short_job_name(job: &str) -> String { + // Split "verb path" or just return as-is + if let Some((verb, path)) = job.split_once(' ') { + let file = path.rsplit('/').next().unwrap_or(path); + let file = file.strip_suffix(".jsonl").unwrap_or(file); + let short = if file.len() > 12 { &file[..12] } else { file }; + format!("{} {}", verb, short) + } else { + job.to_string() + } +} + +fn show_recent_completions(n: usize) { + let path = log_path(); + let content = match fs::read_to_string(&path) { + Ok(c) => c, + Err(_) => return, + }; + + let recent: Vec<&str> = content.lines().rev() + .filter(|line| { + line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"") + }) + .take(n) + .collect(); + + if recent.is_empty() { return; } + + eprintln!(" Recent:"); + for line in recent.iter().rev() { + if let Ok(obj) = serde_json::from_str::(line) { + let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); + let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); + let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); + let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); + + let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; + let sym = if event == "completed" { "✓" } else { "✗" }; + let name = short_job_name(job); + + eprintln!(" {} {} {:30} {}", sym, time, name, detail); + } + } + eprintln!(); +} + +pub fn show_status() -> Result<(), String> { + let status = match read_status_socket() { + Some(s) => s, + None => { + eprintln!("Daemon not running."); + return Ok(()); + } + }; + + let uptime_str = proc_uptime(status.pid).unwrap_or_default(); + if uptime_str.is_empty() { + eprintln!("poc-memory daemon pid={}", status.pid); + } else { + eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str); + } + + if status.tasks.is_empty() { + eprintln!("\n No tasks"); + return Ok(()); + } + + // Count by status + let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); + let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); + let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); + let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); + eprintln!(" tasks: {} running, {} pending, {} done, {} failed", + running, pending, completed, failed); + + // Graph health + if let Some(ref gh) = status.graph_health { + eprintln!(); + fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str { + let ok = if higher_is_better { val >= target } else { val <= target }; + if ok { "✓" } else { "✗" } + } + eprintln!(" Graph health ({})", gh.computed_at); + eprintln!(" {} nodes, {} edges, {} communities", + gh.nodes, gh.edges, gh.communities); + eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)", + indicator(gh.alpha, 2.5, true), gh.alpha, + indicator(gh.gini, 0.4, false), gh.gini, + indicator(gh.avg_cc, 0.2, true), gh.avg_cc); + eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}", + indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0, + gh.sigma); + + let plan_total: usize = gh.plan_counts.values().sum::() + 1; + let plan_summary: Vec = gh.plan_counts.iter() + .filter(|(_, c)| **c > 0) + .map(|(a, c)| format!("{}{}", &a[..1], c)) + .collect(); + eprintln!(" consolidation plan: {} agents ({} +health)", + plan_total, plan_summary.join(" ")); + } + eprintln!(); + + // Group and display + let groups: &[(&str, &str)] = &[ + ("core", "Core"), + ("daily", "Daily pipeline"), + ("extract", "Session extraction"), + ("health", "Health"), + ("other", "Other"), + ]; + + // In-flight tasks first (running + pending) + let in_flight: Vec<&TaskInfo> = status.tasks.iter() + .filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending)) + .collect(); + + if !in_flight.is_empty() { + eprintln!(" In flight:"); + for t in &in_flight { + let sym = status_symbol(t); + let e = task_elapsed(t); + let elapsed = if !e.is_zero() { + format!(" {}", format_duration_human(e.as_millis())) + } else { + String::new() + }; + let progress = t.progress.as_deref() + .filter(|p| *p != "idle") + .map(|p| format!(" {}", p)) + .unwrap_or_default(); + let name = short_job_name(&t.name); + eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress); + if let Some(ref lp) = t.log_path { + // tail from log file + if matches!(t.status, TaskStatus::Running) { + eprintln!(" │ log: {}", lp); + } + } + } + eprintln!(); + } + + // Recent completions from log file + show_recent_completions(20); + + // Detailed group view only if there are failures worth showing + for (group_id, group_label) in groups { + let tasks: Vec<&TaskInfo> = status.tasks.iter() + .filter(|t| task_group(&t.name) == *group_id) + .collect(); + if tasks.is_empty() { continue; } + + // For extract group, show summary instead of individual tasks + if *group_id == "extract" { + let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); + let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); + let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); + let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); + eprintln!(" {} ({} total)", group_label, tasks.len()); + + if n_running > 0 { + for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { + let e = task_elapsed(t); + let elapsed = if !e.is_zero() { + format!(" ({})", format_duration_human(e.as_millis())) + } else { + String::new() + }; + let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); + eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); + if let Some(ref lp) = t.log_path { + eprintln!(" │ log: {}", lp); + } + } + } + let mut parts = Vec::new(); + if n_done > 0 { parts.push(format!("{} done", n_done)); } + if n_running > 0 { parts.push(format!("{} running", n_running)); } + if n_pending > 0 { parts.push(format!("{} queued", n_pending)); } + if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); } + eprintln!(" {}", parts.join(", ")); + + // Show recent failures + for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) { + if let Some(ref err) = t.result.as_ref().and_then(|r| r.error.as_ref()) { + let short = if err.len() > 80 { &err[..80] } else { err }; + eprintln!(" ✗ {}: {}", t.name, short); + } + } + eprintln!(); + continue; + } + + eprintln!(" {}", group_label); + for t in &tasks { + let sym = status_symbol(t); + let e = task_elapsed(t); + let duration = if !e.is_zero() { + format_duration_human(e.as_millis()) + } else { + String::new() + }; + + let retry = if t.max_retries > 0 && t.retry_count > 0 { + format!(" retry {}/{}", t.retry_count, t.max_retries) + } else { + String::new() + }; + + let detail = if matches!(t.status, TaskStatus::Failed) { + t.result.as_ref() + .and_then(|r| r.error.as_ref()) + .map(|e| { + let short = if e.len() > 60 { &e[..60] } else { e }; + format!(" err: {}", short) + }) + .unwrap_or_default() + } else { + String::new() + }; + + if duration.is_empty() { + eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail); + } else { + eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); + } + + // Show output log tail for running tasks + if let Some(ref lp) = t.log_path { + // tail from log file + if matches!(t.status, TaskStatus::Running) { + eprintln!(" │ log: {}", lp); + } + } + } + eprintln!(); + } + + Ok(()) +} + +/// Drill down into a task's log file. Finds the log path from: +/// 1. Running task status (daemon-status.json) +/// 2. daemon.log started events (for completed/failed tasks) +pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> { + // Try running tasks first + let Some(status_json) = send_rpc_pub("") else { + return search_log_fallback(task_name, lines); + }; + let Ok(status) = serde_json::from_str::(&status_json) else { + return search_log_fallback(task_name, lines); + }; + let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) else { + return search_log_fallback(task_name, lines); + }; + for t in tasks { + let name = t.get("name").and_then(|n| n.as_str()).unwrap_or(""); + if !name.contains(task_name) { continue; } + if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) { + return tail_file(lp, lines); + } + } + search_log_fallback(task_name, lines) +} + +fn search_log_fallback(task_name: &str, lines: usize) -> Result<(), String> { + // Fall back to searching daemon.log for the most recent started event with a log path + let log = log_path(); + if log.exists() { + let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?; + for line in content.lines().rev() { + if let Ok(obj) = serde_json::from_str::(line) { + let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or(""); + let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or(""); + let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); + if job.contains(task_name) && event == "started" && detail.starts_with("log: ") { + let path = &detail[5..]; + return tail_file(path, lines); + } + } + } + } + + Err(format!("no log file found for task '{}'", task_name)) +} + +fn tail_file(path: &str, lines: usize) -> Result<(), String> { + let content = fs::read_to_string(path) + .map_err(|e| format!("read {}: {}", path, e))?; + let all_lines: Vec<&str> = content.lines().collect(); + let skip = all_lines.len().saturating_sub(lines); + eprintln!("--- {} ({} lines) ---", path, all_lines.len()); + for line in &all_lines[skip..] { + eprintln!("{}", line); + } + Ok(()) +} + +pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> { + let path = log_path(); + if !path.exists() { + eprintln!("No daemon log found."); + return Ok(()); + } + + let content = fs::read_to_string(&path) + .map_err(|e| format!("read log: {}", e))?; + + let filtered: Vec<&str> = content.lines().rev() + .filter(|line| { + if let Some(job) = job_filter { + line.contains(&format!("\"job\":\"{}\"", job)) + } else { + true + } + }) + .take(lines) + .collect(); + + if filtered.is_empty() { + eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default()); + return Ok(()); + } + + // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]" + for line in filtered.into_iter().rev() { + if let Ok(obj) = serde_json::from_str::(line) { + let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); + let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); + let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); + let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); + + // Shorten timestamp to just time portion + let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; + + if detail.is_empty() { + eprintln!(" {} {:20} {}", time, job, event); + } else { + // Truncate long details (file paths) + let short = if detail.len() > 60 { + let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail); + if last.len() > 60 { &last[..60] } else { last } + } else { + detail + }; + eprintln!(" {} {:20} {:12} {}", time, job, event, short); + } + } else { + eprintln!("{}", line); + } + } + Ok(()) +}