timestamp sanitization, CoT logging, reasoning field fix, persistent queue

- store/types.rs: sanitize timestamps on capnp load — old records had
  raw offsets instead of unix epoch, breaking sort-by-timestamp queries
- agents/api.rs: drain reasoning tokens from UI channel into LLM logs
  so we can see Qwen's chain-of-thought in agent output
- agents/daemon.rs: persistent task queue (pending-tasks.jsonl) —
  tasks survive daemon restarts. Push before spawn, remove on completion,
  recover on startup.
- api/openai.rs: only send reasoning field when explicitly configured,
  not on every request (fixes vllm warning)
- api/mod.rs: add 600s total request timeout as backstop for hung
  connections
- Cargo.toml: enable tokio-console feature for task introspection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-21 11:33:36 -04:00
parent 869a2fbc38
commit 34937932ab
7 changed files with 477 additions and 30 deletions

View file

@ -20,7 +20,7 @@ memmap2 = "0.9"
rayon = "1"
peg = "0.8"
paste = "1"
jobkit = { path = "/home/kent/jobkit", features = ["daemon"] }
jobkit = { path = "/home/kent/jobkit", features = ["daemon", "console"] }
poc-agent = { path = "../poc-agent" }
tokio = { version = "1", features = ["rt-multi-thread"] }
redb = "2"

View file

@ -35,8 +35,8 @@ pub async fn call_api_with_tools(
) -> Result<String, String> {
let client = get_client()?;
// Set up a minimal UI channel (we just collect messages, no TUI)
let (ui_tx, _ui_rx) = poc_agent::ui_channel::channel();
// Set up a UI channel — we drain reasoning tokens into the log
let (ui_tx, mut ui_rx) = poc_agent::ui_channel::channel();
// Build tool definitions — memory tools for graph operations
let all_defs = tools::definitions();
@ -66,6 +66,19 @@ pub async fn call_api_with_tools(
u.prompt_tokens, u.completion_tokens));
}
// Drain reasoning tokens from the UI channel into the log
{
let mut reasoning_buf = String::new();
while let Ok(ui_msg) = ui_rx.try_recv() {
if let poc_agent::ui_channel::UiMessage::Reasoning(r) = ui_msg {
reasoning_buf.push_str(&r);
}
}
if !reasoning_buf.is_empty() {
log(&format!("<think>\n{}\n</think>", reasoning_buf.trim()));
}
}
let has_content = msg.content.is_some();
let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty());

View file

@ -22,6 +22,67 @@ use std::time::{Duration, SystemTime};
const SESSION_STALE_SECS: u64 = 600; // 10 minutes
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
// --- Persistent task queue ---
// A queued consolidation task, serialized one-per-line (JSONL) into
// pending-tasks.jsonl so the queue survives daemon restarts.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PendingTask {
    // Unique task id — also used as the spawned task's name.
    id: String,
    // Name of the consolidation agent that should run this task.
    agent: String,
    // Batch parameter forwarded to job_consolidation_agent —
    // presumably a batch index/size; confirm against the agent job.
    batch: usize,
    // Optional target node key. `#[serde(default)]` keeps queue files
    // written before this field existed deserializable.
    #[serde(default)]
    target_key: Option<String>,
}
// In-memory view of the persistent task queue. Every mutation is
// immediately flushed back to `path` so the on-disk file always
// reflects the current pending set.
struct TaskQueue {
    // On-disk location: <data_dir>/pending-tasks.jsonl.
    path: PathBuf,
    // Pending tasks; the Mutex serializes push/remove and the flush
    // that happens under the same lock.
    tasks: Mutex<Vec<PendingTask>>,
}
impl TaskQueue {
    /// Load the queue from `<data_dir>/pending-tasks.jsonl` (one JSON
    /// object per line). Unparseable lines are silently dropped so a
    /// stale or hand-edited queue file cannot wedge daemon startup.
    fn load(data_dir: &Path) -> Arc<Self> {
        let path = data_dir.join("pending-tasks.jsonl");
        let tasks: Vec<PendingTask> = if path.exists() {
            fs::read_to_string(&path)
                .unwrap_or_default()
                .lines()
                .filter_map(|l| serde_json::from_str(l).ok())
                .collect()
        } else {
            Vec::new()
        };
        if !tasks.is_empty() {
            log_event("task-queue", "loaded", &format!("{} pending tasks", tasks.len()));
        }
        Arc::new(Self { path, tasks: Mutex::new(tasks) })
    }

    /// Append a task and persist immediately. Callers push *before*
    /// spawning the job so a crash between push and spawn still
    /// recovers the task on the next startup.
    fn push(&self, task: PendingTask) {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.push(task);
        self.write_locked(&tasks);
    }

    /// Remove a completed task by id and persist the shrunk queue.
    fn remove(&self, id: &str) {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.retain(|t| t.id != id);
        self.write_locked(&tasks);
    }

    /// Snapshot the pending tasks. NOTE: despite the name this does
    /// NOT clear the queue — entries stay on disk until `remove()` is
    /// called on completion, so a crash *during* recovery leaves the
    /// tasks recoverable again. Renaming would break callers, so the
    /// contract is documented here instead.
    fn drain(&self) -> Vec<PendingTask> {
        let tasks = self.tasks.lock().unwrap();
        tasks.clone()
    }

    /// Persist the task list while the caller holds the `tasks` lock.
    /// Written via temp file + `fs::rename` so a crash mid-write can
    /// never leave a truncated/corrupt queue file (a bare `fs::write`
    /// here would defeat the crash-safety the queue exists for).
    /// Persistence failures are deliberately non-fatal, matching the
    /// daemon's best-effort logging style.
    fn write_locked(&self, tasks: &[PendingTask]) {
        let mut content: String = tasks
            .iter()
            .filter_map(|t| serde_json::to_string(t).ok())
            .collect::<Vec<_>>()
            .join("\n");
        if !content.is_empty() {
            content.push('\n');
        }
        let tmp = self.path.with_extension("jsonl.tmp");
        if fs::write(&tmp, content).is_ok() {
            // rename() atomically replaces the old file on the same
            // filesystem; on failure the previous queue file survives.
            let _ = fs::rename(&tmp, &self.path);
        }
    }
}
/// Location of the daemon's log file inside the configured data dir.
fn log_path() -> PathBuf {
    let cfg = crate::config::get();
    cfg.data_dir.join("daemon.log")
}
@ -720,6 +781,9 @@ pub fn run_daemon() -> Result<(), String> {
let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None));
// Persistent task queue — survives daemon restarts
let task_queue = TaskQueue::load(&config.data_dir);
// Nodes currently being processed by agents — prevents concurrent
// agents from working on overlapping graph regions.
let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new()));
@ -727,6 +791,31 @@ pub fn run_daemon() -> Result<(), String> {
log_event("daemon", "started", &format!("pid {}", std::process::id()));
eprintln!("poc-memory daemon started (pid {})", std::process::id());
// Recover pending tasks from previous run
{
let recovered = task_queue.drain();
if !recovered.is_empty() {
log_event("task-queue", "recovering", &format!("{} tasks", recovered.len()));
for pt in &recovered {
let agent = pt.agent.clone();
let b = pt.batch;
let task_id = pt.id.clone();
let in_flight_clone = Arc::clone(&in_flight);
let queue_clone = Arc::clone(&task_queue);
choir.spawn(pt.id.clone())
.resource(&llm)
.log_dir(&task_log_dir)
.retries(1)
.init(move |ctx| {
let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone);
queue_clone.remove(&task_id);
result
});
// Drop schedules via IdleTask::Drop
}
}
}
// Write initial status
write_status(&choir, *last_daily.lock().unwrap(), &graph_health);
@ -997,6 +1086,7 @@ pub fn run_daemon() -> Result<(), String> {
let graph_health_sched = Arc::clone(&graph_health);
let in_flight_sched = Arc::clone(&in_flight);
let log_dir_sched = task_log_dir.clone();
let queue_sched = Arc::clone(&task_queue);
const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
choir.spawn("scheduler").init(move |ctx| {
@ -1050,12 +1140,22 @@ pub fn run_daemon() -> Result<(), String> {
let b = *batch;
let in_flight_clone = Arc::clone(&in_flight_sched);
let task_name = format!("c-{}-{}:{}", agent, i, today);
let task_id = task_name.clone();
let queue_clone = Arc::clone(&queue_sched);
queue_sched.push(PendingTask {
id: task_id.clone(),
agent: agent.clone(),
batch: b,
target_key: None,
});
let task = choir_sched.spawn(task_name)
.resource(&llm_sched)
.log_dir(&log_dir_sched)
.retries(1)
.init(move |ctx| {
job_consolidation_agent(ctx, &agent, b, &in_flight_clone)
let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone);
queue_clone.remove(&task_id);
result
})
.run();
all_tasks.push(task);

View file

@ -357,6 +357,15 @@ impl Node {
node.provenance = Provenance::from_capnp(old).label().to_string();
}
}
// Sanitize timestamps: old capnp records have raw offsets instead
// of unix epoch. Anything past year 2100 (~4102444800) is bogus.
const MAX_SANE_EPOCH: i64 = 4_102_444_800;
if node.timestamp > MAX_SANE_EPOCH || node.timestamp < 0 {
node.timestamp = node.created_at;
}
if node.created_at > MAX_SANE_EPOCH || node.created_at < 0 {
node.created_at = node.timestamp.min(MAX_SANE_EPOCH);
}
Ok(node)
}
}