diff --git a/Cargo.lock b/Cargo.lock index 96af429..f13096b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,39 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "atomic" version = "0.6.1" @@ -165,6 +198,59 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -455,6 +541,45 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "console-api" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "convert_case" version = "0.10.0" @@ -1391,7 +1516,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.13.0", "slab", "tokio", "tokio-util", @@ -1440,6 +1565,19 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom 7.1.3", + "num-traits", +] + [[package]] name = "heck" version = "0.5.0" @@ -1497,6 +1635,18 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "hyper" version = "1.8.1" @@ -1511,6 +1661,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -1536,6 +1687,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -1558,7 +1722,7 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-util", @@ -1569,7 +1733,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.2", "system-configuration", "tokio", "tower-service", @@ -1715,6 +1879,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -1808,6 +1982,7 @@ name = "jobkit" version = "0.3.0" dependencies = [ "chrono", + "console-subscriber", "libc", "log", "profiling", @@ -1998,6 +2173,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.8.0" @@ -2550,6 +2731,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -2574,8 +2775,8 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07" dependencies = [ - "base64", - "indexmap", + "base64 0.22.1", + "indexmap 2.13.0", "quick-xml", "serde", "time", @@ -2586,7 +2787,7 @@ name = "poc-agent" version = "0.4.0" dependencies = [ "anyhow", - "base64", + "base64 0.22.1", "chrono", "clap", "crossterm 0.29.0", @@ -2778,6 +2979,38 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2887,7 +3120,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls", - "socket2", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -2924,7 +3157,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.2", "tracing", "windows-sys 0.60.2", ] @@ -2956,6 +3189,8 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ + "libc", + "rand_chacha 0.3.1", "rand_core 0.6.4", ] @@ -2965,10 +3200,20 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", + "rand_chacha 0.9.0", "rand_core 0.9.5", ] +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.9.0" @@ -2984,6 +3229,9 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", +] [[package]] name = "rand_core" @@ -3203,7 +3451,7 @@ version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "encoding_rs", "futures-core", @@ -3231,7 +3479,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", - "tower", + "tower 0.5.3", "tower-http", "tower-service", "url", @@ -3638,6 +3886,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.2" @@ -3851,7 +4109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4676b37242ccbd1aabf56edb093a4827dc49086c0ffd764a5705899e0f35f8f7" dependencies = [ "anyhow", - "base64", + "base64 0.22.1", "bitflags 2.11.0", "fancy-regex 0.11.0", "filedescriptor", @@ -3942,7 +4200,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a19830747d9034cd9da43a60eaa8e552dfda7712424aebf187b7a60126bae0d" dependencies = [ "anyhow", - "base64", + "base64 0.22.1", "bstr", "fancy-regex 0.13.0", "lazy_static", @@ -4020,8 +4278,9 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.2", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] @@ -4056,6 +4315,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -4106,7 +4376,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.13.0", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -4120,7 +4390,7 @@ version = "0.25.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca1a40644a28bce036923f6a431df0b34236949d111cc07cb6dca830c9ef2e1" dependencies = [ - "indexmap", + "indexmap 2.13.0", "toml_datetime 1.0.1+spec-1.1.0", "toml_parser", "winnow 1.0.0", @@ -4141,6 +4411,56 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.10", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.3" @@ -4169,7 +4489,7 @@ dependencies = [ "http-body", "iri-string", "pin-project-lite", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", ] @@ -4531,7 +4851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap", + "indexmap 2.13.0", "wasm-encoder", "wasmparser", ] @@ -4544,7 +4864,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags 2.11.0", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.13.0", "semver", ] @@ -5009,7 +5329,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap", + "indexmap 2.13.0", "prettyplease", "syn 2.0.117", "wasm-metadata", @@ -5040,7 +5360,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags 2.11.0", - "indexmap", + "indexmap 2.13.0", "log", "serde", "serde_derive", @@ -5059,7 +5379,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap", + "indexmap 2.13.0", "log", "semver", "serde", diff --git a/poc-agent/src/api/mod.rs b/poc-agent/src/api/mod.rs index 22ae3c3..6c3079f 100644 --- a/poc-agent/src/api/mod.rs +++ b/poc-agent/src/api/mod.rs @@ -39,6 +39,7 @@ impl ApiClient { pub fn new(base_url: &str, api_key: &str, model: &str) -> Self { let client = Client::builder() .connect_timeout(Duration::from_secs(30)) + .timeout(Duration::from_secs(600)) .build() .expect("failed to build HTTP client"); diff --git a/poc-agent/src/api/openai.rs b/poc-agent/src/api/openai.rs index 7a7d0f6..7ad6b53 100644 --- a/poc-agent/src/api/openai.rs +++ b/poc-agent/src/api/openai.rs @@ -30,10 +30,14 @@ pub async fn stream( max_tokens: Some(16384), temperature: Some(0.6), stream: Some(true), - reasoning: Some(ReasoningConfig { - enabled: reasoning_effort != "none", - effort: Some(reasoning_effort.to_string()), - }), + reasoning: if reasoning_effort != "none" && reasoning_effort != "default" { + Some(ReasoningConfig { + enabled: true, + effort: Some(reasoning_effort.to_string()), + }) + } else { + None + }, chat_template_kwargs: None, }; diff --git a/poc-memory/Cargo.toml b/poc-memory/Cargo.toml index f56d80c..14fde9e 100644 --- a/poc-memory/Cargo.toml +++ b/poc-memory/Cargo.toml @@ -20,7 +20,7 @@ memmap2 = "0.9" rayon = "1" peg = "0.8" paste = "1" -jobkit = { path = "/home/kent/jobkit", features = ["daemon"] } +jobkit = { path = "/home/kent/jobkit", features = ["daemon", "console"] } poc-agent = { path = "../poc-agent" } tokio = { version = "1", features = ["rt-multi-thread"] } redb = "2" diff --git a/poc-memory/src/agents/api.rs b/poc-memory/src/agents/api.rs index 4df4810..07383f4 100644 --- a/poc-memory/src/agents/api.rs +++ b/poc-memory/src/agents/api.rs @@ -35,8 +35,8 @@ pub async fn call_api_with_tools( ) -> Result { let client = get_client()?; - // Set up a minimal UI channel (we just collect messages, no TUI) - let (ui_tx, _ui_rx) = poc_agent::ui_channel::channel(); + // Set up a UI channel — we drain reasoning tokens into the log + let (ui_tx, mut ui_rx) = poc_agent::ui_channel::channel(); // Build tool definitions — memory tools for graph operations let all_defs = tools::definitions(); @@ -66,6 +66,19 @@ pub async fn call_api_with_tools( u.prompt_tokens, u.completion_tokens)); } + // Drain reasoning tokens from the UI channel into the log + { + let mut reasoning_buf = String::new(); + while let Ok(ui_msg) = ui_rx.try_recv() { + if let poc_agent::ui_channel::UiMessage::Reasoning(r) = ui_msg { + reasoning_buf.push_str(&r); + } + } + if !reasoning_buf.is_empty() { + log(&format!("\n{}\n", reasoning_buf.trim())); + } + } + let has_content = msg.content.is_some(); let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index 2cd3965..0ffc95f 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -22,6 +22,67 @@ use std::time::{Duration, SystemTime}; const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); + +// --- Persistent task queue --- + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct PendingTask { + id: String, + agent: String, + batch: usize, + #[serde(default)] + target_key: Option, +} + +struct TaskQueue { + path: PathBuf, + tasks: Mutex>, +} + +impl TaskQueue { + fn load(data_dir: &Path) -> Arc { + let path = data_dir.join("pending-tasks.jsonl"); + let tasks = if path.exists() { + fs::read_to_string(&path) + .unwrap_or_default() + .lines() + .filter_map(|l| serde_json::from_str(l).ok()) + .collect() + } else { + Vec::new() + }; + let count = tasks.len(); + if count > 0 { + log_event("task-queue", "loaded", &format!("{} pending tasks", count)); + } + Arc::new(Self { path, tasks: Mutex::new(tasks) }) + } + + fn push(&self, task: PendingTask) { + let mut tasks = self.tasks.lock().unwrap(); + tasks.push(task); + self.write_locked(&tasks); + } + + fn remove(&self, id: &str) { + let mut tasks = self.tasks.lock().unwrap(); + tasks.retain(|t| t.id != id); + self.write_locked(&tasks); + } + + fn drain(&self) -> Vec { + let tasks = self.tasks.lock().unwrap(); + tasks.clone() + } + + fn write_locked(&self, tasks: &[PendingTask]) { + let content: String = tasks.iter() + .filter_map(|t| serde_json::to_string(t).ok()) + .collect::>() + .join("\n"); + let _ = fs::write(&self.path, if content.is_empty() { String::new() } else { content + "\n" }); + } +} fn log_path() -> PathBuf { crate::config::get().data_dir.join("daemon.log") } @@ -720,6 +781,9 @@ pub fn run_daemon() -> Result<(), String> { let graph_health: Arc>> = Arc::new(Mutex::new(None)); + // Persistent task queue — survives daemon restarts + let task_queue = TaskQueue::load(&config.data_dir); + // Nodes currently being processed by agents — prevents concurrent // agents from working on overlapping graph regions. let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new())); @@ -727,6 +791,31 @@ pub fn run_daemon() -> Result<(), String> { log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); + // Recover pending tasks from previous run + { + let recovered = task_queue.drain(); + if !recovered.is_empty() { + log_event("task-queue", "recovering", &format!("{} tasks", recovered.len())); + for pt in &recovered { + let agent = pt.agent.clone(); + let b = pt.batch; + let task_id = pt.id.clone(); + let in_flight_clone = Arc::clone(&in_flight); + let queue_clone = Arc::clone(&task_queue); + choir.spawn(pt.id.clone()) + .resource(&llm) + .log_dir(&task_log_dir) + .retries(1) + .init(move |ctx| { + let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); + queue_clone.remove(&task_id); + result + }); + // Drop schedules via IdleTask::Drop + } + } + } + // Write initial status write_status(&choir, *last_daily.lock().unwrap(), &graph_health); @@ -997,6 +1086,7 @@ pub fn run_daemon() -> Result<(), String> { let graph_health_sched = Arc::clone(&graph_health); let in_flight_sched = Arc::clone(&in_flight); let log_dir_sched = task_log_dir.clone(); + let queue_sched = Arc::clone(&task_queue); const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours choir.spawn("scheduler").init(move |ctx| { @@ -1050,12 +1140,22 @@ pub fn run_daemon() -> Result<(), String> { let b = *batch; let in_flight_clone = Arc::clone(&in_flight_sched); let task_name = format!("c-{}-{}:{}", agent, i, today); + let task_id = task_name.clone(); + let queue_clone = Arc::clone(&queue_sched); + queue_sched.push(PendingTask { + id: task_id.clone(), + agent: agent.clone(), + batch: b, + target_key: None, + }); let task = choir_sched.spawn(task_name) .resource(&llm_sched) .log_dir(&log_dir_sched) .retries(1) .init(move |ctx| { - job_consolidation_agent(ctx, &agent, b, &in_flight_clone) + let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); + queue_clone.remove(&task_id); + result }) .run(); all_tasks.push(task); diff --git a/poc-memory/src/store/types.rs b/poc-memory/src/store/types.rs index ddcd75b..2c63361 100644 --- a/poc-memory/src/store/types.rs +++ b/poc-memory/src/store/types.rs @@ -357,6 +357,15 @@ impl Node { node.provenance = Provenance::from_capnp(old).label().to_string(); } } + // Sanitize timestamps: old capnp records have raw offsets instead + // of unix epoch. Anything past year 2100 (~4102444800) is bogus. + const MAX_SANE_EPOCH: i64 = 4_102_444_800; + if node.timestamp > MAX_SANE_EPOCH || node.timestamp < 0 { + node.timestamp = node.created_at; + } + if node.created_at > MAX_SANE_EPOCH || node.created_at < 0 { + node.created_at = node.timestamp.min(MAX_SANE_EPOCH); + } Ok(node) } }