Logging overhaul: per-task log files, daemon.log drill-down

Switch from jobkit-daemon crate to jobkit with daemon feature.
Wire up per-task log files for all daemon-spawned agent tasks.

Changes:
- Use jobkit::daemon:: instead of jobkit_daemon::
- All agent tasks get .log_dir() set to $data_dir/logs/
- Task log path shown in daemon status and TUI
- New CLI: poc-memory agent daemon log --task NAME
  Finds the task's log path from status or daemon.log, tails the file
- LLM backend selection logged to daemon.log via log_event
- Targeted agent job names include the target key for debuggability
- Logging architecture documented in doc/logging.md

Two-level logging, no duplication:
- daemon.log: lifecycle events with task log path for drill-down
- per-task logs: full agent output via ctx.log_line()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-19 11:17:07 -04:00
parent f2c2c02a22
commit 49f72cdac3
7 changed files with 192 additions and 54 deletions

View file

@ -29,14 +29,19 @@ fn log_path() -> PathBuf {
// --- Logging ---
/// Append a structured event for `job` to the daemon event log
/// (daemon.log under the configured data dir) via jobkit.
///
/// The stale pre-migration `jobkit_daemon::` call was removed; only the
/// `jobkit::daemon::` path is used, so each event is logged exactly once.
fn log_event(job: &str, event: &str, detail: &str) {
    jobkit::daemon::event_log::log(&crate::config::get().data_dir, job, event, detail);
}
/// Public wrapper for logging from other agent modules.
///
/// Thin delegation to the private `log_event`, which forwards to the
/// jobkit daemon event log for the configured data dir.
pub fn log_event_pub(job: &str, event: &str, detail: &str) {
log_event(job, event, detail);
}
// --- Job functions (direct, no subprocess) ---
/// Run a named job with logging, progress reporting, and error mapping.
///
/// Delegates to `jobkit::daemon::Daemon::run_job`, passing the configured
/// data dir so job output lands in the daemon's log locations. The stale
/// duplicate `jobkit_daemon::` call was removed — with both present the
/// job body `f` would be consumed twice (and the old crate path no longer
/// exists after the migration to the `daemon` feature of `jobkit`).
fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> {
    jobkit::daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, f)
}
// experience_mine and fact_mine removed — observation.agent handles all transcript mining
@ -49,11 +54,15 @@ fn job_targeted_agent(
) -> Result<(), TaskError> {
let agent = agent_type.to_string();
let key = target_key.to_string();
let job_name = format!("c-{}-target", agent);
let job_name = format!("c-{}-target({})", agent, key);
run_job(ctx, &job_name, || {
let mut store = crate::store::Store::load()?;
ctx.log_line(&format!("targeting: {}", key));
let log = |msg: &str| { ctx.log_line(msg); };
let job = job_name.clone();
let log = |msg: &str| {
ctx.log_line(msg);
log_event(&job, "progress", msg);
};
super::knowledge::run_one_agent_with_keys(
&mut store, &agent, &[key.clone()], 5, "daemon", &log, false,
)?;
@ -576,7 +585,7 @@ fn write_status(
graph_health: &Arc<Mutex<Option<GraphHealth>>>,
) {
let status = build_status(choir, last_daily, graph_health);
jobkit_daemon::status::write(&crate::config::get().data_dir, &status);
jobkit::daemon::status::write(&crate::config::get().data_dir, &status);
}
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
@ -617,7 +626,7 @@ struct DaemonStatus {
pub fn run_daemon() -> Result<(), String> {
let config = crate::config::get();
let mut daemon = jobkit_daemon::Daemon::new(jobkit_daemon::DaemonConfig {
let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig {
data_dir: config.data_dir.clone(),
resource_slots: config.llm_concurrency,
resource_name: "llm".to_string(),
@ -626,10 +635,12 @@ pub fn run_daemon() -> Result<(), String> {
let choir = Arc::clone(&daemon.choir);
let llm = Arc::clone(&daemon.resource);
let task_log_dir = config.data_dir.join("logs");
let _ = fs::create_dir_all(&task_log_dir);
// Recover last_daily from previous status file
let last_daily: Arc<Mutex<Option<chrono::NaiveDate>>> = Arc::new(Mutex::new(
jobkit_daemon::status::load::<DaemonStatus>(&config.data_dir)
jobkit::daemon::status::load::<DaemonStatus>(&config.data_dir)
.and_then(|s| s.last_daily)
.and_then(|d| d.parse().ok())
));
@ -907,6 +918,7 @@ pub fn run_daemon() -> Result<(), String> {
let llm_sched = Arc::clone(&llm);
let last_daily_sched = Arc::clone(&last_daily);
let graph_health_sched = Arc::clone(&graph_health);
let log_dir_sched = task_log_dir.clone();
const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
choir.spawn("scheduler").init(move |ctx| {
@ -967,6 +979,7 @@ pub fn run_daemon() -> Result<(), String> {
let task_name = format!("c-{}-{}:{}", agent, i, today);
let mut builder = choir_sched.spawn(task_name)
.resource(&llm_sched)
.log_dir(&log_dir_sched)
.retries(1)
.init(move |ctx| {
job_consolidation_agent(ctx, &agent, b)
@ -1067,6 +1080,7 @@ pub fn run_daemon() -> Result<(), String> {
{
let choir_rpc = Arc::clone(&choir);
let llm_rpc = Arc::clone(&llm);
let log_dir_rpc = task_log_dir.clone();
daemon.add_rpc_handler(move |cmd, _ctx| {
if !cmd.starts_with("run-agent ") { return None; }
let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
@ -1093,8 +1107,11 @@ pub fn run_daemon() -> Result<(), String> {
let agent = agent_type.to_string();
let key = key.clone();
let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::<String>(), today);
log_event("daemon", "spawn-targeted",
&format!("{} (available slots: {})", task_name, llm_rpc.available()));
choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.log_dir(&log_dir_rpc)
.retries(1)
.init(move |ctx| {
job_targeted_agent(ctx, &agent, &key)
@ -1133,6 +1150,7 @@ pub fn run_daemon() -> Result<(), String> {
let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
let mut builder = choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.log_dir(&log_dir_rpc)
.retries(1)
.init(move |ctx| {
if is_rename {
@ -1174,7 +1192,7 @@ pub fn send_rpc_pub(cmd: &str) -> Option<String> {
}
/// Send a raw RPC command to the daemon's control socket and return its
/// reply, if any. `None` presumably means the daemon is not reachable —
/// TODO confirm against jobkit's `socket::send_rpc` contract.
///
/// The stale duplicate `jobkit_daemon::` call was removed; the function
/// now makes exactly one RPC round-trip via `jobkit::daemon::socket`.
fn send_rpc(cmd: &str) -> Option<String> {
    jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
}
pub fn rpc_consolidate() -> Result<(), String> {
@ -1209,7 +1227,7 @@ pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
}
/// Fetch the daemon status over the RPC socket and deserialize it.
///
/// An empty command string appears to act as the status query (the same
/// pattern is used elsewhere in this module) — verify against the jobkit
/// socket protocol. Returns `None` when the daemon is unreachable or the
/// reply is not valid `DaemonStatus` JSON. The stale duplicate
/// `jobkit_daemon::` call was removed so only one request is issued.
fn read_status_socket() -> Option<DaemonStatus> {
    let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
    serde_json::from_str(&json).ok()
}
@ -1411,10 +1429,10 @@ pub fn show_status() -> Result<(), String> {
.unwrap_or_default();
let name = short_job_name(&t.name);
eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress);
if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() {
let skip = t.output_log.len().saturating_sub(3);
for line in &t.output_log[skip..] {
eprintln!(" {}", line);
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" log: {}", lp);
}
}
}
@ -1449,11 +1467,8 @@ pub fn show_status() -> Result<(), String> {
};
let progress = t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default();
eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress);
if !t.output_log.is_empty() {
let skip = t.output_log.len().saturating_sub(3);
for line in &t.output_log[skip..] {
eprintln!("{}", line);
}
if let Some(ref lp) = t.log_path {
eprintln!(" │ log: {}", lp);
}
}
}
@ -1512,10 +1527,10 @@ pub fn show_status() -> Result<(), String> {
}
// Show output log tail for running tasks
if matches!(t.status, TaskStatus::Running) && !t.output_log.is_empty() {
let skip = t.output_log.len().saturating_sub(5);
for line in &t.output_log[skip..] {
eprintln!(" {}", line);
if let Some(ref lp) = t.log_path {
// tail from log file
if matches!(t.status, TaskStatus::Running) {
eprintln!(" log: {}", lp);
}
}
}
@ -1739,6 +1754,58 @@ pub fn install_hook() -> Result<(), String> {
Ok(())
}
/// Drill down into a task's log file. Finds the log path from:
/// 1. Running task status (daemon-status.json, fetched over RPC)
/// 2. daemon.log "started" events (for completed/failed tasks)
///
/// `task_name` is matched as a substring of the task/job name; `lines`
/// is the number of trailing log lines to print.
///
/// # Errors
/// Returns `Err` when no matching log path can be found, or when the
/// daemon.log / task log file cannot be read.
pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> {
    // Try running tasks first: ask the daemon for its status and scan the
    // task list for a name match carrying a log_path.
    if let Some(status_json) = send_rpc_pub("") {
        if let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_json) {
            if let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) {
                for t in tasks {
                    let name = t.get("name").and_then(|n| n.as_str()).unwrap_or("");
                    if !name.contains(task_name) {
                        continue;
                    }
                    if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) {
                        return tail_file(lp, lines);
                    }
                }
            }
        }
    }
    // Fall back to scanning daemon.log newest-first for a "started" event
    // whose detail carries the task's log path.
    let log = log_path();
    if log.exists() {
        let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?;
        for line in content.lines().rev() {
            if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
                let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("");
                let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("");
                let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
                if job.contains(task_name) && event == "started" {
                    // strip_prefix replaces the fragile starts_with + &detail[5..]
                    // pair: the magic offset silently breaks if the prefix text
                    // ever changes length, and slicing can panic on non-ASCII.
                    if let Some(path) = detail.strip_prefix("log: ") {
                        return tail_file(path, lines);
                    }
                }
            }
        }
    }
    Err(format!("no log file found for task '{}'", task_name))
}
/// Print the last `lines` lines of the file at `path` to stderr, preceded
/// by a header showing the path and the file's total line count.
///
/// # Errors
/// Returns `Err` with a descriptive message when the file cannot be read.
fn tail_file(path: &str, lines: usize) -> Result<(), String> {
    let content = fs::read_to_string(path)
        .map_err(|e| format!("read {}: {}", path, e))?;
    let collected: Vec<&str> = content.lines().collect();
    let total = collected.len();
    let start = total.saturating_sub(lines);
    eprintln!("--- {} ({} lines) ---", path, total);
    for entry in collected.iter().skip(start) {
        eprintln!("{}", entry);
    }
    Ok(())
}
pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
let path = log_path();
if !path.exists() {