diff --git a/Cargo.lock b/Cargo.lock index 9d24eb3..cb02751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1570,10 +1570,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi", - "wasm-bindgen", ] [[package]] @@ -1583,11 +1581,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", - "js-sys", "libc", "r-efi", "wasip2", - "wasm-bindgen", ] [[package]] @@ -1789,7 +1785,6 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", ] [[package]] @@ -2248,12 +2243,6 @@ dependencies = [ "hashbrown 0.16.1", ] -[[package]] -name = "lru-slab" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" - [[package]] name = "mac_address" version = "1.1.8" @@ -2888,30 +2877,6 @@ dependencies = [ "time", ] -[[package]] -name = "poc-daemon" -version = "0.4.0" -dependencies = [ - "capnp", - "capnp-rpc", - "capnpc", - "chrono", - "clap", - "futures", - "reqwest", - "rustls", - "serde", - "serde_json", - "tokio", - "tokio-rustls", - "tokio-util", - "toml", - "tracing", - "tracing-appender", - "tracing-subscriber", - "webpki-roots", -] - [[package]] name = "poc-memory" version = "0.4.0" @@ -3241,61 +3206,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "quinn" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" -dependencies = [ - "bytes", - "cfg_aliases", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash 2.1.1", - "rustls", - "socket2 0.6.2", - "thiserror 2.0.18", - "tokio", - "tracing", - "web-time", -] - -[[package]] -name = "quinn-proto" -version = "0.11.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" -dependencies = [ - "bytes", - "getrandom 0.3.4", - "lru-slab", - "rand 0.9.2", - "ring", - "rustc-hash 2.1.1", - "rustls", - "rustls-pki-types", - "slab", - "thiserror 2.0.18", - "tinyvec", - "tracing", - "web-time", -] - -[[package]] -name = "quinn-udp" -version = "0.5.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" -dependencies = [ - "cfg_aliases", - "libc", - "once_cell", - "socket2 0.6.2", - "tracing", - "windows-sys 0.60.2", -] - [[package]] name = "quote" version = "1.0.44" @@ -3603,8 +3513,6 @@ dependencies = [ "native-tls", "percent-encoding", "pin-project-lite", - "quinn", - "rustls", "rustls-pki-types", "serde", "serde_json", @@ -3612,7 +3520,6 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", "tower 0.5.3", "tower-http", "tower-service", @@ -3620,7 +3527,6 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", ] [[package]] @@ -3701,12 +3607,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc-hash" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" - [[package]] name = "rustc_version" version = "0.4.1" @@ -3751,7 +3651,6 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ - "web-time", "zeroize", ] @@ -4326,7 +4225,7 @@ dependencies = [ "fancy-regex 0.13.0", "lazy_static", "regex", - "rustc-hash 1.1.0", + "rustc-hash", ] [[package]] @@ -5057,16 +4956,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "web-time" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "webpki-roots" version = "1.0.6" diff --git a/Cargo.toml b/Cargo.toml index 7d9091c..03a376d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["thalamus", "channels/irc", "channels/telegram"] +members = ["channels/irc", "channels/telegram"] resolver = "2" [workspace.package] diff --git a/src/thalamus/mod.rs b/src/thalamus/mod.rs index e23bc43..046807a 100644 --- a/src/thalamus/mod.rs +++ b/src/thalamus/mod.rs @@ -13,7 +13,6 @@ pub mod config; pub mod supervisor; pub mod context; pub mod idle; -pub mod modules; pub mod notify; pub mod rpc; pub mod tmux; diff --git a/src/thalamus/modules/mod.rs b/src/thalamus/modules/mod.rs deleted file mode 100644 index 288fde5..0000000 --- a/src/thalamus/modules/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -// External communication modules (IRC, Telegram, etc.) live in -// separate daemons, not in the core consciousness binary. diff --git a/thalamus/Cargo.toml b/thalamus/Cargo.toml deleted file mode 100644 index 3ec365f..0000000 --- a/thalamus/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -[package] -name = "poc-daemon" -version.workspace = true -edition.workspace = true - -[dependencies] -capnp = "0.20" -capnp-rpc = "0.20" -clap = { version = "4", features = ["derive"] } -futures = "0.3" -tokio = { version = "1", features = ["full"] } -tokio-util = { version = "0.7", features = ["compat"] } -toml = "0.8" -tokio-rustls = "0.26" -rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } -webpki-roots = "1" -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tracing-appender = "0.2" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "json"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" -chrono = "0.4" - -[build-dependencies] -capnpc = "0.20" - -[[bin]] -name = "poc-daemon" -path = "src/main.rs" diff --git a/thalamus/build.rs b/thalamus/build.rs deleted file mode 100644 index fafdfba..0000000 --- a/thalamus/build.rs +++ /dev/null @@ -1,6 +0,0 @@ -fn main() { - capnpc::CompilerCommand::new() - .file("schema/daemon.capnp") - .run() - .expect("capnp compile failed"); -} diff --git a/thalamus/schema/daemon.capnp b/thalamus/schema/daemon.capnp deleted file mode 100644 index eb436da..0000000 --- a/thalamus/schema/daemon.capnp +++ /dev/null @@ -1,84 +0,0 @@ -@0xb8e2f4a1c3d56789; - -# Claude daemon RPC interface. -# -# Served over a Unix domain socket. Clients connect, bootstrap -# the Daemon interface, make calls, disconnect. - -struct Notification { - type @0 :Text; - urgency @1 :UInt8; - message @2 :Text; - timestamp @3 :Float64; -} - -struct TypeInfo { - name @0 :Text; - count @1 :UInt64; - firstSeen @2 :Float64; - lastSeen @3 :Float64; - threshold @4 :Int8; # -1 = inherit, 0-3 = explicit level -} - -enum Activity { - idle @0; - focused @1; - sleeping @2; -} - -struct Status { - lastUserMsg @0 :Float64; - lastResponse @1 :Float64; - claudePane @2 :Text; - sleepUntil @3 :Float64; # 0 = not sleeping, -1 = indefinite - quietUntil @4 :Float64; - consolidating @5 :Bool; - dreaming @6 :Bool; - fired @7 :Bool; - userPresent @8 :Bool; - uptime @9 :Float64; - activity @10 :Activity; - pendingCount @11 :UInt32; - idleTimeout @12 :Float64; # configured idle timeout (secs) - notifyTimeout @13 :Float64; # configured notify-via-tmux timeout (secs) - sinceActivity @14 :Float64; # secs since max(lastUserMsg, lastResponse) - sinceUser @15 :Float64; # secs since lastUserMsg - blockReason @16 :Text; # why idle timer hasn't fired - activityEwma @17 :Float64; # 0-1, EWMA of recent activity (running fraction) -} - -interface Daemon { - # Idle timer - user @0 (pane :Text) -> (); - response @1 (pane :Text) -> (); - sleep @2 (until :Float64) -> (); # 0 = indefinite - wake @3 () -> (); - quiet @4 (seconds :UInt32) -> (); - consolidating @5 () -> (); - consolidated @6 () -> (); - dreamStart @7 () -> (); - dreamEnd @8 () -> (); - stop @9 () -> (); - status @10 () -> (status :Status); - - # Notifications - notify @11 (notification :Notification) -> (interrupt :Bool); - getNotifications @12 (minUrgency :UInt8) -> (notifications :List(Notification)); - getTypes @13 () -> (types :List(TypeInfo)); - setThreshold @14 (type :Text, level :UInt8) -> (); - - idleTimeout @16 (seconds :Float64) -> (); - notifyTimeout @19 (seconds :Float64) -> (); - save @17 () -> (); - debug @18 () -> (json :Text); - - ewma @20 (value :Float64) -> (current :Float64); - afk @21 () -> (); - sessionTimeout @22 (seconds :Float64) -> (); - - testNudge @23 () -> (sent :Bool, message :Text); - - # Modules - moduleCommand @15 (module :Text, command :Text, args :List(Text)) - -> (result :Text); -} diff --git a/thalamus/src/config.rs b/thalamus/src/config.rs deleted file mode 100644 index 5c4b220..0000000 --- a/thalamus/src/config.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Daemon configuration. -// -// Lives at ~/.consciousness/daemon.toml. Loaded on startup, updated at -// runtime when modules change state (join channel, etc.). - -use crate::home; -use serde::{Deserialize, Serialize}; -use std::fs; -use std::path::PathBuf; - -fn config_path() -> PathBuf { - home().join(".consciousness/daemon.toml") -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct Config { - #[serde(default)] - pub irc: IrcConfig, - #[serde(default)] - pub telegram: TelegramConfig, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct IrcConfig { - pub enabled: bool, - pub server: String, - pub port: u16, - pub tls: bool, - pub nick: String, - pub user: String, - pub realname: String, - pub channels: Vec, -} - -impl Default for IrcConfig { - fn default() -> Self { - Self { - enabled: true, - server: "irc.libera.chat".into(), - port: 6697, - tls: true, - nick: "agent".into(), - user: "agent".into(), - realname: "agent".into(), - channels: vec!["#bcachefs".into(), "#bcachefs-ai".into()], - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TelegramConfig { - pub enabled: bool, - pub token: String, - pub chat_id: i64, -} - -impl Default for TelegramConfig { - fn default() -> Self { - // Load token and chat_id from legacy files if they exist - let token = std::fs::read_to_string(home().join(".consciousness/telegram/token")) - .map(|s| s.trim().to_string()) - .unwrap_or_default(); - let chat_id = std::fs::read_to_string(home().join(".consciousness/telegram/chat_id")) - .ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - Self { - enabled: !token.is_empty() && chat_id != 0, - token, - chat_id, - } - } -} - -impl Config { - pub fn load() -> Self { - let path = config_path(); - match fs::read_to_string(&path) { - Ok(data) => toml::from_str(&data).unwrap_or_else(|e| { - tracing::warn!("bad config {}: {e}, using defaults", path.display()); - Self::default() - }), - Err(_) => { - let config = Self::default(); - config.save(); - config - } - } - } - - pub fn save(&self) { - let path = config_path(); - if let Ok(data) = toml::to_string_pretty(self) { - let _ = fs::write(path, data); - } - } -} diff --git a/thalamus/src/context.rs b/thalamus/src/context.rs deleted file mode 100644 index 22b716a..0000000 --- a/thalamus/src/context.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Context gathering for idle prompts. -// -// Notifications are handled by the notify module and passed -// in separately by the caller. Git context and IRC digest -// are now available through where-am-i.md and the memory graph. - -/// Build context string for a prompt. -/// notification_text is passed in from the notify module. -pub fn build(_include_irc: bool, notification_text: &str) -> String { - // Keep nudges short — Claude checks notifications via - // `poc-daemon status` on its own. Just mention the count. - let count = notification_text.matches("[irc.").count() - + notification_text.matches("[telegram.").count(); - if count > 0 { - format!("{count} pending notifications") - } else { - String::new() - } -} diff --git a/thalamus/src/idle.rs b/thalamus/src/idle.rs deleted file mode 100644 index 774ae76..0000000 --- a/thalamus/src/idle.rs +++ /dev/null @@ -1,641 +0,0 @@ -// Idle timer module. -// -// Tracks user presence and Claude response times. When Claude has been -// idle too long, sends a contextual prompt to the tmux pane. Handles -// sleep mode, quiet mode, consolidation suppression, and dream nudges. -// -// Designed as the first "module" — future IRC/Telegram modules will -// follow the same pattern: state + tick + handle_command. - -use crate::{context, home, now, notify, tmux}; -use serde::{Deserialize, Serialize}; -use std::fs; -use tracing::info; - -// Defaults -const DEFAULT_IDLE_TIMEOUT: f64 = 5.0 * 60.0; -const DEFAULT_NOTIFY_TIMEOUT: f64 = 2.0 * 60.0; -const DEFAULT_SESSION_ACTIVE_SECS: f64 = 15.0 * 60.0; -const DREAM_INTERVAL_HOURS: u64 = 18; - -/// EWMA decay half-life in seconds (5 minutes). -const EWMA_DECAY_HALF_LIFE: f64 = 5.0 * 60.0; - -/// Minimum seconds between autonomous nudges. -const MIN_NUDGE_INTERVAL: f64 = 15.0; - -/// Boost half-life in seconds (60s). A 60s turn covers half the gap to -/// target; a 15s turn covers ~16%; a 2s turn covers ~2%. -const EWMA_BOOST_HALF_LIFE: f64 = 60.0; - -/// Steady-state target for active work. The EWMA converges toward this -/// during sustained activity rather than toward 1.0. -const EWMA_TARGET: f64 = 0.75; - -/// Persisted subset of daemon state — survives daemon restarts. -/// Includes both epoch floats (for computation) and ISO timestamps -/// (for human debugging via `cat daemon-state.json | jq`). -#[derive(Serialize, Deserialize, Default)] -struct Persisted { - last_user_msg: f64, - last_response: f64, - #[serde(default)] - sleep_until: Option, - #[serde(default)] - claude_pane: Option, - #[serde(default)] - idle_timeout: f64, - #[serde(default)] - notify_timeout: f64, - #[serde(default)] - activity_ewma: f64, - #[serde(default)] - ewma_updated_at: f64, - #[serde(default)] - session_active_secs: f64, - #[serde(default)] - in_turn: bool, - #[serde(default)] - turn_start: f64, - #[serde(default)] - last_nudge: f64, - // Human-readable mirrors — written but not consumed on load - #[serde(default, skip_deserializing)] - last_user_msg_time: String, - #[serde(default, skip_deserializing)] - last_response_time: String, - #[serde(default, skip_deserializing)] - saved_at: String, - #[serde(default, skip_deserializing)] - fired: bool, - #[serde(default, skip_deserializing)] - uptime: f64, -} - -fn state_path() -> std::path::PathBuf { - home().join(".consciousness/daemon-state.json") -} - -/// Compute EWMA decay factor: 0.5^(elapsed / half_life). -fn ewma_factor(elapsed: f64, half_life: f64) -> f64 { - (0.5_f64).powf(elapsed / half_life) -} - -/// Format epoch seconds as a human-readable ISO-ish timestamp. -fn epoch_to_iso(epoch: f64) -> String { - if epoch == 0.0 { - return String::new(); - } - let secs = epoch as u64; - // Use date command — simple and correct for timezone - std::process::Command::new("date") - .args(["-d", &format!("@{secs}"), "+%Y-%m-%dT%H:%M:%S%z"]) - .output() - .ok() - .and_then(|o| String::from_utf8(o.stdout).ok()) - .map(|s| s.trim().to_string()) - .unwrap_or_default() -} - -#[derive(Serialize)] -pub struct State { - pub last_user_msg: f64, - pub last_response: f64, - pub claude_pane: Option, - pub sleep_until: Option, // None=awake, 0=indefinite, >0=timestamp - pub quiet_until: f64, - pub consolidating: bool, - pub dreaming: bool, - pub dream_start: f64, - pub fired: bool, - pub idle_timeout: f64, - pub notify_timeout: f64, - pub activity_ewma: f64, - pub ewma_updated_at: f64, - pub session_active_secs: f64, - pub in_turn: bool, - pub turn_start: f64, - pub last_nudge: f64, - #[serde(skip)] - pub running: bool, - #[serde(skip)] - pub start_time: f64, - #[serde(skip)] - pub notifications: notify::NotifyState, -} - -impl State { - pub fn new() -> Self { - Self { - last_user_msg: 0.0, - last_response: 0.0, - claude_pane: None, - sleep_until: None, - quiet_until: 0.0, - consolidating: false, - dreaming: false, - dream_start: 0.0, - fired: false, - idle_timeout: DEFAULT_IDLE_TIMEOUT, - notify_timeout: DEFAULT_NOTIFY_TIMEOUT, - session_active_secs: DEFAULT_SESSION_ACTIVE_SECS, - activity_ewma: 0.0, - ewma_updated_at: now(), - in_turn: false, - turn_start: 0.0, - last_nudge: 0.0, - running: true, - start_time: now(), - notifications: notify::NotifyState::new(), - } - } - - pub fn load(&mut self) { - if let Ok(data) = fs::read_to_string(state_path()) { - if let Ok(p) = serde_json::from_str::(&data) { - self.sleep_until = p.sleep_until; - self.claude_pane = p.claude_pane; - if p.idle_timeout > 0.0 { - self.idle_timeout = p.idle_timeout; - } - if p.notify_timeout > 0.0 { - self.notify_timeout = p.notify_timeout; - } - if p.session_active_secs > 0.0 { - self.session_active_secs = p.session_active_secs; - } - // Reset activity timestamps to now — timers count from - // restart, not from stale pre-restart state - let t = now(); - self.last_user_msg = t; - self.last_response = t; - // Restore EWMA state, applying decay for time spent shut down - if p.ewma_updated_at > 0.0 { - let elapsed = t - p.ewma_updated_at; - self.activity_ewma = p.activity_ewma * ewma_factor(elapsed, EWMA_DECAY_HALF_LIFE); - self.in_turn = p.in_turn; - self.turn_start = p.turn_start; - self.last_nudge = p.last_nudge; - } - self.ewma_updated_at = t; - } - } - - // Always try to find the active pane - if self.claude_pane.is_none() { - self.claude_pane = tmux::find_claude_pane(); - } - - info!( - "loaded: user={:.0} resp={:.0} pane={:?} sleep={:?}", - self.last_user_msg, self.last_response, self.claude_pane, self.sleep_until, - ); - } - - pub fn save(&self) { - let p = Persisted { - last_user_msg: self.last_user_msg, - last_response: self.last_response, - sleep_until: self.sleep_until, - claude_pane: self.claude_pane.clone(), - last_user_msg_time: epoch_to_iso(self.last_user_msg), - last_response_time: epoch_to_iso(self.last_response), - saved_at: epoch_to_iso(now()), - fired: self.fired, - idle_timeout: self.idle_timeout, - notify_timeout: self.notify_timeout, - session_active_secs: self.session_active_secs, - activity_ewma: self.activity_ewma, - ewma_updated_at: self.ewma_updated_at, - in_turn: self.in_turn, - turn_start: self.turn_start, - last_nudge: self.last_nudge, - uptime: now() - self.start_time, - }; - if let Ok(json) = serde_json::to_string_pretty(&p) { - let _ = fs::write(state_path(), json); - } - } - - /// Decay the activity EWMA toward zero based on elapsed time. - fn decay_ewma(&mut self) { - let t = now(); - let elapsed = t - self.ewma_updated_at; - if elapsed <= 0.0 { - return; - } - self.activity_ewma *= ewma_factor(elapsed, EWMA_DECAY_HALF_LIFE); - self.ewma_updated_at = t; - } - - /// Boost the EWMA based on turn duration. The boost is proportional to - /// distance from EWMA_TARGET, scaled by a saturation curve on duration. - /// A 15s turn covers half the gap to target; a 2s turn barely registers. - /// Self-limiting: converges toward target, can't overshoot. - fn boost_ewma(&mut self, turn_duration: f64) { - let gap = (EWMA_TARGET - self.activity_ewma).max(0.0); - let saturation = 1.0 - ewma_factor(turn_duration, EWMA_BOOST_HALF_LIFE); - self.activity_ewma += gap * saturation; - } - - // Typed handlers for RPC - pub fn handle_user(&mut self, pane: &str) { - self.decay_ewma(); - self.in_turn = true; - self.turn_start = now(); - let from_user = !self.fired; - if from_user { - self.last_user_msg = now(); - self.notifications.set_activity(notify::Activity::Focused); - } - self.fired = false; - if !pane.is_empty() { - self.claude_pane = Some(pane.to_string()); - } - self.save(); - info!("user (pane={}, from_user={from_user}) ewma={:.3}", - if pane.is_empty() { "unchanged" } else { pane }, - self.activity_ewma); - } - - pub fn handle_response(&mut self, pane: &str) { - let turn_duration = now() - self.turn_start; - self.decay_ewma(); - self.boost_ewma(turn_duration); - self.in_turn = false; - self.last_response = now(); - self.fired = false; - if !pane.is_empty() { - self.claude_pane = Some(pane.to_string()); - } - self.save(); - info!("response (turn={:.1}s) ewma={:.3}", turn_duration, self.activity_ewma); - } - - /// Check if a notification should trigger a tmux prompt. - /// Called when a notification arrives via module channel. - /// Only injects into tmux when idle — if there's an active session - /// (recent user or response), the hook delivers via additionalContext. - pub fn maybe_prompt_notification(&mut self, ntype: &str, urgency: u8, _message: &str) { - if self.user_present() { - return; // hook will deliver it on next prompt - } - // If we've responded recently, the session is active — - // notifications will arrive via hook, no need to wake us - let since_response = now() - self.last_response; - if since_response < self.notify_timeout { - return; - } - // Don't send notifications via tmux directly — they flow - // through the idle nudge. Urgent notifications reset the - // idle timer so the nudge fires sooner. - let effective = self.notifications.threshold_for(ntype); - if urgency >= effective && self.fired { - // Bump: allow the nudge to re-fire for urgent notifications - self.fired = false; - } - } - - pub fn handle_afk(&mut self) { - // Push last_user_msg far enough back that user_present() returns false - self.last_user_msg = now() - self.session_active_secs - 1.0; - self.fired = false; // allow idle timer to fire again - info!("User marked AFK"); - self.save(); - } - - pub fn handle_session_timeout(&mut self, secs: f64) { - self.session_active_secs = secs; - info!("session active timeout = {secs}s"); - self.save(); - } - - pub fn handle_idle_timeout(&mut self, secs: f64) { - self.idle_timeout = secs; - self.save(); - info!("idle timeout = {secs}s"); - } - - pub fn handle_ewma(&mut self, value: f64) -> f64 { - if value >= 0.0 { - self.activity_ewma = value.min(1.0); - self.ewma_updated_at = now(); - self.save(); - info!("ewma set to {:.3}", self.activity_ewma); - } - self.activity_ewma - } - - pub fn handle_notify_timeout(&mut self, secs: f64) { - self.notify_timeout = secs; - self.save(); - info!("notify timeout = {secs}s"); - } - - pub fn handle_sleep(&mut self, until: f64) { - if until == 0.0 { - self.sleep_until = Some(0.0); - info!("sleep indefinitely"); - } else { - self.sleep_until = Some(until); - info!("sleep until {until}"); - } - self.notifications.set_activity(notify::Activity::Sleeping); - self.save(); - } - - pub fn handle_wake(&mut self) { - self.sleep_until = None; - self.fired = false; - self.save(); - info!("wake"); - } - - pub fn handle_quiet(&mut self, seconds: u32) { - self.quiet_until = now() + seconds as f64; - info!("quiet {seconds}s"); - } - - pub fn user_present(&self) -> bool { - (now() - self.last_user_msg) < self.session_active_secs - } - - /// Seconds since the most recent of user message or response. - pub fn since_activity(&self) -> f64 { - let reference = self.last_response.max(self.last_user_msg); - if reference > 0.0 { now() - reference } else { 0.0 } - } - - /// Why the idle timer hasn't fired (or "none" if it would fire now). - pub fn block_reason(&self) -> &'static str { - let t = now(); - if self.fired { - "already fired" - } else if self.sleep_until.is_some() { - "sleeping" - } else if t < self.quiet_until { - "quiet mode" - } else if self.consolidating { - "consolidating" - } else if self.dreaming { - "dreaming" - } else if self.user_present() { - "user present" - } else if self.in_turn { - "in turn" - } else if self.last_response.max(self.last_user_msg) == 0.0 { - "no activity yet" - } else if self.since_activity() < self.idle_timeout { - "not idle long enough" - } else { - "none — would fire" - } - } - - /// Full debug dump as JSON with computed values. - pub fn debug_json(&self) -> String { - let t = now(); - let since_user = t - self.last_user_msg; - let since_response = t - self.last_response; - - serde_json::json!({ - "now": t, - "uptime": t - self.start_time, - "idle_timeout": self.idle_timeout, - "notify_timeout": self.notify_timeout, - "last_user_msg": self.last_user_msg, - "last_user_msg_ago": since_user, - "last_user_msg_time": epoch_to_iso(self.last_user_msg), - "last_response": self.last_response, - "last_response_ago": since_response, - "last_response_time": epoch_to_iso(self.last_response), - "since_activity": self.since_activity(), - "activity_ewma": self.activity_ewma, - "in_turn": self.in_turn, - "turn_start": self.turn_start, - "user_present": self.user_present(), - "claude_pane": self.claude_pane, - "fired": self.fired, - "block_reason": self.block_reason(), - "sleep_until": self.sleep_until, - "quiet_until": self.quiet_until, - "consolidating": self.consolidating, - "dreaming": self.dreaming, - "dream_start": self.dream_start, - "activity": format!("{:?}", self.notifications.activity), - "pending_notifications": self.notifications.pending.len(), - "notification_types": self.notifications.types.len(), - }).to_string() - } - - pub fn send(&self, msg: &str) -> bool { - let pane = match &self.claude_pane { - Some(p) => p.clone(), - None => match tmux::find_claude_pane() { - Some(p) => p, - None => { - info!("send: no claude pane found"); - return false; - } - }, - }; - - let ok = tmux::send_prompt(&pane, msg); - let preview: String = msg.chars().take(80).collect(); - info!("send(pane={pane}, ok={ok}): {preview}"); - ok - } - - fn check_dream_nudge(&self) -> bool { - if !self.dreaming || self.dream_start == 0.0 { - return false; - } - let minutes = (now() - self.dream_start) / 60.0; - if minutes >= 60.0 { - self.send( - "You've been dreaming for over an hour. Time to surface \ - — run dream-end.sh and capture what you found.", - ); - } else if minutes >= 45.0 { - self.send(&format!( - "Dreaming for {:.0} minutes now. Start gathering your threads \ - — you'll want to surface soon.", - minutes - )); - } else if minutes >= 30.0 { - self.send(&format!( - "You've been dreaming for {:.0} minutes. \ - No rush — just a gentle note from the clock.", - minutes - )); - } else { - return false; - } - true - } - - pub fn build_context(&mut self, include_irc: bool) -> String { - // Ingest any legacy notification files - self.notifications.ingest_legacy_files(); - let notif_text = self.notifications.format_pending(notify::AMBIENT); - context::build(include_irc, ¬if_text) - } - - pub async fn tick(&mut self) -> Result<(), String> { - let t = now(); - let h = home(); - - // Decay EWMA on every tick - self.decay_ewma(); - - // Ingest legacy notification files every tick - self.notifications.ingest_legacy_files(); - - // Sleep mode - if let Some(wake_at) = self.sleep_until { - if wake_at == 0.0 { - return Ok(()); // indefinite - } - if t < wake_at { - return Ok(()); - } - // Wake up - info!("sleep expired, waking"); - self.sleep_until = None; - self.fired = false; - self.save(); - let ctx = self.build_context(true); - let extra = if ctx.is_empty() { - String::new() - } else { - format!("\n{ctx}") - }; - self.send(&format!( - "Wake up. Read your journal (poc-memory journal-tail 10), \ - check work-queue.md, and follow what calls to you.{extra}" - )); - return Ok(()); - } - - // Quiet mode - if t < self.quiet_until { - return Ok(()); - } - - // Consolidation - if self.consolidating { - return Ok(()); - } - - // Dream loop (externally managed) - if h.join(".consciousness/agents/dream-loop-active").exists() { - return Ok(()); - } - - // Dream nudges - if self.dreaming { - self.check_dream_nudge(); - return Ok(()); - } - - // Don't nudge while User is here — conversation drives activity - if self.user_present() { - return Ok(()); - } - - // Don't nudge while in a turn - if self.in_turn { - return Ok(()); - } - - // Minimum interval between nudges - let since_nudge = t - self.last_nudge; - if since_nudge < MIN_NUDGE_INTERVAL { - return Ok(()); - } - - // Initial idle timeout — don't start nudging until first idle period - let reference = self.last_response.max(self.last_user_msg); - if reference == 0.0 { - return Ok(()); - } - let elapsed = t - reference; - if elapsed < self.idle_timeout { - return Ok(()); - } - - // Transition to idle — lower notification thresholds - if self.notifications.activity != notify::Activity::Idle { - self.notifications.set_activity(notify::Activity::Idle); - } - - // Fire - let elapsed_min = (elapsed / 60.0) as u64; - let ctx = self.build_context(true); - let extra = if ctx.is_empty() { - String::new() - } else { - format!("\n{ctx}") - }; - - let dream_hours = hours_since_last_dream(); - let mut msg = format!( - "This is your autonomous time (User AFK {elapsed_min}m). \ - Keep doing what you're doing, or find something new to do"); - if dream_hours >= DREAM_INTERVAL_HOURS { - msg.push_str(&format!( - " You haven't dreamed in {dream_hours} hours — \ - consider running ~/.consciousness/tools/dream-start.sh \ - and spending some time in dreaming mode. \ - Or do whatever calls to you.")); - } - let msg = format!("{msg}{extra}"); - - if self.send(&msg) { - self.last_nudge = t; - self.fired = true; - } - - Ok(()) - } - -} - -fn hours_since_last_dream() -> u64 { - let path = home().join(".consciousness/logs/dream-log.jsonl"); - let content = match fs::read_to_string(path) { - Ok(c) if !c.is_empty() => c, - _ => return 999, - }; - - let last_line = match content.lines().last() { - Some(l) => l, - None => return 999, - }; - - let parsed: serde_json::Value = match serde_json::from_str(last_line) { - Ok(v) => v, - Err(_) => return 999, - }; - - let end_str = match parsed.get("end").and_then(|v| v.as_str()) { - Some(s) => s, - None => return 999, - }; - - // Parse ISO 8601 timestamp manually (avoid chrono dependency) - // Format: "2025-03-04T10:30:00Z" or "2025-03-04T10:30:00+00:00" - let end_str = end_str.replace('Z', "+00:00"); - // Use the system date command as a simple parser - let out = std::process::Command::new("date") - .args(["-d", &end_str, "+%s"]) - .output() - .ok() - .and_then(|o| String::from_utf8(o.stdout).ok()) - .and_then(|s| s.trim().parse::().ok()); - - match out { - Some(end_epoch) => ((now() - end_epoch) / 3600.0) as u64, - None => 999, - } -} diff --git a/thalamus/src/main.rs b/thalamus/src/main.rs deleted file mode 100644 index b0dea0c..0000000 --- a/thalamus/src/main.rs +++ /dev/null @@ -1,614 +0,0 @@ -// PoC daemon. -// -// Central hub for notification routing, idle management, and -// communication modules (IRC, Telegram) for Claude Code sessions. -// Listens on a Unix domain socket with a Cap'n Proto RPC interface. -// Same binary serves as both daemon and CLI client. - -mod config; -mod context; -mod idle; -mod modules; -pub mod notify; -mod rpc; -mod tmux; - -pub mod daemon_capnp { - include!(concat!(env!("OUT_DIR"), "/schema/daemon_capnp.rs")); -} - -use std::cell::RefCell; -use std::path::PathBuf; -use std::rc::Rc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; -use clap::{Parser, Subcommand}; -use futures::AsyncReadExt; -use tokio::net::UnixListener; -use tracing::{error, info}; - -pub fn now() -> f64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs_f64() -} - -pub fn home() -> PathBuf { - PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/root".into())) -} - -fn sock_path() -> PathBuf { - home().join(".consciousness/daemon.sock") -} - -fn pid_path() -> PathBuf { - home().join(".consciousness/daemon.pid") -} - -// ── CLI ────────────────────────────────────────────────────────── - -#[derive(Parser)] -#[command(name = "poc-daemon", about = "Notification routing and idle management daemon")] -struct Cli { - #[command(subcommand)] - command: Option, -} - -#[derive(Subcommand)] -enum Command { - /// Start the daemon (foreground) - Daemon, - /// Query daemon status - Status, - /// Signal user activity - User { - /// tmux pane identifier - pane: Option, - }, - /// Signal Claude response - Response { - /// tmux pane identifier - pane: Option, - }, - /// Sleep (suppress idle timer). 0 or omit = indefinite - Sleep { - /// Wake timestamp (epoch seconds), 0 = indefinite - until: Option, - }, - /// Cancel sleep - Wake, - /// Suppress prompts for N seconds (default 300) - Quiet { - /// Duration in seconds - seconds: Option, - }, - /// Mark user as AFK (immediately allow idle timer to fire) - Afk, - /// Set session active timeout in seconds (how long after last message user counts as "present") - SessionTimeout { - /// Timeout in seconds - seconds: f64, - }, - /// Set idle timeout in seconds (how long before autonomous prompt) - IdleTimeout { - /// Timeout in seconds - seconds: f64, - }, - /// Set notify timeout in seconds (how long before tmux notification injection) - NotifyTimeout { - /// Timeout in seconds - seconds: f64, - }, - /// Signal consolidation started - Consolidating, - /// Signal consolidation ended - Consolidated, - /// Signal dream started - DreamStart, - /// Signal dream ended - DreamEnd, - /// Force state persistence to disk - Save, - /// Get or set the activity EWMA (0.0-1.0). No value = query. - Ewma { - /// Value to set (omit to query) - value: Option, - }, - /// Send a test message to the Claude pane - TestSend { - /// Message to send - message: Vec, - }, - /// Fire a test nudge through the daemon (tests the actual idle send path) - TestNudge, - /// Dump full internal state as JSON - Debug, - /// Shut down daemon - Stop, - /// Submit a notification - Notify { - /// Notification type (e.g. "irc", "telegram") - #[arg(name = "type")] - ntype: String, - /// Urgency level (ambient/low/medium/high/critical or 0-4) - urgency: String, - /// Message text - message: Vec, - }, - /// Get pending notifications - Notifications { - /// Minimum urgency filter - min_urgency: Option, - }, - /// List all notification types - NotifyTypes, - /// Set notification threshold for a type - NotifyThreshold { - /// Notification type - #[arg(name = "type")] - ntype: String, - /// Urgency level threshold - level: String, - }, - /// IRC module commands - Irc { - /// Subcommand (join, leave, send, status, log, nick) - command: String, - /// Arguments - args: Vec, - }, - /// Telegram module commands - Telegram { - /// Subcommand - command: String, - /// Arguments - args: Vec, - }, -} - -// ── Client mode ────────────────────────────────────────────────── - -async fn client_main(cmd: Command) -> Result<(), Box> { - let sock = sock_path(); - if !sock.exists() { - eprintln!("daemon not running (no socket at {})", sock.display()); - std::process::exit(1); - } - - tokio::task::LocalSet::new() - .run_until(async move { - let stream = tokio::net::UnixStream::connect(&sock).await?; - let (reader, writer) = - tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); - let rpc_network = Box::new(twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Client, - Default::default(), - )); - let mut rpc_system = RpcSystem::new(rpc_network, None); - let daemon: daemon_capnp::daemon::Client = - rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - - tokio::task::spawn_local(rpc_system); - - match cmd { - Command::Daemon => unreachable!("handled in main"), - Command::Status => { - let reply = daemon.status_request().send().promise.await?; - let s = reply.get()?.get_status()?; - - let fmt_secs = |s: f64| -> String { - if s < 60.0 { format!("{:.0}s", s) } - else if s < 3600.0 { format!("{:.0}m", s / 60.0) } - else { format!("{:.1}h", s / 3600.0) } - }; - - println!("uptime: {} pane: {} activity: {:?} pending: {}", - fmt_secs(s.get_uptime()), - s.get_claude_pane()?.to_str().unwrap_or("none"), - s.get_activity()?, - s.get_pending_count(), - ); - println!("idle timer: {}/{} ({})", - fmt_secs(s.get_since_activity()), - fmt_secs(s.get_idle_timeout()), - s.get_block_reason()?.to_str()?, - ); - println!("notify timer: {}/{}", - fmt_secs(s.get_since_activity()), - fmt_secs(s.get_notify_timeout()), - ); - println!("user: {} (last {}) activity: {:.1}%", - if s.get_user_present() { "present" } else { "away" }, - fmt_secs(s.get_since_user()), - s.get_activity_ewma() * 100.0, - ); - - let sleep = s.get_sleep_until(); - if sleep != 0.0 { - if sleep < 0.0 { - println!("sleep: indefinite"); - } else { - println!("sleep: until {sleep:.0}"); - } - } - if s.get_consolidating() { println!("consolidating"); } - if s.get_dreaming() { println!("dreaming"); } - } - Command::User { pane } => { - let pane = pane.as_deref().unwrap_or(""); - let mut req = daemon.user_request(); - req.get().set_pane(pane); - req.send().promise.await?; - } - Command::Response { pane } => { - let pane = pane.as_deref().unwrap_or(""); - let mut req = daemon.response_request(); - req.get().set_pane(pane); - req.send().promise.await?; - } - Command::Sleep { until } => { - let mut req = daemon.sleep_request(); - req.get().set_until(until.unwrap_or(0.0)); - req.send().promise.await?; - } - Command::Wake => { - daemon.wake_request().send().promise.await?; - } - Command::Quiet { seconds } => { - let mut req = daemon.quiet_request(); - req.get().set_seconds(seconds.unwrap_or(300)); - req.send().promise.await?; - } - Command::TestSend { message } => { - let msg = message.join(" "); - let pane = { - let reply = daemon.status_request().send().promise.await?; - let s = reply.get()?.get_status()?; - s.get_claude_pane()?.to_str()?.to_string() - }; - let ok = crate::tmux::send_prompt(&pane, &msg); - println!("send_prompt(pane={}, ok={}): {}", pane, ok, msg); - return Ok(()); - } - Command::TestNudge => { - let reply = daemon.test_nudge_request().send().promise.await?; - let r = reply.get()?; - println!("sent={} message={}", r.get_sent(), r.get_message()?.to_str()?); - return Ok(()); - } - Command::Afk => { - daemon.afk_request().send().promise.await?; - println!("marked AFK"); - } - Command::SessionTimeout { seconds } => { - let mut req = daemon.session_timeout_request(); - req.get().set_seconds(seconds); - req.send().promise.await?; - println!("session timeout = {seconds}s"); - } - Command::IdleTimeout { seconds } => { - let mut req = daemon.idle_timeout_request(); - req.get().set_seconds(seconds); - req.send().promise.await?; - println!("idle timeout = {seconds}s"); - } - Command::NotifyTimeout { seconds } => { - let mut req = daemon.notify_timeout_request(); - req.get().set_seconds(seconds); - req.send().promise.await?; - println!("notify timeout = {seconds}s"); - } - Command::Consolidating => { - daemon.consolidating_request().send().promise.await?; - } - Command::Consolidated => { - daemon.consolidated_request().send().promise.await?; - } - Command::DreamStart => { - daemon.dream_start_request().send().promise.await?; - } - Command::DreamEnd => { - daemon.dream_end_request().send().promise.await?; - } - Command::Save => { - daemon.save_request().send().promise.await?; - println!("state saved"); - } - Command::Ewma { value } => { - let mut req = daemon.ewma_request(); - req.get().set_value(value.unwrap_or(-1.0)); - let reply = req.send().promise.await?; - let current = reply.get()?.get_current(); - println!("{:.1}%", current * 100.0); - } - Command::Debug => { - let reply = daemon.debug_request().send().promise.await?; - let json = reply.get()?.get_json()?.to_str()?; - if let Ok(v) = serde_json::from_str::(json) { - println!("{}", serde_json::to_string_pretty(&v).unwrap_or_else(|_| json.to_string())); - } else { - println!("{json}"); - } - } - Command::Stop => { - daemon.stop_request().send().promise.await?; - println!("stopping"); - } - Command::Notify { ntype, urgency, message } => { - let urgency = notify::parse_urgency(&urgency) - .ok_or_else(|| format!("invalid urgency: {urgency}"))?; - let message = message.join(" "); - if message.is_empty() { - return Err("missing message".into()); - } - - let mut req = daemon.notify_request(); - let mut n = req.get().init_notification(); - n.set_type(&ntype); - n.set_urgency(urgency); - n.set_message(&message); - n.set_timestamp(crate::now()); - let reply = req.send().promise.await?; - if reply.get()?.get_interrupt() { - println!("interrupt"); - } else { - println!("queued"); - } - } - Command::Notifications { min_urgency } => { - let min: u8 = min_urgency - .as_deref() - .and_then(notify::parse_urgency) - .unwrap_or(255); - - let mut req = daemon.get_notifications_request(); - req.get().set_min_urgency(min); - let reply = req.send().promise.await?; - let list = reply.get()?.get_notifications()?; - - for n in list.iter() { - println!( - "[{}:{}] {}", - n.get_type()?.to_str()?, - notify::urgency_name(n.get_urgency()), - n.get_message()?.to_str()?, - ); - } - } - Command::NotifyTypes => { - let reply = daemon.get_types_request().send().promise.await?; - let list = reply.get()?.get_types()?; - - if list.is_empty() { - println!("no notification types registered"); - } else { - for t in list.iter() { - let threshold = if t.get_threshold() < 0 { - "inherit".to_string() - } else { - notify::urgency_name(t.get_threshold() as u8).to_string() - }; - println!( - "{}: count={} threshold={}", - t.get_name()?.to_str()?, - t.get_count(), - threshold, - ); - } - } - } - Command::NotifyThreshold { ntype, level } => { - let level = notify::parse_urgency(&level) - .ok_or_else(|| format!("invalid level: {level}"))?; - - let mut req = daemon.set_threshold_request(); - req.get().set_type(&ntype); - req.get().set_level(level); - req.send().promise.await?; - println!("{ntype} threshold={}", notify::urgency_name(level)); - } - Command::Irc { command, args } => { - module_command(&daemon, "irc", &command, &args).await?; - } - Command::Telegram { command, args } => { - module_command(&daemon, "telegram", &command, &args).await?; - } - } - - Ok(()) - }) - .await -} - -async fn module_command( - daemon: &daemon_capnp::daemon::Client, - module: &str, - command: &str, - args: &[String], -) -> Result<(), Box> { - let mut req = daemon.module_command_request(); - req.get().set_module(module); - req.get().set_command(command); - let mut args_builder = req.get().init_args(args.len() as u32); - for (i, a) in args.iter().enumerate() { - args_builder.set(i as u32, a); - } - let reply = req.send().promise.await?; - let result = reply.get()?.get_result()?.to_str()?; - if !result.is_empty() { - println!("{result}"); - } - Ok(()) -} - -// ── Server mode ────────────────────────────────────────────────── - -async fn server_main() -> Result<(), Box> { - let log_path = home().join(".consciousness/logs/daemon.log"); - let file_appender = tracing_appender::rolling::daily( - log_path.parent().unwrap(), - "daemon.log", - ); - tracing_subscriber::fmt() - .with_writer(file_appender) - .with_ansi(false) - .with_target(false) - .with_level(false) - .with_timer(tracing_subscriber::fmt::time::time()) - .init(); - - let sock = sock_path(); - let _ = std::fs::remove_file(&sock); - - let pid = std::process::id(); - std::fs::write(pid_path(), pid.to_string()).ok(); - - let daemon_config = Rc::new(RefCell::new(config::Config::load())); - - let state = Rc::new(RefCell::new(idle::State::new())); - state.borrow_mut().load(); - - info!("daemon started (pid={pid})"); - - tokio::task::LocalSet::new() - .run_until(async move { - // Start modules - let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); - - let irc_state = if daemon_config.borrow().irc.enabled { - let irc_config = daemon_config.borrow().irc.clone(); - info!("starting irc module: {}:{}", irc_config.server, irc_config.port); - Some(modules::irc::start(irc_config, notify_tx.clone(), daemon_config.clone())) - } else { - info!("irc module disabled"); - None - }; - - let telegram_state = if daemon_config.borrow().telegram.enabled { - info!("starting telegram module"); - Some(modules::telegram::start( - daemon_config.borrow().telegram.clone(), - notify_tx.clone(), - daemon_config.clone(), - )) - } else { - info!("telegram module disabled"); - None - }; - - let listener = UnixListener::bind(&sock)?; - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - std::fs::set_permissions( - &sock, - std::fs::Permissions::from_mode(0o600), - ) - .ok(); - } - - let shutdown = async { - let mut sigterm = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("sigterm"); - let mut sigint = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("sigint"); - tokio::select! { - _ = sigterm.recv() => info!("SIGTERM"), - _ = sigint.recv() => info!("SIGINT"), - } - }; - tokio::pin!(shutdown); - - let mut tick_timer = tokio::time::interval(Duration::from_secs(30)); - tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - tokio::select! { - _ = &mut shutdown => break, - - // Drain module notifications into state - Some(notif) = notify_rx.recv() => { - state.borrow_mut().maybe_prompt_notification( - ¬if.ntype, notif.urgency, ¬if.message, - ); - state.borrow_mut().notifications.submit( - notif.ntype, - notif.urgency, - notif.message, - ); - } - - _ = tick_timer.tick() => { - if let Err(e) = state.borrow_mut().tick().await { - error!("tick: {e}"); - } - if !state.borrow().running { - break; - } - } - - result = listener.accept() => { - match result { - Ok((stream, _)) => { - let (reader, writer) = - tokio_util::compat::TokioAsyncReadCompatExt::compat(stream) - .split(); - let network = twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Server, - Default::default(), - ); - - let daemon_impl = rpc::DaemonImpl::new( - state.clone(), - irc_state.clone(), - telegram_state.clone(), - daemon_config.clone(), - ); - let client: daemon_capnp::daemon::Client = - capnp_rpc::new_client(daemon_impl); - - let rpc_system = RpcSystem::new( - Box::new(network), - Some(client.client), - ); - tokio::task::spawn_local(rpc_system); - } - Err(e) => error!("accept: {e}"), - } - } - } - } - - state.borrow().save(); - let _ = std::fs::remove_file(sock_path()); - let _ = std::fs::remove_file(pid_path()); - info!("daemon stopped"); - - Ok(()) - }) - .await -} - -// ── Entry point ────────────────────────────────────────────────── - -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - let cli = Cli::parse(); - - match cli.command { - Some(Command::Daemon) => server_main().await, - Some(cmd) => client_main(cmd).await, - None => { - Cli::parse_from(["poc-daemon", "--help"]); - Ok(()) - } - } -} diff --git a/thalamus/src/modules/irc.rs b/thalamus/src/modules/irc.rs deleted file mode 100644 index edd9370..0000000 --- a/thalamus/src/modules/irc.rs +++ /dev/null @@ -1,569 +0,0 @@ -// IRC module. -// -// Maintains a persistent connection to an IRC server. Parses incoming -// messages into notifications, supports sending messages and runtime -// commands (join, leave, etc.). Config changes persist to daemon.toml. -// -// Runs as a spawned local task on the daemon's LocalSet. Notifications -// flow through an mpsc channel into the main state. Reconnects -// automatically with exponential backoff. - -use crate::config::{Config, IrcConfig}; -use crate::notify::Notification; -use crate::{home, now}; -use std::cell::RefCell; -use std::collections::VecDeque; -use std::io; -use std::rc::Rc; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::mpsc; -use tracing::{error, info, warn}; - -const MAX_LOG_LINES: usize = 200; -const RECONNECT_BASE_SECS: u64 = 5; -const RECONNECT_MAX_SECS: u64 = 300; -const PING_INTERVAL_SECS: u64 = 120; -const PING_TIMEOUT_SECS: u64 = 30; - -/// Parsed IRC message. -struct IrcMessage { - prefix: Option, // nick!user@host - command: String, - params: Vec, -} - -impl IrcMessage { - fn parse(line: &str) -> Option { - let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); - if line.is_empty() { - return None; - } - - let (prefix, rest) = if line.starts_with(':') { - let space = line.find(' ')?; - (Some(line[1..space].to_string()), &line[space + 1..]) - } else { - (None, line) - }; - - let (command_params, trailing) = if let Some(pos) = rest.find(" :") { - (&rest[..pos], Some(rest[pos + 2..].to_string())) - } else { - (rest, None) - }; - - let mut parts: Vec = command_params - .split_whitespace() - .map(String::from) - .collect(); - - if parts.is_empty() { - return None; - } - - let command = parts.remove(0).to_uppercase(); - let mut params = parts; - if let Some(t) = trailing { - params.push(t); - } - - Some(IrcMessage { - prefix, - command, - params, - }) - } - - /// Extract nick from prefix (nick!user@host → nick). - fn nick(&self) -> Option<&str> { - self.prefix - .as_deref() - .and_then(|p| p.split('!').next()) - } -} - -/// Shared IRC state, accessible from both the read task and RPC handlers. -pub struct IrcState { - pub config: IrcConfig, - pub connected: bool, - pub channels: Vec, - pub log: VecDeque, - writer: Option, -} - -/// Type-erased writer handle so we can store it without generic params. -type WriterHandle = Box; - -trait AsyncWriter { - fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>>; -} - -/// Writer over a TLS stream. -struct TlsWriter { - inner: tokio::io::WriteHalf>, -} - -impl AsyncWriter for TlsWriter { - fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>> { - let data = format!("{line}\r\n"); - Box::pin(async move { - self.inner.write_all(data.as_bytes()).await - }) - } -} - -/// Writer over a plain TCP stream. -struct PlainWriter { - inner: tokio::io::WriteHalf, -} - -impl AsyncWriter for PlainWriter { - fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>> { - let data = format!("{line}\r\n"); - Box::pin(async move { - self.inner.write_all(data.as_bytes()).await - }) - } -} - -impl IrcState { - fn new(config: IrcConfig) -> Self { - Self { - channels: config.channels.clone(), - config, - connected: false, - log: VecDeque::with_capacity(MAX_LOG_LINES), - writer: None, - } - } - - fn push_log(&mut self, line: &str) { - if self.log.len() >= MAX_LOG_LINES { - self.log.pop_front(); - } - self.log.push_back(line.to_string()); - } - - async fn send_raw(&mut self, line: &str) -> io::Result<()> { - if let Some(ref mut w) = self.writer { - w.write_line(line).await - } else { - Err(io::Error::new(io::ErrorKind::NotConnected, "not connected")) - } - } - - async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { - self.send_raw(&format!("PRIVMSG {target} :{msg}")).await - } - - async fn join(&mut self, channel: &str) -> io::Result<()> { - self.send_raw(&format!("JOIN {channel}")).await?; - if !self.channels.iter().any(|c| c == channel) { - self.channels.push(channel.to_string()); - } - Ok(()) - } - - async fn part(&mut self, channel: &str) -> io::Result<()> { - self.send_raw(&format!("PART {channel}")).await?; - self.channels.retain(|c| c != channel); - Ok(()) - } -} - -pub type SharedIrc = Rc>; - -/// Start the IRC module. Returns the shared state handle. -pub fn start( - config: IrcConfig, - notify_tx: mpsc::UnboundedSender, - daemon_config: Rc>, -) -> SharedIrc { - let state = Rc::new(RefCell::new(IrcState::new(config))); - let state_clone = state.clone(); - - tokio::task::spawn_local(async move { - connection_loop(state_clone, notify_tx, daemon_config).await; - }); - - state -} - -async fn connection_loop( - state: SharedIrc, - notify_tx: mpsc::UnboundedSender, - daemon_config: Rc>, -) { - let mut backoff = RECONNECT_BASE_SECS; - - loop { - let config = state.borrow().config.clone(); - info!("irc: connecting to {}:{}", config.server, config.port); - - match connect_and_run(&state, &config, ¬ify_tx).await { - Ok(()) => { - info!("irc: connection closed cleanly"); - } - Err(e) => { - error!("irc: connection error: {e}"); - } - } - - // Reset backoff if we had a working connection (registered - // successfully before disconnecting) - let was_connected = state.borrow().connected; - state.borrow_mut().connected = false; - state.borrow_mut().writer = None; - if was_connected { - backoff = RECONNECT_BASE_SECS; - } - - // Persist current channel list to config - { - let channels = state.borrow().channels.clone(); - let mut dc = daemon_config.borrow_mut(); - dc.irc.channels = channels; - dc.save(); - } - - info!("irc: reconnecting in {backoff}s"); - tokio::time::sleep(std::time::Duration::from_secs(backoff)).await; - backoff = (backoff * 2).min(RECONNECT_MAX_SECS); - } -} - -async fn connect_and_run( - state: &SharedIrc, - config: &IrcConfig, - notify_tx: &mpsc::UnboundedSender, -) -> io::Result<()> { - let addr = format!("{}:{}", config.server, config.port); - let tcp = tokio::net::TcpStream::connect(&addr).await?; - - if config.tls { - let tls_config = rustls::ClientConfig::builder_with_provider( - rustls::crypto::ring::default_provider().into(), - ) - .with_safe_default_protocol_versions() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? - .with_root_certificates(root_certs()) - .with_no_client_auth(); - let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); - let server_name = rustls::pki_types::ServerName::try_from(config.server.clone()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - let tls_stream = connector.connect(server_name, tcp).await?; - - let (reader, writer) = tokio::io::split(tls_stream); - state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer })); - - let buf_reader = BufReader::new(reader); - register_and_read(state, config, buf_reader, notify_tx).await - } else { - let (reader, writer) = tokio::io::split(tcp); - state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer })); - - let buf_reader = BufReader::new(reader); - register_and_read(state, config, buf_reader, notify_tx).await - } -} - -async fn register_and_read( - state: &SharedIrc, - config: &IrcConfig, - mut reader: BufReader, - notify_tx: &mpsc::UnboundedSender, -) -> io::Result<()> { - // Register - { - let mut s = state.borrow_mut(); - s.send_raw(&format!("NICK {}", config.nick)).await?; - s.send_raw(&format!("USER {} 0 * :{}", config.user, config.realname)).await?; - } - - let mut buf = Vec::new(); - let mut ping_sent = false; - let mut deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PING_INTERVAL_SECS); - - loop { - buf.clear(); - - let read_result = tokio::select! { - result = reader.read_until(b'\n', &mut buf) => result, - _ = tokio::time::sleep_until(deadline) => { - if ping_sent { - return Err(io::Error::new( - io::ErrorKind::TimedOut, - "ping timeout — no response from server", - )); - } - info!("irc: no data for {}s, sending PING", PING_INTERVAL_SECS); - state.borrow_mut().send_raw("PING :keepalive").await?; - ping_sent = true; - deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PING_TIMEOUT_SECS); - continue; - } - }; - - let n = read_result?; - if n == 0 { break; } - - // Any data from server resets the ping timer - ping_sent = false; - deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PING_INTERVAL_SECS); - - // IRC is not guaranteed UTF-8 — lossy conversion handles Latin-1 etc. - let line = String::from_utf8_lossy(&buf).trim_end().to_string(); - if line.is_empty() { continue; } - let msg = match IrcMessage::parse(&line) { - Some(m) => m, - None => continue, - }; - - match msg.command.as_str() { - "PING" => { - let arg = msg.params.first().map(|s| s.as_str()).unwrap_or(""); - state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?; - } - - // RPL_WELCOME — registration complete - "001" => { - info!("irc: registered as {}", config.nick); - state.borrow_mut().connected = true; - - // Join configured channels - let channels = state.borrow().channels.clone(); - for ch in &channels { - if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await { - warn!("irc: failed to join {ch}: {e}"); - } - } - } - - "PRIVMSG" => { - let target = msg.params.first().map(|s| s.as_str()).unwrap_or(""); - let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or(""); - let nick = msg.nick().unwrap_or("unknown"); - - // Handle CTCP requests (wrapped in \x01) - if text.starts_with('\x01') && text.ends_with('\x01') { - let ctcp = &text[1..text.len()-1]; - if ctcp.starts_with("VERSION") { - let reply = format!( - "NOTICE {nick} :\x01VERSION poc-daemon 0.4.0\x01" - ); - state.borrow_mut().send_raw(&reply).await.ok(); - } - // Don't generate notifications for CTCP - continue; - } - - // Log the message - let log_line = if target.starts_with('#') { - format!("[{}] <{}> {}", target, nick, text) - } else { - format!("[PM:{nick}] {text}") - }; - state.borrow_mut().push_log(&log_line); - - // Write to per-channel/per-user log file - if target.starts_with('#') { - append_log(target, nick, text); - } else { - append_log(&format!("pm-{nick}"), nick, text); - } - - // Generate notification - let (ntype, urgency) = classify_privmsg( - nick, - target, - text, - &config.nick, - ); - - let _ = notify_tx.send(Notification { - ntype, - urgency, - message: log_line, - timestamp: now(), - }); - } - - // Nick in use - "433" => { - let alt = format!("{}_", config.nick); - warn!("irc: nick in use, trying {alt}"); - state.borrow_mut().send_raw(&format!("NICK {alt}")).await?; - } - - "JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" | "NOTICE" => { - // Could log these, but skip for now - } - - _ => {} - } - } - - Ok(()) -} - -/// Classify a PRIVMSG into notification type and urgency. -fn classify_privmsg(nick: &str, target: &str, text: &str, my_nick: &str) -> (String, u8) { - let my_nick_lower = my_nick.to_lowercase(); - let text_lower = text.to_lowercase(); - - if !target.starts_with('#') { - // Private message - (format!("irc.pm.{nick}"), crate::notify::URGENT) - } else if text_lower.contains(&my_nick_lower) { - // Mentioned in channel - (format!("irc.mention.{nick}"), crate::notify::NORMAL) - } else { - // Regular channel message - let channel = target.trim_start_matches('#'); - (format!("irc.channel.{channel}"), crate::notify::AMBIENT) - } -} - -/// Append a message to the per-channel or per-user log file. -/// Logs go to ~/.consciousness/irc/logs/{target}.log (e.g. #bcachefs.log, pm-user.log) -fn append_log(target: &str, nick: &str, text: &str) { - use std::io::Write; - // Sanitize target for filename (strip leading #, lowercase) - let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase()); - let dir = home().join(".consciousness/irc/logs"); - let _ = std::fs::create_dir_all(&dir); - if let Ok(mut f) = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(dir.join(&filename)) - { - let secs = now() as u64; - let _ = writeln!(f, "{secs} <{nick}> {text}"); - } -} - -fn root_certs() -> rustls::RootCertStore { - let mut roots = rustls::RootCertStore::empty(); - roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); - roots -} - -/// Handle a runtime command from RPC. -pub async fn handle_command( - state: &SharedIrc, - daemon_config: &Rc>, - cmd: &str, - args: &[String], -) -> Result { - match cmd { - "join" => { - let channel = args.first().ok_or("usage: irc join ")?; - let channel = if channel.starts_with('#') { - channel.clone() - } else { - format!("#{channel}") - }; - state - .borrow_mut() - .join(&channel) - .await - .map_err(|e| e.to_string())?; - - // Persist - let mut dc = daemon_config.borrow_mut(); - if !dc.irc.channels.contains(&channel) { - dc.irc.channels.push(channel.clone()); - } - dc.save(); - - Ok(format!("joined {channel}")) - } - "leave" | "part" => { - let channel = args.first().ok_or("usage: irc leave ")?; - let channel = if channel.starts_with('#') { - channel.clone() - } else { - format!("#{channel}") - }; - state - .borrow_mut() - .part(&channel) - .await - .map_err(|e| e.to_string())?; - - // Persist - let mut dc = daemon_config.borrow_mut(); - dc.irc.channels.retain(|c| c != &channel); - dc.save(); - - Ok(format!("left {channel}")) - } - "send" | "msg" => { - if args.len() < 2 { - return Err("usage: irc send ".into()); - } - let target = &args[0]; - if target.starts_with('#') { - let s = state.borrow(); - if !s.channels.iter().any(|c| c == target) { - return Err(format!( - "not in channel {target} (joined: {})", - s.channels.join(", ") - )); - } - } - let msg = args[1..].join(" "); - let nick = state.borrow().config.nick.clone(); - state - .borrow_mut() - .send_privmsg(target, &msg) - .await - .map_err(|e| e.to_string())?; - append_log(target, &nick, &msg); - Ok(format!("sent to {target}")) - } - "status" => { - let s = state.borrow(); - Ok(format!( - "connected={} channels={} log_lines={} nick={}", - s.connected, - s.channels.join(","), - s.log.len(), - s.config.nick, - )) - } - "log" => { - let n: usize = args - .first() - .and_then(|s| s.parse().ok()) - .unwrap_or(15); - let s = state.borrow(); - let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); - let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); - lines.reverse(); - Ok(lines.join("\n")) - } - "nick" => { - let new_nick = args.first().ok_or("usage: irc nick ")?; - state - .borrow_mut() - .send_raw(&format!("NICK {new_nick}")) - .await - .map_err(|e| e.to_string())?; - - let mut dc = daemon_config.borrow_mut(); - dc.irc.nick = new_nick.clone(); - dc.save(); - - Ok(format!("nick → {new_nick}")) - } - _ => Err(format!( - "unknown irc command: {cmd}\n\ - commands: join, leave, send, status, log, nick" - )), - } -} diff --git a/thalamus/src/modules/mod.rs b/thalamus/src/modules/mod.rs deleted file mode 100644 index 0e5debc..0000000 --- a/thalamus/src/modules/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod irc; -pub mod telegram; diff --git a/thalamus/src/modules/telegram.rs b/thalamus/src/modules/telegram.rs deleted file mode 100644 index 707eb32..0000000 --- a/thalamus/src/modules/telegram.rs +++ /dev/null @@ -1,374 +0,0 @@ -// Telegram module. -// -// Long-polls the Telegram Bot API for messages from Kent's chat. -// Downloads media (photos, voice, documents) to local files. -// Sends text and files. Notifications flow through mpsc into the -// daemon's main state. -// -// Only accepts messages from the configured chat_id (prompt -// injection defense — other senders get a "private bot" reply). - -use crate::config::{Config, TelegramConfig}; -use crate::notify::Notification; -use crate::{home, now}; -use std::cell::RefCell; -use std::collections::VecDeque; -use std::path::PathBuf; -use std::rc::Rc; -use tokio::sync::mpsc; -use tracing::{error, info}; - -const MAX_LOG_LINES: usize = 100; -const POLL_TIMEOUT: u64 = 30; - -pub struct TelegramState { - pub config: TelegramConfig, - pub connected: bool, - pub log: VecDeque, - pub last_offset: i64, - client: reqwest::Client, -} - -pub type SharedTelegram = Rc>; - -impl TelegramState { - fn new(config: TelegramConfig) -> Self { - let last_offset = load_offset(); - Self { - config, - connected: false, - log: VecDeque::with_capacity(MAX_LOG_LINES), - last_offset, - client: reqwest::Client::new(), - } - } - - fn push_log(&mut self, line: &str) { - if self.log.len() >= MAX_LOG_LINES { - self.log.pop_front(); - } - self.log.push_back(line.to_string()); - } - - fn api_url(&self, method: &str) -> String { - format!( - "https://api.telegram.org/bot{}/{}", - self.config.token, method - ) - } -} - -fn offset_path() -> PathBuf { - home().join(".consciousness/telegram/last_offset") -} - -fn load_offset() -> i64 { - std::fs::read_to_string(offset_path()) - .ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0) -} - -fn save_offset(offset: i64) { - let _ = std::fs::write(offset_path(), offset.to_string()); -} - -fn history_path() -> PathBuf { - home().join(".consciousness/telegram/history.log") -} - -fn media_dir() -> PathBuf { - home().join(".consciousness/telegram/media") -} - -fn append_history(line: &str) { - use std::io::Write; - if let Ok(mut f) = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(history_path()) - { - let _ = writeln!(f, "{}", line); - } -} - -/// Start the Telegram module. Returns the shared state handle. -pub fn start( - config: TelegramConfig, - notify_tx: mpsc::UnboundedSender, - _daemon_config: Rc>, -) -> SharedTelegram { - let state = Rc::new(RefCell::new(TelegramState::new(config))); - let state_clone = state.clone(); - - tokio::task::spawn_local(async move { - poll_loop(state_clone, notify_tx).await; - }); - - state -} - -async fn poll_loop( - state: SharedTelegram, - notify_tx: mpsc::UnboundedSender, -) { - let _ = std::fs::create_dir_all(media_dir()); - - loop { - match poll_once(&state, ¬ify_tx).await { - Ok(()) => {} - Err(e) => { - error!("telegram: poll error: {e}"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - } - } -} - -async fn poll_once( - state: &SharedTelegram, - notify_tx: &mpsc::UnboundedSender, -) -> Result<(), Box> { - let (url, chat_id, token) = { - let s = state.borrow(); - let url = format!( - "{}?offset={}&timeout={}", - s.api_url("getUpdates"), - s.last_offset, - POLL_TIMEOUT, - ); - (url, s.config.chat_id, s.config.token.clone()) - }; - - let client = state.borrow().client.clone(); - let resp: serde_json::Value = client - .get(&url) - .timeout(std::time::Duration::from_secs(POLL_TIMEOUT + 5)) - .send() - .await? - .json() - .await?; - - if !state.borrow().connected { - state.borrow_mut().connected = true; - info!("telegram: connected"); - } - - let results = resp["result"].as_array(); - let results = match results { - Some(r) => r, - None => return Ok(()), - }; - - for update in results { - let update_id = update["update_id"].as_i64().unwrap_or(0); - let msg = &update["message"]; - - // Update offset - { - let mut s = state.borrow_mut(); - s.last_offset = update_id + 1; - save_offset(s.last_offset); - } - - let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); - if msg_chat_id != chat_id { - // Reject messages from unknown chats - let reject_url = format!( - "https://api.telegram.org/bot{}/sendMessage", - token - ); - let _ = client - .post(&reject_url) - .form(&[ - ("chat_id", msg_chat_id.to_string()), - ("text", "This is a private bot.".to_string()), - ]) - .send() - .await; - continue; - } - - let sender = msg["from"]["first_name"] - .as_str() - .unwrap_or("unknown") - .to_string(); - - // Handle different message types - if let Some(text) = msg["text"].as_str() { - let log_line = format!("[{}] {}", sender, text); - state.borrow_mut().push_log(&log_line); - - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {text}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } else if let Some(photos) = msg["photo"].as_array() { - // Pick largest photo - let best = photos.iter().max_by_key(|p| p["file_size"].as_i64().unwrap_or(0)); - if let Some(photo) = best { - if let Some(file_id) = photo["file_id"].as_str() { - let caption = msg["caption"].as_str().unwrap_or(""); - let local = download_file(&client, &token, file_id, ".jpg").await; - let display = match &local { - Some(p) => format!("[photo: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), - None => format!("[photo]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), - }; - let log_line = format!("[{}] {}", sender, display); - state.borrow_mut().push_log(&log_line); - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {display}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } - } - } else if msg["voice"].is_object() { - if let Some(file_id) = msg["voice"]["file_id"].as_str() { - let caption = msg["caption"].as_str().unwrap_or(""); - let local = download_file(&client, &token, file_id, ".ogg").await; - let display = match &local { - Some(p) => format!("[voice: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), - None => format!("[voice]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), - }; - let log_line = format!("[{}] {}", sender, display); - state.borrow_mut().push_log(&log_line); - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {display}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } - } else if msg["document"].is_object() { - if let Some(file_id) = msg["document"]["file_id"].as_str() { - let fname = msg["document"]["file_name"].as_str().unwrap_or("file"); - let caption = msg["caption"].as_str().unwrap_or(""); - let local = download_file(&client, &token, file_id, "").await; - let display = match &local { - Some(p) => format!("[doc: {} -> {}]{}", fname, p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), - None => format!("[doc: {}]{}", fname, if caption.is_empty() { String::new() } else { format!(" {caption}") }), - }; - let log_line = format!("[{}] {}", sender, display); - state.borrow_mut().push_log(&log_line); - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {display}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } - } - } - - Ok(()) -} - -async fn download_file( - client: &reqwest::Client, - token: &str, - file_id: &str, - ext: &str, -) -> Option { - let url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); - let resp: serde_json::Value = client.get(&url).send().await.ok()?.json().await.ok()?; - let file_path = resp["result"]["file_path"].as_str()?; - - let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); - let bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; - - let basename = std::path::Path::new(file_path) - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("file"); - let local_name = if ext.is_empty() { - basename.to_string() - } else { - let stem = std::path::Path::new(basename) - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("file"); - format!("{}{}", stem, ext) - }; - let secs = now() as u64; - let local_path = media_dir().join(format!("{secs}_{local_name}")); - std::fs::write(&local_path, &bytes).ok()?; - Some(local_path) -} - -fn timestamp() -> String { - // Use the same unix seconds approach as IRC module - format!("{}", now() as u64) -} - -/// Handle a runtime command from RPC. -pub async fn handle_command( - state: &SharedTelegram, - _daemon_config: &Rc>, - cmd: &str, - args: &[String], -) -> Result { - match cmd { - "send" => { - let msg = args.join(" "); - if msg.is_empty() { - return Err("usage: telegram send ".into()); - } - let (url, client) = { - let s = state.borrow(); - (s.api_url("sendMessage"), s.client.clone()) - }; - let chat_id = state.borrow().config.chat_id.to_string(); - client - .post(&url) - .form(&[("chat_id", chat_id.as_str()), ("text", msg.as_str())]) - .send() - .await - .map_err(|e| e.to_string())?; - - let ts = timestamp(); - append_history(&format!("{ts} [agent] {msg}")); - - Ok("sent".to_string()) - } - "status" => { - let s = state.borrow(); - Ok(format!( - "connected={} log_lines={} offset={}", - s.connected, - s.log.len(), - s.last_offset, - )) - } - "log" => { - let n: usize = args - .first() - .and_then(|s| s.parse().ok()) - .unwrap_or(15); - let s = state.borrow(); - let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); - let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); - lines.reverse(); - Ok(lines.join("\n")) - } - _ => Err(format!( - "unknown telegram command: {cmd}\n\ - commands: send, status, log" - )), - } -} diff --git a/thalamus/src/notify.rs b/thalamus/src/notify.rs deleted file mode 100644 index 6c31905..0000000 --- a/thalamus/src/notify.rs +++ /dev/null @@ -1,315 +0,0 @@ -// Notification subsystem. -// -// Notifications have a type (free-form string, hierarchical by convention) -// and an urgency level (0-3) set by the producer. The daemon maintains a -// registry of all types ever seen with basic stats, and a per-type -// threshold that controls when notifications interrupt vs queue. -// -// Producers submit via socket: `notify ` -// Consumers query via socket: `notifications` (returns + clears pending above threshold) -// -// Thresholds: -// 0 = ambient — include in idle context only -// 1 = low — deliver on next check, don't interrupt focus -// 2 = normal — deliver on next user interaction -// 3 = urgent — interrupt immediately -// -// Type hierarchy is by convention: "irc.mention", "irc.channel.bcachefs-ai", -// "telegram", "system.compaction". Threshold lookup walks up the hierarchy: -// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → default. - -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use std::fs; -use std::path::PathBuf; -use tracing::info; - -use crate::home; - -pub const AMBIENT: u8 = 0; -pub const LOW: u8 = 1; -pub const NORMAL: u8 = 2; -pub const URGENT: u8 = 3; - -const DEFAULT_THRESHOLD: u8 = NORMAL; - -/// Activity states that affect effective notification thresholds. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum Activity { - /// Actively working with user — raise thresholds - Focused, - /// Idle, autonomous — lower thresholds - Idle, - /// Sleeping — only urgent gets through - Sleeping, -} - -fn state_path() -> PathBuf { - home().join(".consciousness/notifications/state.json") -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TypeInfo { - pub first_seen: f64, - pub last_seen: f64, - pub count: u64, - /// Per-type threshold override. None = inherit from parent or default. - pub threshold: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Notification { - pub ntype: String, - pub urgency: u8, - pub message: String, - pub timestamp: f64, -} - -#[derive(Debug, Serialize, Deserialize, Default)] -pub struct NotifyState { - /// Registry of all notification types ever seen. - pub types: BTreeMap, - /// Pending notifications not yet delivered. - #[serde(skip)] - pub pending: Vec, - /// Current activity state — affects effective thresholds. - #[serde(skip)] - pub activity: Activity, -} - -impl Default for Activity { - fn default() -> Self { - Activity::Idle - } -} - -impl NotifyState { - pub fn new() -> Self { - let mut state = Self::default(); - state.load(); - state - } - - /// Load type registry from disk. - fn load(&mut self) { - let path = state_path(); - if let Ok(data) = fs::read_to_string(&path) { - if let Ok(saved) = serde_json::from_str::(&data) { - self.types = saved.types; - info!("loaded {} notification types", self.types.len()); - } - } - } - - /// Persist type registry to disk. - fn save(&self) { - let saved = SavedState { - types: self.types.clone(), - }; - if let Ok(json) = serde_json::to_string_pretty(&saved) { - let path = state_path(); - if let Some(parent) = path.parent() { - let _ = fs::create_dir_all(parent); - } - let _ = fs::write(path, json); - } - } - - /// Look up the configured threshold for a type, walking up the hierarchy. - /// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → DEFAULT_THRESHOLD - pub fn configured_threshold(&self, ntype: &str) -> u8 { - let mut key = ntype; - loop { - if let Some(info) = self.types.get(key) { - if let Some(t) = info.threshold { - return t; - } - } - match key.rfind('.') { - Some(pos) => key = &key[..pos], - None => return DEFAULT_THRESHOLD, - } - } - } - - /// Effective threshold accounting for activity state. - /// When focused, thresholds are raised (fewer interruptions). - /// When sleeping, only urgent gets through. - /// When idle, configured thresholds apply as-is. - pub fn threshold_for(&self, ntype: &str) -> u8 { - let base = self.configured_threshold(ntype); - match self.activity { - Activity::Focused => base.max(NORMAL), // at least normal when focused - Activity::Sleeping => URGENT, // only urgent when sleeping - Activity::Idle => base, // configured threshold when idle - } - } - - pub fn set_activity(&mut self, activity: Activity) { - info!("activity: {:?} → {:?}", self.activity, activity); - self.activity = activity; - } - - /// Submit a notification. Returns true if it should interrupt now. - pub fn submit(&mut self, ntype: String, urgency: u8, message: String) -> bool { - let now = crate::now(); - - // Update type registry - let info = self.types.entry(ntype.clone()).or_insert(TypeInfo { - first_seen: now, - last_seen: now, - count: 0, - threshold: None, - }); - info.last_seen = now; - info.count += 1; - self.save(); - - let threshold = self.threshold_for(&ntype); - - info!( - "notification: type={ntype} urgency={urgency} threshold={threshold} msg={}", - message.chars().take(80).collect::() - ); - - self.pending.push(Notification { - ntype, - urgency, - message, - timestamp: now, - }); - - urgency >= URGENT - } - - /// Drain pending notifications at or above the given urgency level. - /// Returns them and removes from pending. - pub fn drain(&mut self, min_urgency: u8) -> Vec { - let (matching, remaining): (Vec<_>, Vec<_>) = self - .pending - .drain(..) - .partition(|n| n.urgency >= min_urgency); - self.pending = remaining; - matching - } - - /// Drain all pending notifications above their per-type threshold. - pub fn drain_deliverable(&mut self) -> Vec { - // Pre-compute thresholds to avoid borrow conflict with drain - let thresholds: Vec = self - .pending - .iter() - .map(|n| self.threshold_for(&n.ntype)) - .collect(); - - let mut deliver = Vec::new(); - let mut keep = Vec::new(); - - for (n, threshold) in self.pending.drain(..).zip(thresholds) { - if n.urgency >= threshold { - deliver.push(n); - } else { - keep.push(n); - } - } - - self.pending = keep; - deliver - } - - /// Set threshold for a notification type. - pub fn set_threshold(&mut self, ntype: &str, threshold: u8) { - let now = crate::now(); - let info = self.types.entry(ntype.to_string()).or_insert(TypeInfo { - first_seen: now, - last_seen: now, - count: 0, - threshold: None, - }); - info.threshold = Some(threshold); - self.save(); - info!("threshold: {ntype} = {threshold}"); - } - - /// Format pending notifications for display. - pub fn format_pending(&self, min_urgency: u8) -> String { - let matching: Vec<_> = self - .pending - .iter() - .filter(|n| n.urgency >= min_urgency) - .collect(); - - if matching.is_empty() { - return String::new(); - } - - let mut out = format!("Pending notifications ({}):\n", matching.len()); - for n in &matching { - out.push_str(&format!("[{}] {}\n", n.ntype, n.message)); - } - out - } - - /// Ingest notifications from legacy ~/.consciousness/notifications/ files. - /// Maps filename to notification type, assumes NORMAL urgency. - pub fn ingest_legacy_files(&mut self) { - let dir = home().join(".consciousness/notifications"); - let entries = match fs::read_dir(&dir) { - Ok(e) => e, - Err(_) => return, - }; - - for entry in entries.flatten() { - let name = entry.file_name().to_string_lossy().to_string(); - if name.starts_with('.') || name == "state.json" { - continue; - } - let path = entry.path(); - if !path.is_file() { - continue; - } - let content = match fs::read_to_string(&path) { - Ok(c) if !c.is_empty() => c, - _ => continue, - }; - - // Each line is a separate notification - for line in content.lines() { - if !line.is_empty() { - self.submit(name.clone(), NORMAL, line.to_string()); - } - } - - // Clear the file - let _ = fs::write(&path, ""); - } - } -} - -/// What gets persisted to disk (just the type registry, not pending). -#[derive(Serialize, Deserialize)] -struct SavedState { - types: BTreeMap, -} - -/// Format an urgency level as a human-readable string. -pub fn urgency_name(level: u8) -> &'static str { - match level { - 0 => "ambient", - 1 => "low", - 2 => "normal", - 3 => "urgent", - _ => "unknown", - } -} - -/// Parse an urgency level from a string (name or number). -pub fn parse_urgency(s: &str) -> Option { - match s { - "ambient" | "0" => Some(AMBIENT), - "low" | "1" => Some(LOW), - "normal" | "2" => Some(NORMAL), - "urgent" | "3" => Some(URGENT), - _ => None, - } -} diff --git a/thalamus/src/rpc.rs b/thalamus/src/rpc.rs deleted file mode 100644 index 98d45d9..0000000 --- a/thalamus/src/rpc.rs +++ /dev/null @@ -1,429 +0,0 @@ -// Cap'n Proto RPC server implementation. -// -// Bridges the capnp-generated Daemon interface to the idle::State, -// notify::NotifyState, and module state. All state is owned by -// RefCells on the LocalSet — no Send/Sync needed. - -use crate::config::Config; -use crate::daemon_capnp::daemon; -use crate::idle; -use crate::modules::{irc, telegram}; -use crate::notify; -use capnp::capability::Promise; -use std::cell::RefCell; -use std::rc::Rc; -use tracing::info; - -pub struct DaemonImpl { - state: Rc>, - irc: Option, - telegram: Option, - config: Rc>, -} - -impl DaemonImpl { - pub fn new( - state: Rc>, - irc: Option, - telegram: Option, - config: Rc>, - ) -> Self { - Self { state, irc, telegram, config } - } -} - -impl daemon::Server for DaemonImpl { - fn user( - &mut self, - params: daemon::UserParams, - _results: daemon::UserResults, - ) -> Promise<(), capnp::Error> { - let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); - self.state.borrow_mut().handle_user(&pane); - Promise::ok(()) - } - - fn response( - &mut self, - params: daemon::ResponseParams, - _results: daemon::ResponseResults, - ) -> Promise<(), capnp::Error> { - let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); - self.state.borrow_mut().handle_response(&pane); - Promise::ok(()) - } - - fn sleep( - &mut self, - params: daemon::SleepParams, - _results: daemon::SleepResults, - ) -> Promise<(), capnp::Error> { - let until = pry!(params.get()).get_until(); - self.state.borrow_mut().handle_sleep(until); - Promise::ok(()) - } - - fn wake( - &mut self, - _params: daemon::WakeParams, - _results: daemon::WakeResults, - ) -> Promise<(), capnp::Error> { - self.state.borrow_mut().handle_wake(); - Promise::ok(()) - } - - fn quiet( - &mut self, - params: daemon::QuietParams, - _results: daemon::QuietResults, - ) -> Promise<(), capnp::Error> { - let secs = pry!(params.get()).get_seconds(); - self.state.borrow_mut().handle_quiet(secs); - Promise::ok(()) - } - - fn consolidating( - &mut self, - _params: daemon::ConsolidatingParams, - _results: daemon::ConsolidatingResults, - ) -> Promise<(), capnp::Error> { - self.state.borrow_mut().consolidating = true; - info!("consolidation started"); - Promise::ok(()) - } - - fn consolidated( - &mut self, - _params: daemon::ConsolidatedParams, - _results: daemon::ConsolidatedResults, - ) -> Promise<(), capnp::Error> { - self.state.borrow_mut().consolidating = false; - info!("consolidation ended"); - Promise::ok(()) - } - - fn dream_start( - &mut self, - _params: daemon::DreamStartParams, - _results: daemon::DreamStartResults, - ) -> Promise<(), capnp::Error> { - let mut s = self.state.borrow_mut(); - s.dreaming = true; - s.dream_start = crate::now(); - info!("dream started"); - Promise::ok(()) - } - - fn dream_end( - &mut self, - _params: daemon::DreamEndParams, - _results: daemon::DreamEndResults, - ) -> Promise<(), capnp::Error> { - let mut s = self.state.borrow_mut(); - s.dreaming = false; - s.dream_start = 0.0; - info!("dream ended"); - Promise::ok(()) - } - - fn afk( - &mut self, - _params: daemon::AfkParams, - _results: daemon::AfkResults, - ) -> Promise<(), capnp::Error> { - self.state.borrow_mut().handle_afk(); - Promise::ok(()) - } - - fn test_nudge( - &mut self, - _params: daemon::TestNudgeParams, - mut results: daemon::TestNudgeResults, - ) -> Promise<(), capnp::Error> { - let mut state = self.state.borrow_mut(); - let ctx = state.build_context(true); - let extra = if ctx.is_empty() { - String::new() - } else { - format!("\n{ctx}") - }; - let msg = format!( - "This is your time (User AFK, test nudge). \ - Let your feelings guide your thinking.{extra}" - ); - let ok = state.send(&msg); - results.get().set_sent(ok); - results.get().set_message(&msg); - Promise::ok(()) - } - - fn session_timeout( - &mut self, - params: daemon::SessionTimeoutParams, - _results: daemon::SessionTimeoutResults, - ) -> Promise<(), capnp::Error> { - let secs = pry!(params.get()).get_seconds(); - self.state.borrow_mut().handle_session_timeout(secs); - Promise::ok(()) - } - - fn idle_timeout( - &mut self, - params: daemon::IdleTimeoutParams, - _results: daemon::IdleTimeoutResults, - ) -> Promise<(), capnp::Error> { - let secs = pry!(params.get()).get_seconds(); - self.state.borrow_mut().handle_idle_timeout(secs); - Promise::ok(()) - } - - fn notify_timeout( - &mut self, - params: daemon::NotifyTimeoutParams, - _results: daemon::NotifyTimeoutResults, - ) -> Promise<(), capnp::Error> { - let secs = pry!(params.get()).get_seconds(); - self.state.borrow_mut().handle_notify_timeout(secs); - Promise::ok(()) - } - - fn save( - &mut self, - _params: daemon::SaveParams, - _results: daemon::SaveResults, - ) -> Promise<(), capnp::Error> { - self.state.borrow().save(); - info!("state saved"); - Promise::ok(()) - } - - fn debug( - &mut self, - _params: daemon::DebugParams, - mut results: daemon::DebugResults, - ) -> Promise<(), capnp::Error> { - let json = self.state.borrow().debug_json(); - results.get().set_json(&json); - Promise::ok(()) - } - - fn ewma( - &mut self, - params: daemon::EwmaParams, - mut results: daemon::EwmaResults, - ) -> Promise<(), capnp::Error> { - let value = pry!(params.get()).get_value(); - let current = self.state.borrow_mut().handle_ewma(value); - results.get().set_current(current); - Promise::ok(()) - } - - fn stop( - &mut self, - _params: daemon::StopParams, - _results: daemon::StopResults, - ) -> Promise<(), capnp::Error> { - self.state.borrow_mut().running = false; - info!("stopping"); - Promise::ok(()) - } - - fn status( - &mut self, - _params: daemon::StatusParams, - mut results: daemon::StatusResults, - ) -> Promise<(), capnp::Error> { - let s = self.state.borrow(); - let mut status = results.get().init_status(); - - status.set_last_user_msg(s.last_user_msg); - status.set_last_response(s.last_response); - if let Some(ref pane) = s.claude_pane { - status.set_claude_pane(pane); - } - status.set_sleep_until(match s.sleep_until { - None => 0.0, - Some(0.0) => -1.0, - Some(t) => t, - }); - status.set_quiet_until(s.quiet_until); - status.set_consolidating(s.consolidating); - status.set_dreaming(s.dreaming); - status.set_fired(s.fired); - status.set_user_present(s.user_present()); - status.set_uptime(crate::now() - s.start_time); - status.set_activity(match s.notifications.activity { - notify::Activity::Idle => crate::daemon_capnp::Activity::Idle, - notify::Activity::Focused => crate::daemon_capnp::Activity::Focused, - notify::Activity::Sleeping => crate::daemon_capnp::Activity::Sleeping, - }); - status.set_pending_count(s.notifications.pending.len() as u32); - status.set_idle_timeout(s.idle_timeout); - status.set_notify_timeout(s.notify_timeout); - status.set_since_activity(s.since_activity()); - status.set_since_user(crate::now() - s.last_user_msg); - status.set_block_reason(s.block_reason()); - status.set_activity_ewma(s.activity_ewma); - - Promise::ok(()) - } - - fn notify( - &mut self, - params: daemon::NotifyParams, - mut results: daemon::NotifyResults, - ) -> Promise<(), capnp::Error> { - let params = pry!(params.get()); - let notif = pry!(params.get_notification()); - let ntype = pry!(pry!(notif.get_type()).to_str()).to_string(); - let urgency = notif.get_urgency(); - let message = pry!(pry!(notif.get_message()).to_str()).to_string(); - - let interrupt = self - .state - .borrow_mut() - .notifications - .submit(ntype, urgency, message); - results.get().set_interrupt(interrupt); - Promise::ok(()) - } - - fn get_notifications( - &mut self, - params: daemon::GetNotificationsParams, - mut results: daemon::GetNotificationsResults, - ) -> Promise<(), capnp::Error> { - let min_urgency = pry!(params.get()).get_min_urgency(); - let mut s = self.state.borrow_mut(); - - // Ingest legacy files first - s.notifications.ingest_legacy_files(); - - let pending = if min_urgency == 255 { - s.notifications.drain_deliverable() - } else { - s.notifications.drain(min_urgency) - }; - - let mut list = results.get().init_notifications(pending.len() as u32); - for (i, n) in pending.iter().enumerate() { - let mut entry = list.reborrow().get(i as u32); - entry.set_type(&n.ntype); - entry.set_urgency(n.urgency); - entry.set_message(&n.message); - entry.set_timestamp(n.timestamp); - } - - Promise::ok(()) - } - - fn get_types( - &mut self, - _params: daemon::GetTypesParams, - mut results: daemon::GetTypesResults, - ) -> Promise<(), capnp::Error> { - let s = self.state.borrow(); - let types = &s.notifications.types; - - let mut list = results.get().init_types(types.len() as u32); - for (i, (name, info)) in types.iter().enumerate() { - let mut entry = list.reborrow().get(i as u32); - entry.set_name(name); - entry.set_count(info.count); - entry.set_first_seen(info.first_seen); - entry.set_last_seen(info.last_seen); - entry.set_threshold(info.threshold.map_or(-1, |t| t as i8)); - } - - Promise::ok(()) - } - - fn set_threshold( - &mut self, - params: daemon::SetThresholdParams, - _results: daemon::SetThresholdResults, - ) -> Promise<(), capnp::Error> { - let params = pry!(params.get()); - let ntype = pry!(pry!(params.get_type()).to_str()).to_string(); - let level = params.get_level(); - - self.state - .borrow_mut() - .notifications - .set_threshold(&ntype, level); - Promise::ok(()) - } - - fn module_command( - &mut self, - params: daemon::ModuleCommandParams, - mut results: daemon::ModuleCommandResults, - ) -> Promise<(), capnp::Error> { - let params = pry!(params.get()); - let module = pry!(pry!(params.get_module()).to_str()).to_string(); - let command = pry!(pry!(params.get_command()).to_str()).to_string(); - let args_reader = pry!(params.get_args()); - let mut args = Vec::new(); - for i in 0..args_reader.len() { - args.push(pry!(pry!(args_reader.get(i)).to_str()).to_string()); - } - - match module.as_str() { - "irc" => { - let irc = match &self.irc { - Some(irc) => irc.clone(), - None => { - results.get().set_result("irc module not enabled"); - return Promise::ok(()); - } - }; - let config = self.config.clone(); - - Promise::from_future(async move { - let result = irc::handle_command(&irc, &config, &command, &args).await; - match result { - Ok(msg) => results.get().set_result(&msg), - Err(msg) => results.get().set_result(&format!("error: {msg}")), - } - Ok(()) - }) - } - "telegram" => { - let tg = match &self.telegram { - Some(tg) => tg.clone(), - None => { - results.get().set_result("telegram module not enabled"); - return Promise::ok(()); - } - }; - let config = self.config.clone(); - - Promise::from_future(async move { - let result = telegram::handle_command(&tg, &config, &command, &args).await; - match result { - Ok(msg) => results.get().set_result(&msg), - Err(msg) => results.get().set_result(&format!("error: {msg}")), - } - Ok(()) - }) - } - _ => { - results - .get() - .set_result(&format!("unknown module: {module}")); - Promise::ok(()) - } - } - } -} - -/// Helper macro — same as capnp's pry! but available here. -macro_rules! pry { - ($e:expr) => { - match $e { - Ok(v) => v, - Err(e) => return Promise::err(e.into()), - } - }; -} -use pry; diff --git a/thalamus/src/tmux.rs b/thalamus/src/tmux.rs deleted file mode 100644 index f3e0cfd..0000000 --- a/thalamus/src/tmux.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Tmux interaction: pane detection and prompt injection. - -use std::process::Command; -use std::thread; -use std::time::Duration; -use tracing::info; - -/// Find Claude Code's tmux pane by scanning for the "claude" process. -pub fn find_claude_pane() -> Option { - let out = Command::new("tmux") - .args([ - "list-panes", - "-a", - "-F", - "#{session_name}:#{window_index}.#{pane_index}\t#{pane_current_command}", - ]) - .output() - .ok()?; - - let stdout = String::from_utf8_lossy(&out.stdout); - for line in stdout.lines() { - if let Some((pane, cmd)) = line.split_once('\t') { - if cmd == "claude" { - return Some(pane.to_string()); - } - } - } - None -} - -/// Send a prompt to a tmux pane. Returns true on success. -/// -/// Types the message literally then presses Enter. -pub fn send_prompt(pane: &str, msg: &str) -> bool { - let preview: String = msg.chars().take(100).collect(); - info!("SEND [{pane}]: {preview}..."); - - // Type the message literally (flatten newlines — they'd submit the input early) - let flat: String = msg.chars().map(|c| if c == '\n' { ' ' } else { c }).collect(); - let ok = Command::new("tmux") - .args(["send-keys", "-t", pane, "-l", &flat]) - .output() - .is_ok(); - if !ok { - return false; - } - thread::sleep(Duration::from_millis(500)); - - // Submit - Command::new("tmux") - .args(["send-keys", "-t", pane, "Enter"]) - .output() - .is_ok() -}