diff --git a/Cargo.lock b/Cargo.lock index 60a55a4..882f939 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -538,7 +538,6 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", - "jobkit", "json5", "libc", "log", @@ -1569,20 +1568,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "jobkit" -version = "0.3.0" -source = "git+https://evilpiepirate.org/git/jobkit.git#4aacaac22c5f59a7fbc6ce3a65708fc370e55754" -dependencies = [ - "chrono", - "libc", - "log", - "profiling", - "serde", - "serde_json", - "tokio", -] - [[package]] name = "jobserver" version = "0.1.34" @@ -2209,25 +2194,6 @@ dependencies = [ "yansi", ] -[[package]] -name = "profiling" -version = "1.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb8486b569e12e2c32ad3e204dbaba5e4b5b216e9367044f25f1dba42341773" -dependencies = [ - "profiling-procmacros", -] - -[[package]] -name = "profiling-procmacros" -version = "1.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52717f9a02b6965224f95ca2a81e2e0c5c43baacd28ca057577988930b6c3d5b" -dependencies = [ - "quote", - "syn 2.0.117", -] - [[package]] name = "ptr_meta" version = "0.1.4" diff --git a/Cargo.toml b/Cargo.toml index 20df8d1..096c390 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,6 @@ redb = "4" rkyv = { version = "0.7", features = ["validation", "std"] } rayon = "1" -jobkit = { git = "https://evilpiepirate.org/git/jobkit.git", features = ["daemon"] } tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["compat"] } diff --git a/README.md b/README.md index 3c67540..b210c86 100644 --- a/README.md +++ b/README.md @@ -160,33 +160,19 @@ a text input at the bottom and a status bar. ```json5 { - // Backend credentials - anthropic: { - api_key: "sk-...", - }, - deepinfra: { + your_host: { api_key: "...", base_url: "http://localhost:8000/v1", // vLLM endpoint }, - openrouter: { - api_key: "sk-or-...", - base_url: "https://openrouter.ai/api/v1", - }, // Named models — switch with /model models: { "27b": { - backend: "deepinfra", + backend: "your_host", model_id: "Qwen/Qwen3.5-27B", prompt_file: "POC.md", // system prompt file context_window: 262144, }, - opus: { - backend: "anthropic", - model_id: "claude-opus-4-6", - prompt_file: "CLAUDE.md", - context_window: 200000, - }, }, default_model: "27b", @@ -221,14 +207,6 @@ a text input at the bottom and a status bar. } ``` -### Backends - -- **deepinfra** — any OpenAI-compatible completions API (vLLM, llama.cpp, etc.) -- **anthropic** — Anthropic's API -- **openrouter** — OpenRouter - -The `deepinfra` name is historical; it works with any base URL. - ### Context groups Context groups define what gets loaded into the context window at session start. diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 652aa13..c6ae426 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -2,7 +2,7 @@ use crate::store; -pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, local: bool, state_dir: Option<&str>) -> Result<(), String> { +pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> { // Mark as agent so tool calls (e.g. poc-memory render) don't // pollute the user's seen set as a side effect // SAFETY: single-threaded at this point (CLI startup, before any agent work) @@ -18,18 +18,6 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option unsafe { std::env::set_var("POC_MEMORY_DRY_RUN", "1"); } } - let needs_local = local || dry_run; - let has_targets = !target.is_empty() || query.is_some(); - - // Fast path: no explicit targets, daemon available — just queue via RPC - if !needs_local && !has_targets { - if crate::agents::daemon::send_rpc_pub("ping").is_some() { - return crate::agents::daemon::rpc_run_agent(agent, count); - } - println!("Daemon not running — falling back to local execution"); - } - - // Slow path: need the store for local execution or target resolution let mut store = store::Store::load()?; // Resolve targets: explicit --target, --query, or agent's default query @@ -50,32 +38,15 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option }; if !resolved_targets.is_empty() { - // --local or daemon unavailable: run directly - if needs_local || crate::agents::daemon::send_rpc_pub("ping").is_none() { - if !needs_local { - println!("Daemon not running — falling back to local execution"); - } - for (i, key) in resolved_targets.iter().enumerate() { - println!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key); - if i > 0 { store = store::Store::load()?; } - if let Err(e) = crate::agent::oneshot::run_one_agent( - &mut store, agent, count, Some(&[key.clone()]), - ) { - println!("[{}] ERROR on {}: {}", agent, key, e); - } - } - return Ok(()); - } - - // Queue to daemon - let mut queued = 0; - for key in &resolved_targets { - let cmd = format!("run-agent {} 1 target:{}", agent, key); - if crate::agents::daemon::send_rpc_pub(&cmd).is_some() { - queued += 1; + for (i, key) in resolved_targets.iter().enumerate() { + println!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key); + if i > 0 { store = store::Store::load()?; } + if let Err(e) = crate::agent::oneshot::run_one_agent( + &mut store, agent, count, Some(&[key.clone()]), + ) { + println!("[{}] ERROR on {}: {}", agent, key, e); } } - println!("[{}] queued {} tasks to daemon", agent, queued); } else { // Local execution (--local, --debug, dry-run, or daemon unavailable) crate::agent::oneshot::run_one_agent( diff --git a/src/lib.rs b/src/lib.rs index 1dc10f6..d18dac4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,5 +80,5 @@ pub use hippocampus::query::parser as query_parser; pub use subconscious as agents; pub use subconscious::{ audit, consolidate, - digest, daemon, + digest, }; diff --git a/src/main.rs b/src/main.rs index d228a74..0fde522 100644 --- a/src/main.rs +++ b/src/main.rs @@ -439,45 +439,8 @@ enum GraphCmd { }, } -#[derive(Subcommand)] -enum DaemonCmd { - /// Start the daemon (default) - Start, - /// Show daemon status - Status, - /// Show daemon log - Log { - /// Job name to filter by - job: Option, - /// Tail a task's log file (drill down from daemon log) - #[arg(long)] - task: Option, - /// Number of lines to show - #[arg(long, default_value_t = 20)] - lines: usize, - }, - /// Trigger consolidation via daemon - Consolidate, - /// Run an agent via the daemon - Run { - /// Agent name (e.g. organize, replay, linker) - #[arg(default_value = "replay")] - agent: String, - /// Batch size - #[arg(default_value_t = 1)] - count: usize, - }, - /// Interactive TUI - Tui, - /// Reload config file without restarting - ReloadConfig, -} - #[derive(Subcommand)] enum AgentCmd { - /// Background job daemon - #[command(subcommand)] - Daemon(DaemonCmd), /// Run knowledge agents to convergence #[command(name = "knowledge-loop")] KnowledgeLoop { @@ -859,35 +822,9 @@ impl Run for CursorCmd { } } -impl Run for DaemonCmd { - fn run(self) -> Result<(), String> { - match self { - Self::Start => daemon::run_daemon(), - Self::Status => daemon::show_status(), - Self::Log { job, task, lines } => { - if let Some(ref task_name) = task { - daemon::show_task_log(task_name, lines) - } else { - daemon::show_log(job.as_deref(), lines) - } - } - Self::Consolidate => daemon::rpc_consolidate(), - Self::Run { agent, count } => daemon::rpc_run_agent(&agent, count), - Self::Tui => Err("TUI moved to consciousness binary (F4/F5)".into()), - Self::ReloadConfig => { - match daemon::send_rpc_pub("reload-config") { - Some(resp) => { eprintln!("{}", resp.trim()); Ok(()) } - None => Err("daemon not running".into()), - } - } - } - } -} - impl Run for AgentCmd { fn run(self) -> Result<(), String> { match self { - Self::Daemon(sub) => sub.run(), Self::KnowledgeLoop { max_cycles, batch_size, window, max_depth } => cli::agent::cmd_knowledge_loop(max_cycles, batch_size, window, max_depth), Self::ConsolidateBatch { count, auto, agent } diff --git a/src/subconscious/daemon.rs b/src/subconscious/daemon.rs index 9e70581..791cd64 100644 --- a/src/subconscious/daemon.rs +++ b/src/subconscious/daemon.rs @@ -1,370 +1,26 @@ -// poc-memory daemon: background job orchestration for memory maintenance +// daemon.rs — graph health metrics // -// Replaces the fragile cron+shell approach with a single long-running process -// that owns all background memory work. Uses jobkit for worker pool, status -// tracking, retry, and cancellation. -// -// Architecture: -// - Scheduler task: runs every 60s, scans filesystem state, spawns jobs -// - Session watcher task: detects ended Claude sessions, triggers extraction -// - Jobs shell out to existing poc-memory subcommands (Phase 1) -// - Status written to daemon-status.json for `poc-memory daemon status` -// -// Phase 2 will inline job logic; Phase 3 integrates into poc-agent. +// Compute graph health statistics for the TUI (F4 hippocampus screen). +// The background daemon and RPC infrastructure have been removed; +// graph maintenance agents now run within the consciousness binary. -use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus}; -use std::fs; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; - -const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); -const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); - -// --- Persistent task queue --- - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct PendingTask { - id: String, - agent: String, - batch: usize, +/// Graph health snapshot for the hippocampus TUI screen. +#[derive(Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct GraphHealth { + pub nodes: usize, + pub edges: usize, + pub communities: usize, + pub alpha: f32, // power-law exponent (target ≥2.5) + pub gini: f32, // degree inequality (target ≤0.4) + pub avg_cc: f32, // clustering coefficient (target ≥0.2) + pub sigma: f32, // small-world sigma + pub episodic_ratio: f32, // episodic/total nodes (target <0.4) + pub interference: usize, // interfering pairs (target <50) + // Consolidation work estimate from plan #[serde(default)] - target_key: Option, -} - -struct TaskQueue { - path: PathBuf, - tasks: Mutex>, -} - -impl TaskQueue { - fn load(data_dir: &Path) -> Arc { - let path = data_dir.join("pending-tasks.jsonl"); - let tasks = if path.exists() { - fs::read_to_string(&path) - .unwrap_or_default() - .lines() - .filter_map(|l| serde_json::from_str(l).ok()) - .collect() - } else { - Vec::new() - }; - let count = tasks.len(); - if count > 0 { - log_event("task-queue", "loaded", &format!("{} pending tasks", count)); - } - Arc::new(Self { path, tasks: Mutex::new(tasks) }) - } - - fn push(&self, task: PendingTask) { - let mut tasks = self.tasks.lock().unwrap(); - tasks.push(task); - self.write_locked(&tasks); - } - - fn remove(&self, id: &str) { - let mut tasks = self.tasks.lock().unwrap(); - tasks.retain(|t| t.id != id); - self.write_locked(&tasks); - } - - fn drain(&self) -> Vec { - let tasks = self.tasks.lock().unwrap(); - tasks.clone() - } - - fn write_locked(&self, tasks: &[PendingTask]) { - let content: String = tasks.iter() - .filter_map(|t| serde_json::to_string(t).ok()) - .collect::>() - .join("\n"); - let _ = fs::write(&self.path, if content.is_empty() { String::new() } else { content + "\n" }); - } -} -fn logs_dir() -> PathBuf { - let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); - let _ = fs::create_dir_all(&dir); - dir -} - -fn log_path() -> PathBuf { - logs_dir().join("daemon.log") -} - -fn log_event(job: &str, event: &str, detail: &str) { - jobkit::daemon::event_log::log(&logs_dir(), job, event, detail); -} - -// --- Job functions (direct, no subprocess) --- - -static DAEMON_POOL: std::sync::OnceLock> = std::sync::OnceLock::new(); - -/// Run a named job with logging, progress reporting, and error mapping. -fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { - let pool = DAEMON_POOL.get().map(|p| p.as_ref()); - jobkit::daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, pool, f) -} - -// experience_mine and fact_mine removed — observation.agent handles all transcript mining - -/// Run an agent targeted at a specific node key. -fn job_targeted_agent( - ctx: &ExecutionContext, - agent_type: &str, - target_key: &str, -) -> Result<(), TaskError> { - let agent = agent_type.to_string(); - let key = target_key.to_string(); - let job_name = format!("c-{}-target({})", agent, key); - run_job(ctx, &job_name, || { - let mut store = crate::store::Store::load()?; - ctx.log_line(format!("targeting: {}", key)); - crate::agent::oneshot::run_one_agent( - &mut store, &agent, 5, Some(std::slice::from_ref(&key)), - )?; - ctx.log_line("done"); - Ok(()) - }) -} - -/// Run a single consolidation agent (replay, linker, separator, transfer, health). -/// Shared set of node keys currently being processed by agents. -/// Prevents concurrent agents from working on overlapping graph regions. -type InFlightNodes = Arc>>; - -fn job_consolidation_agent( - ctx: &ExecutionContext, - agent_type: &str, - batch_size: usize, - in_flight: &InFlightNodes, -) -> Result<(), TaskError> { - let agent = agent_type.to_string(); - let batch = batch_size; - let job_name = format!("c-{}", agent); - let in_flight = Arc::clone(in_flight); - run_job(ctx, &job_name, || { - ctx.log_line("loading store"); - let mut store = crate::store::Store::load()?; - - // Claim seeds: lock in-flight set, run query excluding it, - // add selected seeds + strongly-connected neighbors, then unlock. - let mut claimed_keys: Vec; - let graph = store.build_graph(); - { - let mut locked = in_flight.lock().unwrap(); - ctx.log_line(format!("running agent: {} (batch={}, {} in-flight)", - agent, batch, locked.len())); - - // Run the agent's query, filtering out in-flight nodes - let def = super::defs::get_def(&agent) - .ok_or_else(|| format!("no .agent file for {}", agent))?; - let query = &def.query; - let keys = if !query.is_empty() { - use crate::query::engine as search; - let mut stages = search::Stage::parse_pipeline(query)?; - let padded = batch + locked.len().min(100); - if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) { - stages.push(search::Stage::Transform(search::Transform::Limit(padded))); - } - let results = search::run_query(&stages, vec![], &graph, &store, false, padded); - results.into_iter() - .map(|(k, _)| k) - .filter(|k| !locked.contains(k)) - .take(batch) - .collect::>() - } else { - vec![] - }; - if keys.is_empty() { - return Err("query returned no results (after exclusion)".into()); - } - - // Claim seeds + strongly-connected neighbors. - // Only exclude neighbors with score > threshold to avoid - // blacking out the graph via high-degree hub nodes. - claimed_keys = Vec::with_capacity(batch * 20); - for key in &keys { - claimed_keys.push(key.clone()); - locked.insert(key.clone()); - for (nbr, strength) in graph.neighbors(key) { - let weight = store.nodes.get(nbr.as_str()) - .map(|n| n.weight).unwrap_or(0.1); - if strength * weight > 0.3 { - claimed_keys.push(nbr.clone()); - locked.insert(nbr.clone()); - } - } - } - } - // in_flight lock released — run LLM without holding it - - // Use run_one_agent_with_keys — we already selected seeds above, - // no need to re-run the query. - let result = crate::agent::oneshot::run_one_agent( - &mut store, &agent, batch, Some(&claimed_keys), - ).map(|_| ()); - - // Release all claimed keys (seeds + neighbors) - { - let mut locked = in_flight.lock().unwrap(); - for key in &claimed_keys { - locked.remove(key); - } - } - - result?; - ctx.log_line("done"); - Ok(()) - }) -} - -/// Run the rename agent: generates renames via LLM, applies them directly. -fn job_rename_agent( - ctx: &ExecutionContext, - batch_size: usize, -) -> Result<(), TaskError> { - run_job(ctx, "c-rename", || { - ctx.log_line("loading store"); - let mut store = crate::store::Store::load()?; - - let batch = if batch_size == 0 { 10 } else { batch_size }; - ctx.log_line(format!("running rename agent (batch={})", batch)); - - let result = crate::agent::oneshot::run_one_agent(&mut store, "rename", batch, None)?; - - // Parse RENAME actions from response (rename uses its own format, not WRITE_NODE/LINK/REFINE) - let mut applied = 0; - let mut skipped = 0; - for line in result.output.lines() { - let trimmed = line.trim(); - if !trimmed.starts_with("RENAME ") { continue; } - - let parts: Vec<&str> = trimmed[7..].splitn(2, ' ').collect(); - if parts.len() != 2 { skipped += 1; continue; } - - let old_key = parts[0].trim(); - let new_key = parts[1].trim(); - if old_key.is_empty() || new_key.is_empty() { skipped += 1; continue; } - - let resolved = match store.resolve_key(old_key) { - Ok(k) => k, - Err(e) => { - ctx.log_line(format!("skip: {} → {}: {}", old_key, new_key, e)); - skipped += 1; - continue; - } - }; - - if store.nodes.contains_key(new_key) { - ctx.log_line(format!("skip: {} already exists", new_key)); - skipped += 1; - continue; - } - - match store.rename_node(&resolved, new_key) { - Ok(()) => { - ctx.log_line(format!("renamed: {} → {}", resolved, new_key)); - applied += 1; - } - Err(e) => { - ctx.log_line(format!("error: {} → {}: {}", resolved, new_key, e)); - skipped += 1; - } - } - } - - if applied > 0 { - store.save()?; - } - - ctx.log_line(format!("done: {} applied, {} skipped", applied, skipped)); - Ok(()) - }) -} - -/// Link orphan nodes (CPU-heavy, no LLM). -fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> { - run_job(ctx, "c-orphans", || { - ctx.log_line("loading store"); - let mut store = crate::store::Store::load()?; - ctx.log_line("linking orphans"); - let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15); - ctx.log_line(format!("{} orphans, {} links added", orphans, added)); - Ok(()) - }) -} - -/// Cap node degree to prevent mega-hubs. -fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> { - run_job(ctx, "c-cap", || { - ctx.log_line("loading store"); - let mut store = crate::store::Store::load()?; - ctx.log_line("capping degree"); - match store.cap_degree(50) { - Ok((hubs, pruned)) => { - store.save()?; - ctx.log_line(format!("{} hubs capped, {} edges pruned", hubs, pruned)); - Ok(()) - } - Err(e) => Err(e), - } - }) -} - -/// Apply links extracted from digests. -fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> { - run_job(ctx, "c-digest-links", || { - ctx.log_line("loading store"); - let mut store = crate::store::Store::load()?; - ctx.log_line("applying digest links"); - let links = super::digest::parse_all_digest_links(&store); - let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links); - store.save()?; - ctx.log_line(format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); - Ok(()) - }) -} - -fn job_knowledge_loop(_ctx: &ExecutionContext) -> Result<(), TaskError> { - // Knowledge loop removed — agents now use tool calls directly. - // Consolidation is handled by consolidate_full() in the consolidate job. - Ok(()) -} - -fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { - run_job(ctx, "digest", || { - ctx.log_line("loading store"); - let mut store = crate::store::Store::load()?; - ctx.log_line("generating digests"); - crate::digest::digest_auto(&mut store) - }) -} - -fn job_daily_check( - ctx: &ExecutionContext, - graph_health: &Arc>>, -) -> Result<(), TaskError> { - let gh = Arc::clone(graph_health); - run_job(ctx, "daily-check", || { - ctx.log_line("loading store"); - let store = crate::store::Store::load()?; - ctx.log_line("checking health"); - let _report = crate::neuro::daily_check(&store); - - // Decay search hit counters (10% daily decay) - ctx.log_line("decaying search counters"); - match crate::counters::decay_all(0.9) { - Ok(removed) => ctx.log_line(format!("decayed counters, removed {}", removed)), - Err(e) => ctx.log_line(format!("counter decay failed: {}", e)), - } - - // Compute graph health metrics for status display - ctx.log_line("computing graph health"); - let health = compute_graph_health(&store); - *gh.lock().unwrap() = Some(health); - - Ok(()) - }) + pub plan_counts: std::collections::HashMap, + pub plan_rationale: Vec, + pub computed_at: String, } pub fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { @@ -395,862 +51,3 @@ pub fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { computed_at: crate::store::format_datetime_space(crate::store::now_epoch()), } } - -/// Get process uptime as human-readable string by reading /proc//stat. -fn proc_uptime(pid: u32) -> Option { - // /proc//stat field 22 (1-indexed) is start time in clock ticks - let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?; - // Fields after comm (which may contain spaces/parens) — find closing paren - let after_comm = stat.get(stat.rfind(')')? + 2..)?; - let fields: Vec<&str> = after_comm.split_whitespace().collect(); - // Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19) - let start_ticks: u64 = fields.get(19)?.parse().ok()?; - let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64; - let boot_time_secs = { - let uptime = fs::read_to_string("/proc/uptime").ok()?; - let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?; - let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); - now - sys_uptime as u64 - }; - let start_secs = boot_time_secs + start_ticks / ticks_per_sec; - let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); - let uptime = now.saturating_sub(start_secs); - - Some(format_duration_human(uptime as u128 * 1000)) -} - -// --- Status writing --- - -fn write_status( - choir: &Choir, - last_daily: Option, - graph_health: &Arc>>, -) { - let status = build_status(choir, last_daily, graph_health); - jobkit::daemon::status::write(&crate::config::get().data_dir, &status); -} - -#[derive(Clone, Default, serde::Serialize, serde::Deserialize)] -pub struct GraphHealth { - pub nodes: usize, - pub edges: usize, - pub communities: usize, - pub alpha: f32, // power-law exponent (target ≥2.5) - pub gini: f32, // degree inequality (target ≤0.4) - pub avg_cc: f32, // clustering coefficient (target ≥0.2) - pub sigma: f32, // small-world sigma - pub episodic_ratio: f32, // episodic/total nodes (target <0.4) - pub interference: usize, // interfering pairs (target <50) - // Consolidation work estimate from plan - #[serde(default)] - pub plan_counts: std::collections::HashMap, - pub plan_rationale: Vec, - pub computed_at: String, -} - -#[derive(serde::Serialize, serde::Deserialize)] -struct DaemonStatus { - pid: u32, - tasks: Vec, - #[serde(default)] - last_daily: Option, - #[serde(default)] - graph_health: Option, -} - -// --- The daemon --- - -pub fn run_daemon() -> Result<(), String> { - let config = crate::config::get(); - let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig { - data_dir: config.data_dir.clone(), - resource_slots: config.llm_concurrency, - resource_name: "llm".to_string(), - }); - - let choir = Arc::clone(&daemon.choir); - let llm = Arc::clone(&daemon.resource); - let _ = DAEMON_POOL.set(Arc::clone(&llm)); - let task_log_dir = logs_dir().join("daemon"); - let _ = fs::create_dir_all(&task_log_dir); - - // Enable verbose logging if POC_MEMORY_VERBOSE is set - if std::env::var("POC_MEMORY_VERBOSE").is_ok() { - jobkit::daemon::event_log::set_level(jobkit::daemon::event_log::LogLevel::Verbose); - } - - // Recover last_daily from previous status file - let last_daily: Arc>> = Arc::new(Mutex::new( - jobkit::daemon::status::load::(&config.data_dir) - .and_then(|s| s.last_daily) - .and_then(|d| d.parse().ok()) - )); - - let graph_health: Arc>> = Arc::new(Mutex::new(None)); - - // Persistent task queue — survives daemon restarts - let task_queue = TaskQueue::load(&config.data_dir); - - // Nodes currently being processed by agents — prevents concurrent - // agents from working on overlapping graph regions. - let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new())); - - log_event("daemon", "started", &format!("pid {}", std::process::id())); - eprintln!("poc-memory daemon started (pid {})", std::process::id()); - - // Recover pending tasks from previous run - { - let recovered = task_queue.drain(); - if !recovered.is_empty() { - log_event("task-queue", "recovering", &format!("{} tasks", recovered.len())); - for pt in &recovered { - let agent = pt.agent.clone(); - let b = pt.batch; - let task_id = pt.id.clone(); - let in_flight_clone = Arc::clone(&in_flight); - let queue_clone = Arc::clone(&task_queue); - choir.spawn(pt.id.clone()) - .resource(&llm) - .log_dir(&task_log_dir) - .retries(1) - .init(move |ctx| { - let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); - queue_clone.remove(&task_id); - result - }); - // Drop schedules via IdleTask::Drop - } - } - } - - // Write initial status - write_status(&choir, *last_daily.lock().unwrap(), &graph_health); - - - // Scheduler: runs daily jobs based on filesystem state - let choir_sched = Arc::clone(&choir); - let llm_sched = Arc::clone(&llm); - let last_daily_sched = Arc::clone(&last_daily); - let graph_health_sched = Arc::clone(&graph_health); - let in_flight_sched = Arc::clone(&in_flight); - let log_dir_sched = task_log_dir.clone(); - let queue_sched = Arc::clone(&task_queue); - const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours - - choir.spawn("scheduler").init(move |ctx| { - let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; - let mut last_consolidation = std::time::Instant::now() - CONSOLIDATION_INTERVAL; // run on first tick - ctx.set_progress("idle"); - - loop { - if ctx.is_cancelled() { - return Err(TaskError::Fatal("cancelled".into())); - } - - let today = chrono::Local::now().date_naive(); - - // Health check: every hour — also updates graph health metrics - if last_health.elapsed() >= HEALTH_INTERVAL { - let gh = Arc::clone(&graph_health_sched); - choir_sched.spawn("health").init(move |ctx| { - job_daily_check(ctx, &gh) - }); - last_health = std::time::Instant::now(); - } - - // Consolidation cycle: every 6 hours (wait for health check to cache metrics first) - let gh = graph_health_sched.lock().unwrap().clone(); - let Some(h) = gh.as_ref() else { continue }; - if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL { - log_event("scheduler", "consolidation-trigger", - &format!("{} (every 6h)", today)); - - // Use cached graph health plan (from consolidation_plan_quick). - let plan = crate::neuro::ConsolidationPlan { - counts: h.plan_counts.clone(), - run_health: true, - rationale: Vec::new(), - }; - let runs = plan.to_agent_runs(5); - - let summary: Vec = h.plan_counts.iter() - .filter(|(_, c)| **c > 0) - .map(|(a, c)| format!("{}{}", &a[..1], c)) - .collect(); - log_event("scheduler", "consolidation-plan", - &format!("{} agents ({})", runs.len(), summary.join(" "))); - - // Phase 1: Agent runs — all concurrent, in-flight exclusion - // prevents overlapping graph regions. - let mut all_tasks: Vec = Vec::new(); - for (i, (agent_type, batch)) in runs.iter().enumerate() { - let agent = agent_type.to_string(); - let b = *batch; - let in_flight_clone = Arc::clone(&in_flight_sched); - let task_name = format!("c-{}-{}:{}", agent, i, today); - let task_id = task_name.clone(); - let queue_clone = Arc::clone(&queue_sched); - queue_sched.push(PendingTask { - id: task_id.clone(), - agent: agent.clone(), - batch: b, - target_key: None, - }); - let task = choir_sched.spawn(task_name) - .resource(&llm_sched) - .log_dir(&log_dir_sched) - .retries(1) - .init(move |ctx| { - let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); - queue_clone.remove(&task_id); - result - }) - .run(); - all_tasks.push(task); - } - // Orphans phase depends on all agent tasks completing - let prev_agent = all_tasks.last().cloned(); - - // Phase 2: Link orphans (CPU-only, no LLM) - let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today)) - .retries(1) - .init(job_link_orphans); - if let Some(ref dep) = prev_agent { - orphans.depend_on(dep); - } - let orphans = orphans.run(); - - // Phase 3: Cap degree - let mut cap = choir_sched.spawn(format!("c-cap:{}", today)) - .retries(1) - .init(job_cap_degree); - cap.depend_on(&orphans); - let cap = cap.run(); - - // Phase 4: Generate digests - let mut digest = choir_sched.spawn(format!("c-digest:{}", today)) - .resource(&llm_sched) - .retries(1) - .init(job_digest); - digest.depend_on(&cap); - let digest = digest.run(); - - // Phase 5: Apply digest links - let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today)) - .retries(1) - .init(job_digest_links); - digest_links.depend_on(&digest); - let digest_links = digest_links.run(); - - // Phase 7: Knowledge loop - let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today)) - .resource(&llm_sched) - .retries(1) - .init(job_knowledge_loop); - knowledge.depend_on(&digest_links); - - *last_daily_sched.lock().unwrap() = Some(today); - last_consolidation = std::time::Instant::now(); - ctx.set_progress(format!("daily pipeline triggered ({today})")); - } - - // Prune finished tasks from registry - let pruned = choir_sched.gc_finished(); - if pruned > 0 { - log::trace!("pruned {} finished tasks", pruned); - } - - write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched); - std::thread::sleep(SCHEDULER_INTERVAL); - } - }); - - // Register RPC handlers - { - let last_daily_rpc = Arc::clone(&last_daily); - daemon.add_rpc_handler(move |cmd, _ctx| { - if cmd == "consolidate" { - *last_daily_rpc.lock().unwrap() = None; - log_event("rpc", "consolidate", "triggered via socket"); - Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into()) - } else { - None - } - }); - } - - daemon.add_rpc_handler(|cmd, _ctx| { - if cmd != "reload-config" { return None; } - let changed = crate::config::reload(); - let config = crate::config::get(); - let api = config.api_base_url.as_deref().unwrap_or("(none)"); - let model = config.api_model.as_deref().unwrap_or("(default)"); - log_event("daemon", "config-reload", - &format!("changed={}, api={}, model={}", changed, api, model)); - Some(format!("{{\"ok\":true,\"changed\":{},\"api_base_url\":\"{}\",\"api_model\":\"{}\"}}\n", - changed, api, model)) - }); - - daemon.add_rpc_handler(|cmd, _ctx| { - if !cmd.starts_with("record-hits ") { return None; } - let keys: Vec<&str> = cmd.strip_prefix("record-hits ") - .unwrap_or("") - .split('\t') - .filter(|k| !k.is_empty()) - .collect(); - if keys.is_empty() { - return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into()); - } - let n = keys.len(); - match crate::counters::record_search_hits(&keys) { - Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)), - Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))), - } - }); - - { - let choir_rpc = Arc::clone(&choir); - let llm_rpc = Arc::clone(&llm); - let log_dir_rpc = task_log_dir.clone(); - let in_flight_rpc = Arc::clone(&in_flight); - daemon.add_rpc_handler(move |cmd, _ctx| { - if !cmd.starts_with("run-agent ") { return None; } - let parts: Vec<&str> = cmd.splitn(4, ' ').collect(); - let agent_type = parts.get(1).unwrap_or(&"replay"); - let count: usize = parts.get(2) - .and_then(|s| s.parse().ok()) - .unwrap_or(1); - // Optional target key: "run-agent linker 1 target:KEY" - let target_key: Option = parts.get(3) - .and_then(|s| s.strip_prefix("target:")) - .map(|s| s.to_string()); - let batch_size = 5; - let today = chrono::Local::now().format("%Y-%m-%d"); - let ts = chrono::Local::now().format("%H%M%S"); - let mut prev = None; - let mut spawned = 0; - let mut remaining = count; - - let is_rename = *agent_type == "rename"; - - // Targeted run: one task for a specific node - if let Some(ref key) = target_key { - let agent = agent_type.to_string(); - let key = key.clone(); - let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::(), today); - if jobkit::daemon::event_log::enabled(jobkit::daemon::event_log::LogLevel::Verbose) { - log_event("daemon", "spawn-targeted", - &format!("{} (pool: {}/{})", task_name, llm_rpc.available(), llm_rpc.capacity())); - } - choir_rpc.spawn(task_name) - .resource(&llm_rpc) - .log_dir(&log_dir_rpc) - .retries(1) - .init(move |ctx| { - job_targeted_agent(ctx, &agent, &key) - }) - .run(); - spawned = 1; - remaining = 0; - } - - while remaining > 0 { - let batch = remaining.min(batch_size); - let agent = agent_type.to_string(); - let in_flight_clone = Arc::clone(&in_flight_rpc); - let task_name = format!("c-{}-rpc{}:{}", agent, ts, today); - let mut builder = choir_rpc.spawn(task_name) - .resource(&llm_rpc) - .log_dir(&log_dir_rpc) - .retries(1) - .init(move |ctx| { - if is_rename { - job_rename_agent(ctx, batch) - } else { - job_consolidation_agent(ctx, &agent, batch, &in_flight_clone) - } - }); - if let Some(ref dep) = prev { - builder.depend_on(dep); - } - prev = Some(builder.run()); - remaining -= batch; - spawned += 1; - } - - log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count)); - Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n", - count, agent_type, spawned)) - }); - } - - // Main thread: socket server + signal handling - let last_daily_status = Arc::clone(&last_daily); - let graph_health_status = Arc::clone(&graph_health); - daemon.run(move |ctx| { - build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status) - }); - - log_event("daemon", "stopping", ""); - eprintln!("Shutting down..."); - - log_event("daemon", "stopped", ""); - std::process::exit(0) -} - -pub fn send_rpc_pub(cmd: &str) -> Option { - send_rpc(cmd) -} - -fn send_rpc(cmd: &str) -> Option { - jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) -} - -pub fn rpc_consolidate() -> Result<(), String> { - match send_rpc("consolidate") { - Some(resp) => { - println!("{}", resp.trim()); - Ok(()) - } - None => Err("Daemon not running.".into()), - } -} - -pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> { - let cmd = format!("run-agent {} {}", agent, count); - match send_rpc(&cmd) { - Some(resp) => { - println!("{}", resp.trim()); - Ok(()) - } - None => Err("Daemon not running.".into()), - } -} - -fn read_status_socket() -> Option { - let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?; - serde_json::from_str(&json).ok() -} - -// status_socket_loop has been replaced by daemon.run() in jobkit-daemon. - -fn build_status( - choir: &Choir, - last_daily: Option, - graph_health: &Arc>>, -) -> DaemonStatus { - DaemonStatus { - pid: std::process::id(), - tasks: choir.task_statuses(), - last_daily: last_daily.map(|d| d.to_string()), - graph_health: graph_health.lock().unwrap().clone(), - } -} - -// --- Status display --- - -fn format_duration_human(ms: u128) -> String { - if ms < 1_000 { - format!("{}ms", ms) - } else if ms < 60_000 { - format!("{:.1}s", ms as f64 / 1000.0) - } else if ms < 3_600_000 { - format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000) - } else { - format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000) - } -} - -fn task_group(name: &str) -> &str { - if name == "scheduler" { "core" } - else if name.starts_with("c-") || name.starts_with("consolidate:") - || name.starts_with("knowledge-loop:") || name.starts_with("digest:") - || name.starts_with("decay:") { "daily" } - else if name == "health" { "health" } - else { "other" } -} - -/// Compute elapsed time for a task, using absolute started_at if available. -fn task_elapsed(t: &TaskInfo) -> Duration { - if matches!(t.status, TaskStatus::Running) { - if let Some(started) = t.started_at { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - Duration::from_secs_f64((now - started).max(0.0)) - } else { - t.elapsed - } - } else { - t.result.as_ref() - .map(|r| r.duration) - .unwrap_or(t.elapsed) - } -} - -fn status_symbol(t: &TaskInfo) -> &'static str { - if t.cancelled { return "✗" } - match t.status { - TaskStatus::Running => "▶", - TaskStatus::Completed => "✓", - TaskStatus::Failed => "✗", - TaskStatus::Pending => "·", - } -} - -/// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…" -fn short_job_name(job: &str) -> String { - // Split "verb path" or just return as-is - if let Some((verb, path)) = job.split_once(' ') { - let file = path.rsplit('/').next().unwrap_or(path); - let file = file.strip_suffix(".jsonl").unwrap_or(file); - let short = if file.len() > 12 { &file[..12] } else { file }; - format!("{} {}", verb, short) - } else { - job.to_string() - } -} - -fn show_recent_completions(n: usize) { - let path = log_path(); - let content = match fs::read_to_string(&path) { - Ok(c) => c, - Err(_) => return, - }; - - let recent: Vec<&str> = content.lines().rev() - .filter(|line| { - line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"") - }) - .take(n) - .collect(); - - if recent.is_empty() { return; } - - eprintln!(" Recent:"); - for line in recent.iter().rev() { - if let Ok(obj) = serde_json::from_str::(line) { - let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); - let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); - let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); - let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); - - let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; - let sym = if event == "completed" { "✓" } else { "✗" }; - let name = short_job_name(job); - - eprintln!(" {} {} {:30} {}", sym, time, name, detail); - } - } - eprintln!(); -} - -pub fn show_status() -> Result<(), String> { - let status = match read_status_socket() { - Some(s) => s, - None => { - eprintln!("Daemon not running."); - return Ok(()); - } - }; - - let uptime_str = proc_uptime(status.pid).unwrap_or_default(); - if uptime_str.is_empty() { - eprintln!("poc-memory daemon pid={}", status.pid); - } else { - eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str); - } - - if status.tasks.is_empty() { - eprintln!("\n No tasks"); - return Ok(()); - } - - // Count by status - let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); - let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); - let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); - let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); - eprintln!(" tasks: {} running, {} pending, {} done, {} failed", - running, pending, completed, failed); - - // Graph health - if let Some(ref gh) = status.graph_health { - eprintln!(); - fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str { - let ok = if higher_is_better { val >= target } else { val <= target }; - if ok { "✓" } else { "✗" } - } - eprintln!(" Graph health ({})", gh.computed_at); - eprintln!(" {} nodes, {} edges, {} communities", - gh.nodes, gh.edges, gh.communities); - eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)", - indicator(gh.alpha, 2.5, true), gh.alpha, - indicator(gh.gini, 0.4, false), gh.gini, - indicator(gh.avg_cc, 0.2, true), gh.avg_cc); - eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}", - indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0, - gh.sigma); - - let plan_total: usize = gh.plan_counts.values().sum::() + 1; - let plan_summary: Vec = gh.plan_counts.iter() - .filter(|(_, c)| **c > 0) - .map(|(a, c)| format!("{}{}", &a[..1], c)) - .collect(); - eprintln!(" consolidation plan: {} agents ({} +health)", - plan_total, plan_summary.join(" ")); - } - eprintln!(); - - // Group and display - let groups: &[(&str, &str)] = &[ - ("core", "Core"), - ("daily", "Daily pipeline"), - ("extract", "Session extraction"), - ("health", "Health"), - ("other", "Other"), - ]; - - // In-flight tasks first (running + pending) - let in_flight: Vec<&TaskInfo> = status.tasks.iter() - .filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending)) - .collect(); - - if !in_flight.is_empty() { - eprintln!(" In flight:"); - for t in &in_flight { - let sym = status_symbol(t); - let e = task_elapsed(t); - let elapsed = if !e.is_zero() { - format!(" {}", format_duration_human(e.as_millis())) - } else { - String::new() - }; - let progress = t.progress.as_deref() - .filter(|p| *p != "idle") - .map(|p| format!(" {}", p)) - .unwrap_or_default(); - let name = short_job_name(&t.name); - eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress); - if let Some(ref lp) = t.log_path { - // tail from log file - if matches!(t.status, TaskStatus::Running) { - eprintln!(" │ log: {}", lp); - } - } - } - eprintln!(); - } - - // Recent completions from log file - show_recent_completions(20); - - // Detailed group view only if there are failures worth showing - for (group_id, group_label) in groups { - let tasks: Vec<&TaskInfo> = status.tasks.iter() - .filter(|t| task_group(&t.name) == *group_id) - .collect(); - if tasks.is_empty() { continue; } - - // For extract group, show summary instead of individual tasks - if *group_id == "extract" { - let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); - let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); - let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); - let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); - eprintln!(" {} ({} total)", group_label, tasks.len()); - - if n_running > 0 { - for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { - let e = task_elapsed(t); - let elapsed = if !e.is_zero() { - format!(" ({})", format_duration_human(e.as_millis())) - } else { - String::new() - }; - let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); - eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); - if let Some(ref lp) = t.log_path { - eprintln!(" │ log: {}", lp); - } - } - } - let mut parts = Vec::new(); - if n_done > 0 { parts.push(format!("{} done", n_done)); } - if n_running > 0 { parts.push(format!("{} running", n_running)); } - if n_pending > 0 { parts.push(format!("{} queued", n_pending)); } - if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); } - eprintln!(" {}", parts.join(", ")); - - // Show recent failures - for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) { - if let Some(ref err) = t.result.as_ref().and_then(|r| r.error.as_ref()) { - let short = if err.len() > 80 { &err[..80] } else { err }; - eprintln!(" ✗ {}: {}", t.name, short); - } - } - eprintln!(); - continue; - } - - eprintln!(" {}", group_label); - for t in &tasks { - let sym = status_symbol(t); - let e = task_elapsed(t); - let duration = if !e.is_zero() { - format_duration_human(e.as_millis()) - } else { - String::new() - }; - - let retry = if t.max_retries > 0 && t.retry_count > 0 { - format!(" retry {}/{}", t.retry_count, t.max_retries) - } else { - String::new() - }; - - let detail = if matches!(t.status, TaskStatus::Failed) { - t.result.as_ref() - .and_then(|r| r.error.as_ref()) - .map(|e| { - let short = if e.len() > 60 { &e[..60] } else { e }; - format!(" err: {}", short) - }) - .unwrap_or_default() - } else { - String::new() - }; - - if duration.is_empty() { - eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail); - } else { - eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); - } - - // Show output log tail for running tasks - if let Some(ref lp) = t.log_path { - // tail from log file - if matches!(t.status, TaskStatus::Running) { - eprintln!(" │ log: {}", lp); - } - } - } - eprintln!(); - } - - Ok(()) -} - -/// Drill down into a task's log file. Finds the log path from: -/// 1. Running task status (daemon-status.json) -/// 2. daemon.log started events (for completed/failed tasks) -pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> { - // Try running tasks first - let Some(status_json) = send_rpc_pub("") else { - return search_log_fallback(task_name, lines); - }; - let Ok(status) = serde_json::from_str::(&status_json) else { - return search_log_fallback(task_name, lines); - }; - let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) else { - return search_log_fallback(task_name, lines); - }; - for t in tasks { - let name = t.get("name").and_then(|n| n.as_str()).unwrap_or(""); - if !name.contains(task_name) { continue; } - if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) { - return tail_file(lp, lines); - } - } - search_log_fallback(task_name, lines) -} - -fn search_log_fallback(task_name: &str, lines: usize) -> Result<(), String> { - // Fall back to searching daemon.log for the most recent started event with a log path - let log = log_path(); - if log.exists() { - let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?; - for line in content.lines().rev() { - if let Ok(obj) = serde_json::from_str::(line) { - let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or(""); - let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or(""); - let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); - if job.contains(task_name) && event == "started" && detail.starts_with("log: ") { - let path = &detail[5..]; - return tail_file(path, lines); - } - } - } - } - - Err(format!("no log file found for task '{}'", task_name)) -} - -fn tail_file(path: &str, lines: usize) -> Result<(), String> { - let content = fs::read_to_string(path) - .map_err(|e| format!("read {}: {}", path, e))?; - let all_lines: Vec<&str> = content.lines().collect(); - let skip = all_lines.len().saturating_sub(lines); - eprintln!("--- {} ({} lines) ---", path, all_lines.len()); - for line in &all_lines[skip..] { - eprintln!("{}", line); - } - Ok(()) -} - -pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> { - let path = log_path(); - if !path.exists() { - eprintln!("No daemon log found."); - return Ok(()); - } - - let content = fs::read_to_string(&path) - .map_err(|e| format!("read log: {}", e))?; - - let filtered: Vec<&str> = content.lines().rev() - .filter(|line| { - if let Some(job) = job_filter { - line.contains(&format!("\"job\":\"{}\"", job)) - } else { - true - } - }) - .take(lines) - .collect(); - - if filtered.is_empty() { - eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default()); - return Ok(()); - } - - // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]" - for line in filtered.into_iter().rev() { - if let Ok(obj) = serde_json::from_str::(line) { - let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); - let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); - let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); - let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); - - // Shorten timestamp to just time portion - let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; - - if detail.is_empty() { - eprintln!(" {} {:20} {}", time, job, event); - } else { - // Truncate long details (file paths) - let short = if detail.len() > 60 { - let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail); - if last.len() > 60 { &last[..60] } else { last } - } else { - detail - }; - eprintln!(" {} {:20} {:12} {}", time, job, event, short); - } - } else { - eprintln!("{}", line); - } - } - Ok(()) -}