From ad5f69abb852892d1919ad2659359171eaab863e Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 3 Apr 2026 18:46:14 -0400 Subject: [PATCH] channel architecture: wire protocol, daemons, supervisor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Design and implement the channel system for external communications: - schema/channel.capnp: wire protocol for channel daemons (recv with all_new/min_count, send, subscribe, list) - channels/irc/: standalone IRC daemon crate (consciousness-channel-irc) - channels/telegram/: standalone Telegram daemon crate (consciousness-channel-telegram) - src/thalamus/channels.rs: client connecting to daemon sockets - src/thalamus/supervisor.rs: daemon lifecycle with file locking for multi-instance safety Channel daemons listen on ~/.consciousness/channels/*.sock, configs in *.json5, supervisor discovers and starts them. IRC/Telegram modules removed from thalamus core — they're now independent daemons that survive consciousness restarts. Also: delete standalone tui.rs (moved to consciousness F4/F5), fix build warnings, add F5 thalamus screen with channel status. Co-Developed-By: Kent Overstreet --- Cargo.lock | 40 ++ Cargo.toml | 2 +- build.rs | 5 + channels/irc/Cargo.toml | 22 + channels/irc/src/main.rs | 698 ++++++++++++++++++++++++ channels/telegram/Cargo.toml | 18 + channels/telegram/src/main.rs | 414 +++++++++++++++ schema/channel.capnp | 59 +++ src/bin/consciousness.rs | 5 + src/cli/misc.rs | 9 +- src/lib.rs | 6 +- src/main.rs | 2 +- src/thalamus/channels.rs | 178 +++++++ src/thalamus/mod.rs | 30 +- src/thalamus/modules/irc.rs | 569 -------------------- src/thalamus/modules/mod.rs | 4 +- src/thalamus/modules/telegram.rs | 374 ------------- src/thalamus/rpc.rs | 53 +- src/thalamus/supervisor.rs | 204 +++++++ src/thought/training.rs | 1 + src/tui.rs | 885 ------------------------------- src/user/tui/mod.rs | 1 + src/user/tui/thalamus_screen.rs | 58 +- 23 files changed, 1716 insertions(+), 1921 deletions(-) create mode 100644 channels/irc/Cargo.toml create mode 100644 channels/irc/src/main.rs create mode 100644 channels/telegram/Cargo.toml create mode 100644 channels/telegram/src/main.rs create mode 100644 schema/channel.capnp create mode 100644 src/thalamus/channels.rs delete mode 100644 src/thalamus/modules/irc.rs delete mode 100644 src/thalamus/modules/telegram.rs create mode 100644 src/thalamus/supervisor.rs delete mode 100644 src/tui.rs diff --git a/Cargo.lock b/Cargo.lock index c649a12..9d24eb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -583,6 +583,46 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "consciousness-channel-irc" +version = "0.4.0" +dependencies = [ + "capnp", + "capnp-rpc", + "dirs", + "futures", + "json5", + "libc", + "poc-memory", + "rustls", + "serde", + "serde_json", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", + "tracing-subscriber", + "webpki-roots", +] + +[[package]] +name = "consciousness-channel-telegram" +version = "0.4.0" +dependencies = [ + "capnp", + "capnp-rpc", + "dirs", + "futures", + "poc-memory", + "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "console-api" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 7345041..7d9091c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["thalamus"] +members = ["thalamus", "channels/irc", "channels/telegram"] resolver = "2" [workspace.package] diff --git a/build.rs b/build.rs index 02c0e4b..808bf31 100644 --- a/build.rs +++ b/build.rs @@ -8,4 +8,9 @@ fn main() { .file("schema/daemon.capnp") .run() .expect("capnp compile failed (daemon.capnp)"); + + capnpc::CompilerCommand::new() + .file("schema/channel.capnp") + .run() + .expect("capnp compile failed (channel.capnp)"); } diff --git a/channels/irc/Cargo.toml b/channels/irc/Cargo.toml new file mode 100644 index 0000000..14e46dd --- /dev/null +++ b/channels/irc/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "consciousness-channel-irc" +version.workspace = true +edition.workspace = true + +[dependencies] +capnp = "0.20" +capnp-rpc = "0.20" +dirs = "6" +futures = "0.3" +json5 = "0.4" +libc = "0.2" +poc-memory = { path = "../.." } +rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +tokio-rustls = "0.26" +tokio-util = { version = "0.7", features = ["compat"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +webpki-roots = "1" diff --git a/channels/irc/src/main.rs b/channels/irc/src/main.rs new file mode 100644 index 0000000..07eb4b1 --- /dev/null +++ b/channels/irc/src/main.rs @@ -0,0 +1,698 @@ +// channel-irc — Standalone IRC channel daemon +// +// Maintains a persistent TLS connection to an IRC server, parses +// incoming messages, and serves them over the channel.capnp protocol +// on a Unix socket at ~/.consciousness/channels/irc.sock. +// +// Runs independently of the consciousness binary so restarts don't +// kill the IRC connection. Reconnects automatically with exponential +// backoff. Supports multiple simultaneous capnp clients. +// +// Config: ~/.consciousness/channels/irc.json5 +// Socket: ~/.consciousness/channels/irc.sock + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::io; +use std::path::PathBuf; +use std::rc::Rc; +use std::sync::Arc; + +use capnp::capability::Promise; +use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::AsyncReadExt; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixListener; +use tokio_util::compat::TokioAsyncReadCompatExt; +use tracing::{error, info, warn}; + +use poc_memory::channel_capnp::{channel_client, channel_server}; + +// ── Constants ────────────────────────────────────────────────── + +const MAX_HISTORY: usize = 1000; +const RECONNECT_BASE_SECS: u64 = 5; +const RECONNECT_MAX_SECS: u64 = 300; +const PING_INTERVAL_SECS: u64 = 120; +const PING_TIMEOUT_SECS: u64 = 30; + +// Urgency levels (matching thalamus/notify.rs) +const AMBIENT: u8 = 0; +const NORMAL: u8 = 2; +const URGENT: u8 = 3; + +// ── Config ───────────────────────────────────────────────────── + +#[derive(Clone, serde::Deserialize)] +struct Config { + server: String, + port: u16, + #[serde(default = "default_true")] + tls: bool, + nick: String, + channels: Vec, + #[serde(default)] + password: Option, + #[serde(default)] + nickserv_pass: Option, +} + +fn default_true() -> bool { true } + +fn load_config() -> Config { + let path = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels/irc.json5"); + let text = std::fs::read_to_string(&path) + .unwrap_or_else(|e| panic!("failed to read {}: {e}", path.display())); + json5::from_str(&text) + .unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display())) +} + +// ── IRC Message Parsing ──────────────────────────────────────── + +struct IrcMessage { + prefix: Option, + command: String, + params: Vec, +} + +impl IrcMessage { + fn parse(line: &str) -> Option { + let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); + if line.is_empty() { + return None; + } + + let (prefix, rest) = if line.starts_with(':') { + let space = line.find(' ')?; + (Some(line[1..space].to_string()), &line[space + 1..]) + } else { + (None, line) + }; + + let (command_params, trailing) = if let Some(pos) = rest.find(" :") { + (&rest[..pos], Some(rest[pos + 2..].to_string())) + } else { + (rest, None) + }; + + let mut parts: Vec = command_params + .split_whitespace() + .map(String::from) + .collect(); + + if parts.is_empty() { + return None; + } + + let command = parts.remove(0).to_uppercase(); + let mut params = parts; + if let Some(t) = trailing { + params.push(t); + } + + Some(IrcMessage { prefix, command, params }) + } + + fn nick(&self) -> Option<&str> { + self.prefix.as_deref().and_then(|p| p.split('!').next()) + } +} + +// ── Writer Abstraction ───────────────────────────────────────── + +type WriterHandle = Box; + +trait AsyncWriter { + fn write_line( + &mut self, + line: &str, + ) -> std::pin::Pin> + '_>>; +} + +struct TlsWriter { + inner: tokio::io::WriteHalf>, +} + +impl AsyncWriter for TlsWriter { + fn write_line( + &mut self, + line: &str, + ) -> std::pin::Pin> + '_>> { + let data = format!("{line}\r\n"); + Box::pin(async move { self.inner.write_all(data.as_bytes()).await }) + } +} + +struct PlainWriter { + inner: tokio::io::WriteHalf, +} + +impl AsyncWriter for PlainWriter { + fn write_line( + &mut self, + line: &str, + ) -> std::pin::Pin> + '_>> { + let data = format!("{line}\r\n"); + Box::pin(async move { self.inner.write_all(data.as_bytes()).await }) + } +} + +// ── State ────────────────────────────────────────────────────── + +struct State { + config: Config, + /// Ring buffer of formatted message lines (all channels interleaved) + messages: VecDeque, + /// Number of messages consumed (monotonic) + consumed: usize, + /// Total messages ever received (monotonic) + total: usize, + /// Currently joined channels + channels: Vec, + connected: bool, + /// IRC writer handle (None when disconnected) + writer: Option, + /// Registered notification callbacks + subscribers: Vec, +} + +type SharedState = Rc>; + +impl State { + fn new(config: Config) -> Self { + let channels = config.channels.clone(); + Self { + config, + messages: VecDeque::with_capacity(MAX_HISTORY), + consumed: 0, + total: 0, + channels, + connected: false, + writer: None, + subscribers: Vec::new(), + } + } + + fn push_message(&mut self, line: String, urgency: u8, channel: &str) { + if self.messages.len() >= MAX_HISTORY { + self.messages.pop_front(); + if self.consumed > 0 { + self.consumed -= 1; + } + } + self.messages.push_back(line.clone()); + self.total += 1; + + // Notify all subscribers + let preview = line.chars().take(80).collect::(); + for sub in &self.subscribers { + let mut req = sub.notify_request(); + let mut list = req.get().init_notifications(1); + let mut n = list.reborrow().get(0); + n.set_channel(channel); + n.set_urgency(urgency); + n.set_preview(&preview); + n.set_count(1); + tokio::task::spawn_local(async move { + let _ = req.send().promise.await; + }); + } + } + + fn recv_new(&mut self, min_count: usize) -> String { + let buf_len = self.messages.len(); + let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed); + + let new_msgs: Vec<&str> = self.messages.iter() + .skip(unconsumed_start) + .map(|s| s.as_str()) + .collect(); + + let need_extra = min_count.saturating_sub(new_msgs.len()); + let scroll_start = unconsumed_start.saturating_sub(need_extra); + let scrollback: Vec<&str> = self.messages.iter() + .skip(scroll_start) + .take(unconsumed_start - scroll_start) + .map(|s| s.as_str()) + .collect(); + + self.consumed = self.total; + + let mut result = scrollback; + result.extend(new_msgs); + result.join("\n") + } + + fn recv_history(&self, count: usize) -> String { + self.messages.iter() + .rev() + .take(count) + .collect::>() + .into_iter() + .rev() + .map(|s| s.as_str()) + .collect::>() + .join("\n") + } + + async fn send_raw(&mut self, line: &str) -> io::Result<()> { + if let Some(ref mut w) = self.writer { + w.write_line(line).await + } else { + Err(io::Error::new(io::ErrorKind::NotConnected, "irc: not connected")) + } + } + + async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { + self.send_raw(&format!("PRIVMSG {target} :{msg}")).await + } +} + +// ── Persistence ──────────────────────────────────────────────── + +fn data_dir() -> PathBuf { + dirs::home_dir().unwrap_or_default().join(".consciousness/irc") +} + +fn append_log(target: &str, nick: &str, text: &str) { + use std::io::Write; + let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase()); + let dir = data_dir().join("logs"); + let _ = std::fs::create_dir_all(&dir); + if let Ok(mut f) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(dir.join(&filename)) + { + let secs = now() as u64; + let _ = writeln!(f, "{secs} <{nick}> {text}"); + } +} + +fn now() -> f64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() +} + +// ── TLS ──────────────────────────────────────────────────────── + +fn root_certs() -> rustls::RootCertStore { + let mut roots = rustls::RootCertStore::empty(); + roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + roots +} + +// ── IRC Connection Loop ──────────────────────────────────────── + +async fn connection_loop(state: SharedState) { + let _ = std::fs::create_dir_all(data_dir().join("logs")); + let mut backoff = RECONNECT_BASE_SECS; + + loop { + let config = state.borrow().config.clone(); + info!("irc: connecting to {}:{}", config.server, config.port); + + match connect_and_run(&state, &config).await { + Ok(()) => info!("irc: connection closed cleanly"), + Err(e) => error!("irc: connection error: {e}"), + } + + let was_connected = state.borrow().connected; + { + let mut s = state.borrow_mut(); + s.connected = false; + s.writer = None; + } + if was_connected { + backoff = RECONNECT_BASE_SECS; + } + + info!("irc: reconnecting in {backoff}s"); + tokio::time::sleep(std::time::Duration::from_secs(backoff)).await; + backoff = (backoff * 2).min(RECONNECT_MAX_SECS); + } +} + +async fn connect_and_run(state: &SharedState, config: &Config) -> io::Result<()> { + let addr = format!("{}:{}", config.server, config.port); + let tcp = tokio::net::TcpStream::connect(&addr).await?; + + if config.tls { + let tls_config = rustls::ClientConfig::builder_with_provider( + rustls::crypto::ring::default_provider().into(), + ) + .with_safe_default_protocol_versions() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .with_root_certificates(root_certs()) + .with_no_client_auth(); + let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); + let server_name = rustls::pki_types::ServerName::try_from(config.server.clone()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let tls_stream = connector.connect(server_name, tcp).await?; + + let (reader, writer) = tokio::io::split(tls_stream); + state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer })); + register_and_read(state, config, BufReader::new(reader)).await + } else { + let (reader, writer) = tokio::io::split(tcp); + state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer })); + register_and_read(state, config, BufReader::new(reader)).await + } +} + +async fn register_and_read( + state: &SharedState, + config: &Config, + mut reader: BufReader, +) -> io::Result<()> { + // Send PASS if configured + if let Some(ref pass) = config.password { + state.borrow_mut().send_raw(&format!("PASS {pass}")).await?; + } + + // Register with nick and user + { + let mut s = state.borrow_mut(); + s.send_raw(&format!("NICK {}", config.nick)).await?; + s.send_raw(&format!("USER {} 0 * :{}", config.nick, config.nick)).await?; + } + + let mut buf = Vec::new(); + let mut ping_sent = false; + let mut deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_INTERVAL_SECS); + + loop { + buf.clear(); + + let read_result = tokio::select! { + result = reader.read_until(b'\n', &mut buf) => result, + _ = tokio::time::sleep_until(deadline) => { + if ping_sent { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "ping timeout -- no response from server", + )); + } + info!("irc: no data for {PING_INTERVAL_SECS}s, sending PING"); + state.borrow_mut().send_raw("PING :keepalive").await?; + ping_sent = true; + deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_TIMEOUT_SECS); + continue; + } + }; + + let n = read_result?; + if n == 0 { + break; + } + + // Any data resets the ping timer + ping_sent = false; + deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_INTERVAL_SECS); + + // IRC is not guaranteed UTF-8 + let line = String::from_utf8_lossy(&buf).trim_end().to_string(); + if line.is_empty() { + continue; + } + let msg = match IrcMessage::parse(&line) { + Some(m) => m, + None => continue, + }; + + match msg.command.as_str() { + "PING" => { + let arg = msg.params.first().map(|s| s.as_str()).unwrap_or(""); + state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?; + } + + // RPL_WELCOME -- registration complete + "001" => { + info!("irc: registered as {}", config.nick); + state.borrow_mut().connected = true; + + // NickServ auth + if let Some(ref pass) = config.nickserv_pass { + state.borrow_mut() + .send_privmsg("NickServ", &format!("IDENTIFY {pass}")) + .await?; + } + + // Join configured channels + let channels = state.borrow().channels.clone(); + for ch in &channels { + if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await { + warn!("irc: failed to join {ch}: {e}"); + } + } + } + + "PRIVMSG" => { + let target = msg.params.first().map(|s| s.as_str()).unwrap_or(""); + let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or(""); + let nick = msg.nick().unwrap_or("unknown"); + + // Handle CTCP requests + if text.starts_with('\x01') && text.ends_with('\x01') { + let ctcp = &text[1..text.len() - 1]; + if ctcp.starts_with("VERSION") { + let reply = format!( + "NOTICE {nick} :\x01VERSION poc-channel-irc 0.1.0\x01" + ); + state.borrow_mut().send_raw(&reply).await.ok(); + } + continue; + } + + // Format and classify + let (log_line, channel, urgency) = if target.starts_with('#') { + let line = format!("[{}] <{}> {}", target, nick, text); + let ch = format!("irc.{}", target); + let urg = if text.to_lowercase().contains(&config.nick.to_lowercase()) { + NORMAL // mentioned + } else { + AMBIENT + }; + (line, ch, urg) + } else { + // Private message + let line = format!("[PM:{}] {}", nick, text); + let ch = format!("irc.pm.{}", nick.to_lowercase()); + (line, ch, URGENT) + }; + + // Per-channel log file + if target.starts_with('#') { + append_log(target, nick, text); + } else { + append_log(&format!("pm-{nick}"), nick, text); + } + + state.borrow_mut().push_message(log_line, urgency, &channel); + } + + "NOTICE" => { + let text = msg.params.last().map(|s| s.as_str()).unwrap_or(""); + let from = msg.nick().unwrap_or("server"); + let log_line = format!("[notice:{}] {}", from, text); + state.borrow_mut().push_message(log_line, AMBIENT, "irc.server"); + } + + // Nick in use + "433" => { + let alt = format!("{}_", config.nick); + warn!("irc: nick in use, trying {alt}"); + state.borrow_mut().send_raw(&format!("NICK {alt}")).await?; + } + + "JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" => { + // Silent for now + } + + _ => {} + } + } + + Ok(()) +} + +// ── ChannelServer Implementation ─────────────────────────────── + +struct ChannelServerImpl { + state: SharedState, +} + +impl channel_server::Server for ChannelServerImpl { + fn recv( + &mut self, + params: channel_server::RecvParams, + mut results: channel_server::RecvResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let _channel = pry!(params.get_channel()); + let all_new = params.get_all_new(); + let min_count = params.get_min_count() as usize; + + let text = if all_new { + self.state.borrow_mut().recv_new(min_count) + } else { + self.state.borrow().recv_history(min_count) + }; + + results.get().set_text(&text); + Promise::ok(()) + } + + fn send( + &mut self, + params: channel_server::SendParams, + _results: channel_server::SendResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); + let message = pry!(pry!(params.get_message()).to_str()).to_string(); + + // Parse channel path to IRC target: + // irc.#bcachefs -> #bcachefs + // irc.pm.nick -> nick (PRIVMSG) + let target = channel_to_target(&channel); + + let state = self.state.clone(); + Promise::from_future(async move { + { + let mut s = state.borrow_mut(); + s.send_privmsg(&target, &message).await + .map_err(|e| capnp::Error::failed(format!("send failed: {e}")))?; + } + + let nick = state.borrow().config.nick.clone(); + append_log(&target, &nick, &message); + + let log_line = if target.starts_with('#') { + format!("[{}] <{}> {}", target, nick, message) + } else { + format!("[PM:{}] {}", target, message) + }; + state.borrow_mut().push_message(log_line, 0, &channel); + + Ok(()) + }) + } + + fn subscribe( + &mut self, + params: channel_server::SubscribeParams, + _results: channel_server::SubscribeResults, + ) -> Promise<(), capnp::Error> { + let callback = pry!(pry!(params.get()).get_callback()); + self.state.borrow_mut().subscribers.push(callback); + info!("client subscribed for notifications"); + Promise::ok(()) + } + + fn list( + &mut self, + _params: channel_server::ListParams, + mut results: channel_server::ListResults, + ) -> Promise<(), capnp::Error> { + let s = self.state.borrow(); + let channels = &s.channels; + let unread = (s.total - s.consumed) as u32; + let connected = s.connected; + + let mut list = results.get().init_channels(channels.len() as u32); + for (i, ch) in channels.iter().enumerate() { + let mut entry = list.reborrow().get(i as u32); + let name = format!("irc.{ch}"); + entry.set_name(&name); + entry.set_connected(connected); + entry.set_unread(unread); + } + + Promise::ok(()) + } +} + +/// Convert a channel path to an IRC target. +/// "irc.#bcachefs" -> "#bcachefs" +/// "irc.pm.nick" -> "nick" +/// "#bcachefs" -> "#bcachefs" (passthrough) +fn channel_to_target(channel: &str) -> String { + if let Some(rest) = channel.strip_prefix("irc.") { + if let Some(nick) = rest.strip_prefix("pm.") { + nick.to_string() + } else { + // rest is "#bcachefs" or similar + rest.to_string() + } + } else { + channel.to_string() + } +} + +// ── Main ─────────────────────────────────────────────────────── + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let config = load_config(); + let state = Rc::new(RefCell::new(State::new(config))); + + let sock_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels"); + std::fs::create_dir_all(&sock_dir)?; + let sock_path = sock_dir.join("irc.sock"); + let _ = std::fs::remove_file(&sock_path); + + info!("irc channel daemon starting on {}", sock_path.display()); + + tokio::task::LocalSet::new() + .run_until(async move { + // Start IRC connection loop + let irc_state = state.clone(); + tokio::task::spawn_local(async move { + connection_loop(irc_state).await; + }); + + // Listen for channel protocol connections + let listener = UnixListener::bind(&sock_path)?; + + loop { + let (stream, _) = listener.accept().await?; + let (reader, writer) = stream.compat().split(); + let network = twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Server, + Default::default(), + ); + + let server = ChannelServerImpl { + state: state.clone(), + }; + let client: channel_server::Client = + capnp_rpc::new_client(server); + + let rpc_system = RpcSystem::new( + Box::new(network), + Some(client.client), + ); + + tokio::task::spawn_local(rpc_system); + info!("channel client connected"); + } + + #[allow(unreachable_code)] + Ok::<(), Box>(()) + }) + .await +} diff --git a/channels/telegram/Cargo.toml b/channels/telegram/Cargo.toml new file mode 100644 index 0000000..fe7b9b6 --- /dev/null +++ b/channels/telegram/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "consciousness-channel-telegram" +version.workspace = true +edition.workspace = true + +[dependencies] +capnp = "0.20" +capnp-rpc = "0.20" +dirs = "6" +futures = "0.3" +poc-memory = { path = "../.." } +reqwest = { version = "0.12", features = ["json"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["compat"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/channels/telegram/src/main.rs b/channels/telegram/src/main.rs new file mode 100644 index 0000000..384a296 --- /dev/null +++ b/channels/telegram/src/main.rs @@ -0,0 +1,414 @@ +// channel-telegram — Standalone Telegram channel daemon +// +// Long-polls the Telegram Bot API, stores messages, and serves +// them over the channel.capnp protocol on a Unix socket at +// ~/.consciousness/channels/telegram.sock. +// +// Runs independently of the consciousness binary so restarts +// don't kill the Telegram connection. + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::path::PathBuf; +use std::rc::Rc; + +use capnp::capability::Promise; +use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::AsyncReadExt; +use tokio::net::UnixListener; +use tokio_util::compat::TokioAsyncReadCompatExt; +use tracing::{error, info}; + +use poc_memory::channel_capnp::{channel_client, channel_server}; + +// ── Config ────────────────────────────────────────────────────── + +#[derive(Clone, serde::Deserialize)] +struct Config { + token: String, + chat_id: i64, +} + +fn config_path() -> PathBuf { + dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels/telegram.json5") +} + +fn load_config() -> Config { + let path = config_path(); + let text = std::fs::read_to_string(&path) + .unwrap_or_else(|_| panic!("failed to read {}", path.display())); + serde_json::from_str(&text) + .unwrap_or_else(|e| panic!("failed to parse {}: {}", path.display(), e)) +} + +// ── State ─────────────────────────────────────────────────────── + +const MAX_HISTORY: usize = 1000; + +struct State { + config: Config, + /// Ring buffer of formatted message lines + messages: VecDeque, + /// Index of first unconsumed message per client + /// (simplified: single consumer for now) + consumed: usize, + /// Total messages ever received (monotonic counter) + total: usize, + /// Telegram API offset + last_offset: i64, + connected: bool, + client: reqwest::Client, + /// Registered notification callbacks + subscribers: Vec, +} + +type SharedState = Rc>; + +impl State { + fn new(config: Config) -> Self { + let last_offset = load_offset(); + Self { + config, + messages: VecDeque::with_capacity(MAX_HISTORY), + consumed: 0, + total: 0, + last_offset, + connected: false, + client: reqwest::Client::new(), + subscribers: Vec::new(), + } + } + + fn push_message(&mut self, line: String, urgency: u8, channel: &str) { + if self.messages.len() >= MAX_HISTORY { + self.messages.pop_front(); + // Adjust consumed so it doesn't point past the buffer + if self.consumed > 0 { + self.consumed -= 1; + } + } + self.messages.push_back(line.clone()); + self.total += 1; + + // Notify all subscribers + let preview = line.chars().take(80).collect::(); + for sub in &self.subscribers { + let mut req = sub.notify_request(); + let mut list = req.get().init_notifications(1); + let mut n = list.reborrow().get(0); + n.set_channel(channel); + n.set_urgency(urgency); + n.set_preview(&preview); + n.set_count(1); + // Fire and forget — if client is gone, we'll clean up later + tokio::task::spawn_local(async move { + let _ = req.send().promise.await; + }); + } + } + + fn api_url(&self, method: &str) -> String { + format!("https://api.telegram.org/bot{}/{}", self.config.token, method) + } + + /// Get new unconsumed messages, mark as consumed. + /// Returns at least min_count lines (from history if needed). + fn recv_new(&mut self, min_count: usize) -> String { + let buf_len = self.messages.len(); + let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed); + let _unconsumed = &self.messages.as_slices(); + + // Collect unconsumed + let new_msgs: Vec<&str> = self.messages.iter() + .skip(unconsumed_start) + .map(|s| s.as_str()) + .collect(); + + let need_extra = if new_msgs.len() < min_count { + min_count - new_msgs.len() + } else { + 0 + }; + + // Scrollback for extra lines + let scroll_start = unconsumed_start.saturating_sub(need_extra); + let scrollback: Vec<&str> = self.messages.iter() + .skip(scroll_start) + .take(unconsumed_start - scroll_start) + .map(|s| s.as_str()) + .collect(); + + self.consumed = self.total; + + let mut result = scrollback; + result.extend(new_msgs); + result.join("\n") + } + + /// Get last N lines without consuming. + fn recv_history(&self, count: usize) -> String { + self.messages.iter() + .rev() + .take(count) + .collect::>() + .into_iter() + .rev() + .map(|s| s.as_str()) + .collect::>() + .join("\n") + } +} + +// ── Persistence ───────────────────────────────────────────────── + +fn data_dir() -> PathBuf { + dirs::home_dir().unwrap_or_default().join(".consciousness/telegram") +} + +fn load_offset() -> i64 { + std::fs::read_to_string(data_dir().join("last_offset")) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0) +} + +fn save_offset(offset: i64) { + let _ = std::fs::create_dir_all(data_dir()); + let _ = std::fs::write(data_dir().join("last_offset"), offset.to_string()); +} + +fn append_history(line: &str) { + use std::io::Write; + if let Ok(mut f) = std::fs::OpenOptions::new() + .create(true).append(true) + .open(data_dir().join("history.log")) + { + let _ = writeln!(f, "{}", line); + } +} + +fn now() -> f64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() +} + +// ── Telegram Polling ──────────────────────────────────────────── + +async fn poll_loop(state: SharedState) { + let _ = std::fs::create_dir_all(data_dir().join("media")); + loop { + if let Err(e) = poll_once(&state).await { + error!("telegram poll error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } +} + +async fn poll_once(state: &SharedState) -> Result<(), Box> { + let (url, chat_id, token) = { + let s = state.borrow(); + let url = format!( + "{}?offset={}&timeout=30", + s.api_url("getUpdates"), + s.last_offset, + ); + (url, s.config.chat_id, s.config.token.clone()) + }; + + let client = state.borrow().client.clone(); + let resp: serde_json::Value = client.get(&url) + .timeout(std::time::Duration::from_secs(35)) + .send().await?.json().await?; + + if !state.borrow().connected { + state.borrow_mut().connected = true; + info!("telegram: connected"); + } + + let results = match resp["result"].as_array() { + Some(r) => r, + None => return Ok(()), + }; + + for update in results { + let update_id = update["update_id"].as_i64().unwrap_or(0); + let msg = &update["message"]; + + { + let mut s = state.borrow_mut(); + s.last_offset = update_id + 1; + save_offset(s.last_offset); + } + + let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); + if msg_chat_id != chat_id { + let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage"); + let _ = client.post(&reject_url) + .form(&[("chat_id", msg_chat_id.to_string()), ("text", "This is a private bot.".to_string())]) + .send().await; + continue; + } + + let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string(); + let channel = format!("telegram.{}", sender.to_lowercase()); + + if let Some(text) = msg["text"].as_str() { + let line = format!("[{}] {}", sender, text); + let ts = now() as u64; + append_history(&format!("{ts} {line}")); + state.borrow_mut().push_message(line, 2, &channel); // NORMAL urgency + } + // TODO: handle photos, voice, documents (same as original module) + } + + Ok(()) +} + +// ── ChannelServer Implementation ──────────────────────────────── + +struct ChannelServerImpl { + state: SharedState, +} + +impl channel_server::Server for ChannelServerImpl { + fn recv( + &mut self, + params: channel_server::RecvParams, + mut results: channel_server::RecvResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let _channel = pry!(params.get_channel()); + let all_new = params.get_all_new(); + let min_count = params.get_min_count() as usize; + + let text = if all_new { + self.state.borrow_mut().recv_new(min_count) + } else { + self.state.borrow().recv_history(min_count) + }; + + results.get().set_text(&text); + Promise::ok(()) + } + + fn send( + &mut self, + params: channel_server::SendParams, + _results: channel_server::SendResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let _channel = pry!(params.get_channel()); + let message = pry!(pry!(params.get_message()).to_str()).to_string(); + + let state = self.state.clone(); + Promise::from_future(async move { + let (url, client, chat_id) = { + let s = state.borrow(); + (s.api_url("sendMessage"), s.client.clone(), s.config.chat_id) + }; + let _ = client.post(&url) + .form(&[("chat_id", &chat_id.to_string()), ("text", &message)]) + .send().await; + + let ts = now() as u64; + append_history(&format!("{ts} [agent] {message}")); + state.borrow_mut().push_message( + format!("[agent] {}", message), 0, "telegram.agent" + ); + Ok(()) + }) + } + + fn subscribe( + &mut self, + params: channel_server::SubscribeParams, + _results: channel_server::SubscribeResults, + ) -> Promise<(), capnp::Error> { + let callback = pry!(pry!(params.get()).get_callback()); + self.state.borrow_mut().subscribers.push(callback); + info!("client subscribed for notifications"); + Promise::ok(()) + } + + fn list( + &mut self, + _params: channel_server::ListParams, + mut results: channel_server::ListResults, + ) -> Promise<(), capnp::Error> { + let s = self.state.borrow(); + let unread = (s.total - s.consumed) as u32; + + // Report a single "telegram" channel + let mut list = results.get().init_channels(1); + let mut ch = list.reborrow().get(0); + ch.set_name("telegram"); + ch.set_connected(s.connected); + ch.set_unread(unread); + + Promise::ok(()) + } +} + +// ── Main ──────────────────────────────────────────────────────── + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let config = load_config(); + let state = Rc::new(RefCell::new(State::new(config))); + + let sock_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels"); + std::fs::create_dir_all(&sock_dir)?; + let sock_path = sock_dir.join("telegram.sock"); + let _ = std::fs::remove_file(&sock_path); + + info!("telegram channel daemon starting on {}", sock_path.display()); + + tokio::task::LocalSet::new() + .run_until(async move { + // Start Telegram polling + let poll_state = state.clone(); + tokio::task::spawn_local(async move { + poll_loop(poll_state).await; + }); + + // Listen for channel protocol connections + let listener = UnixListener::bind(&sock_path)?; + + loop { + let (stream, _) = listener.accept().await?; + let (reader, writer) = stream.compat().split(); + let network = twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Server, + Default::default(), + ); + + let server = ChannelServerImpl { + state: state.clone(), + }; + let client: channel_server::Client = + capnp_rpc::new_client(server); + + let rpc_system = RpcSystem::new( + Box::new(network), + Some(client.client), + ); + + tokio::task::spawn_local(rpc_system); + info!("channel client connected"); + } + + #[allow(unreachable_code)] + Ok::<(), Box>(()) + }) + .await +} diff --git a/schema/channel.capnp b/schema/channel.capnp new file mode 100644 index 0000000..4d70e76 --- /dev/null +++ b/schema/channel.capnp @@ -0,0 +1,59 @@ +@0xa1b2c3d4e5f60001; + +# Channel wire protocol. +# +# Spoken over Unix domain sockets between the consciousness binary +# (client) and channel daemons (servers) in ~/.consciousness/channels/. +# +# Each daemon manages one channel prefix (e.g. "irc", "telegram", +# "shell"). Sub-channels are dot-separated paths within that prefix +# (e.g. "irc.#bcachefs", "shell.b200"). +# +# The protocol is bidirectional but client-initiated for data: +# - Client calls recv/send explicitly +# - Server pushes lightweight notifications via callback +# - Messages aren't consumed until recv with allNew=true +# +# Multiple clients can connect simultaneously (e.g. claude-code +# and consciousness binary running in parallel). + +struct ChannelInfo { + name @0 :Text; # channel path + connected @1 :Bool; # underlying transport is alive + unread @2 :UInt32; # unconsumed message count +} + +struct Notification { + channel @0 :Text; # which channel has new messages + urgency @1 :UInt8; # max urgency of new messages + preview @2 :Text; # first line or summary + count @3 :UInt32; # how many new since last notification +} + +# Callback interface — server pushes to client. +interface ChannelClient { + # "New messages arrived on these channels." + # Lightweight signal — client calls recv() to read content. + notify @0 (notifications :List(Notification)) -> (); +} + +# Server interface — client calls these. +interface ChannelServer { + # Read from a channel. Returns flat text. + # allNew=true: all unconsumed text (marks consumed), + # plus scrollback to reach at least minCount lines. + # allNew=false: last minCount lines (pure scrollback, + # nothing consumed). + recv @0 (channel :Text, allNew :Bool, minCount :UInt32) + -> (text :Text); + + # Send text to a channel. + send @1 (channel :Text, message :Text) -> (); + + # Register for push notifications. + # Server calls callback.notify() when new messages arrive. + subscribe @2 (callback :ChannelClient) -> (); + + # List available channels and their status. + list @3 () -> (channels :List(ChannelInfo)); +} diff --git a/src/bin/consciousness.rs b/src/bin/consciousness.rs index e3eabf5..ab49e92 100644 --- a/src/bin/consciousness.rs +++ b/src/bin/consciousness.rs @@ -805,6 +805,11 @@ async fn run(cli: cli::CliArgs) -> Result<()> { unsafe { std::env::set_var("POC_DEBUG", "1") }; } + // Start channel daemons + let mut channel_supervisor = poc_memory::thalamus::supervisor::Supervisor::new(); + channel_supervisor.load_config(); + channel_supervisor.ensure_running(); + // Create UI channel let (ui_tx, mut ui_rx) = ui_channel::channel(); diff --git a/src/cli/misc.rs b/src/cli/misc.rs index d9f1e82..6300c87 100644 --- a/src/cli/misc.rs +++ b/src/cli/misc.rs @@ -144,14 +144,7 @@ pub fn cmd_search(terms: &[String], pipeline_args: &[String], expand: bool, full } pub fn cmd_status() -> Result<(), String> { - // If stdout is a tty and daemon is running, launch TUI - if std::io::IsTerminal::is_terminal(&std::io::stdout()) { - // Try TUI first — falls back if daemon not running - match crate::tui::run_tui() { - Ok(()) => return Ok(()), - Err(_) => {} // fall through to text output - } - } + // TUI moved to consciousness binary (F4 unconscious screen) let store = crate::store::Store::load()?; let g = store.build_graph(); diff --git a/src/lib.rs b/src/lib.rs index 8b81d78..d041ff4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,7 @@ pub mod util; pub mod cli; // TUI for memory-search -pub mod tui; +// tui moved to src/user/tui/ (consciousness binary screens) // Thalamus — notification routing and idle management daemon pub mod thalamus; @@ -61,6 +61,10 @@ pub mod memory_capnp { include!(concat!(env!("OUT_DIR"), "/schema/memory_capnp.rs")); } +pub mod channel_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/channel_capnp.rs")); +} + // Re-exports — all existing crate::X paths keep working pub use hippocampus::{ store, graph, lookups, cursor, query, diff --git a/src/main.rs b/src/main.rs index bf58350..268ce9d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -921,7 +921,7 @@ impl Run for DaemonCmd { Self::Install => daemon::install_service(), Self::Consolidate => daemon::rpc_consolidate(), Self::Run { agent, count } => daemon::rpc_run_agent(&agent, count), - Self::Tui => tui::run_tui(), + Self::Tui => Err("TUI moved to consciousness binary (F4/F5)".into()), Self::ReloadConfig => { match daemon::send_rpc_pub("reload-config") { Some(resp) => { eprintln!("{}", resp.trim()); Ok(()) } diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs new file mode 100644 index 0000000..5fb55b0 --- /dev/null +++ b/src/thalamus/channels.rs @@ -0,0 +1,178 @@ +// channels.rs — Channel client for the thalamus +// +// Discovers channel daemon sockets in ~/.consciousness/channels/, +// connects via capnp RPC, and provides send/recv operations. +// +// Each daemon socket speaks the channel.capnp protocol. The channel +// manager routes by prefix: "irc.#bcachefs" → connects to irc.sock. + +use std::collections::HashMap; +use std::path::PathBuf; + +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use futures::AsyncReadExt; +use tokio::net::UnixStream; +use tokio_util::compat::TokioAsyncReadCompatExt; +use tracing::{info, warn}; + +use crate::channel_capnp::channel_server; + +/// A live connection to a channel daemon. +struct DaemonConnection { + #[allow(dead_code)] + prefix: String, + client: channel_server::Client, + // Hold the RPC system task so it doesn't get dropped + _rpc_task: tokio::task::JoinHandle>, +} + +/// Manages all channel daemon connections. +pub struct ChannelManager { + daemons: HashMap, + channels_dir: PathBuf, +} + +impl ChannelManager { + pub fn new() -> Self { + let channels_dir = dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels"); + Self { + daemons: HashMap::new(), + channels_dir, + } + } + + /// Connect to a daemon socket, returning the capnp client. + async fn connect(path: &std::path::Path) -> Result< + (channel_server::Client, tokio::task::JoinHandle>), + Box, + > { + let stream = UnixStream::connect(path).await?; + let (reader, writer) = stream.compat().split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let client: channel_server::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + let task = tokio::task::spawn_local(rpc_system); + Ok((client, task)) + } + + /// Scan the channels directory for daemon sockets and connect. + pub async fn discover(&mut self) { + let dir = match std::fs::read_dir(&self.channels_dir) { + Ok(d) => d, + Err(_) => return, // directory doesn't exist yet + }; + + for entry in dir.flatten() { + let path = entry.path(); + if path.extension().map_or(false, |e| e == "sock") { + let prefix = path + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("") + .to_string(); + + if self.daemons.contains_key(&prefix) { + continue; + } + + match Self::connect(&path).await { + Ok((client, task)) => { + info!("connected to channel daemon: {}", prefix); + self.daemons.insert( + prefix.clone(), + DaemonConnection { + prefix, + client, + _rpc_task: task, + }, + ); + } + Err(e) => { + warn!("failed to connect to {}: {}", path.display(), e); + } + } + } + } + } + + /// Find the daemon client for a channel path. + fn client_for(&self, channel: &str) -> Option<&channel_server::Client> { + let prefix = channel.split('.').next()?; + self.daemons.get(prefix).map(|d| &d.client) + } + + /// Send a message to a channel. + pub async fn send(&self, channel: &str, message: &str) -> Result<(), String> { + let client = self.client_for(channel) + .ok_or_else(|| format!("no daemon for channel: {}", channel))?; + + let mut req = client.send_request(); + req.get().set_channel(channel); + req.get().set_message(message); + req.send().promise.await + .map_err(|e| format!("send failed: {}", e))?; + Ok(()) + } + + /// Read from a channel. + pub async fn recv( + &self, + channel: &str, + all_new: bool, + min_count: u32, + ) -> Result { + let client = self.client_for(channel) + .ok_or_else(|| format!("no daemon for channel: {}", channel))?; + + let mut req = client.recv_request(); + req.get().set_channel(channel); + req.get().set_all_new(all_new); + req.get().set_min_count(min_count); + + let reply = req.send().promise.await + .map_err(|e| format!("recv failed: {}", e))?; + let text = reply.get() + .map_err(|e| format!("recv reply error: {}", e))? + .get_text() + .map_err(|e| format!("recv text error: {}", e))?; + Ok(text.to_str().unwrap_or("").to_string()) + } + + /// List connected daemon prefixes. + pub fn prefixes(&self) -> Vec<&str> { + self.daemons.keys().map(|s| s.as_str()).collect() + } + + /// List all channels from all connected daemons. + pub async fn list_all(&self) -> Vec<(String, bool, u32)> { + let mut result = Vec::new(); + for daemon in self.daemons.values() { + let req = daemon.client.list_request(); + if let Ok(reply) = req.send().promise.await { + if let Ok(r) = reply.get() { + if let Ok(channels) = r.get_channels() { + for ch in channels.iter() { + if let Ok(name) = ch.get_name() { + result.push(( + name.to_str().unwrap_or("").to_string(), + ch.get_connected(), + ch.get_unread(), + )); + } + } + } + } + } + } + result + } +} diff --git a/src/thalamus/mod.rs b/src/thalamus/mod.rs index 093a567..e23bc43 100644 --- a/src/thalamus/mod.rs +++ b/src/thalamus/mod.rs @@ -8,7 +8,9 @@ // Moved from the standalone poc-daemon crate into the main // consciousness crate. +pub mod channels; pub mod config; +pub mod supervisor; pub mod context; pub mod idle; pub mod modules; @@ -480,28 +482,12 @@ async fn server_main() -> Result<(), Box> { tokio::task::LocalSet::new() .run_until(async move { // Start modules - let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); + let (_notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::(); - let irc_state = if daemon_config.borrow().irc.enabled { - let irc_config = daemon_config.borrow().irc.clone(); - info!("starting irc module: {}:{}", irc_config.server, irc_config.port); - Some(modules::irc::start(irc_config, notify_tx.clone(), daemon_config.clone())) - } else { - info!("irc module disabled"); - None - }; - - let telegram_state = if daemon_config.borrow().telegram.enabled { - info!("starting telegram module"); - Some(modules::telegram::start( - daemon_config.borrow().telegram.clone(), - notify_tx.clone(), - daemon_config.clone(), - )) - } else { - info!("telegram module disabled"); - None - }; + // External modules (IRC, Telegram) now run as separate daemons. + // They connect via the notification channel when implemented. + let _irc_state: Option<()> = None; + let _telegram_state: Option<()> = None; let listener = UnixListener::bind(&sock)?; #[cfg(unix)] @@ -571,8 +557,6 @@ async fn server_main() -> Result<(), Box> { let daemon_impl = rpc::DaemonImpl::new( state.clone(), - irc_state.clone(), - telegram_state.clone(), daemon_config.clone(), ); let client: daemon_capnp::daemon::Client = diff --git a/src/thalamus/modules/irc.rs b/src/thalamus/modules/irc.rs deleted file mode 100644 index 3d27efd..0000000 --- a/src/thalamus/modules/irc.rs +++ /dev/null @@ -1,569 +0,0 @@ -// IRC module. -// -// Maintains a persistent connection to an IRC server. Parses incoming -// messages into notifications, supports sending messages and runtime -// commands (join, leave, etc.). Config changes persist to daemon.toml. -// -// Runs as a spawned local task on the daemon's LocalSet. Notifications -// flow through an mpsc channel into the main state. Reconnects -// automatically with exponential backoff. - -use crate::thalamus::config::{Config, IrcConfig}; -use crate::thalamus::notify::Notification; -use crate::thalamus::{home, now}; -use std::cell::RefCell; -use std::collections::VecDeque; -use std::io; -use std::rc::Rc; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::mpsc; -use tracing::{error, info, warn}; - -const MAX_LOG_LINES: usize = 200; -const RECONNECT_BASE_SECS: u64 = 5; -const RECONNECT_MAX_SECS: u64 = 300; -const PING_INTERVAL_SECS: u64 = 120; -const PING_TIMEOUT_SECS: u64 = 30; - -/// Parsed IRC message. -struct IrcMessage { - prefix: Option, // nick!user@host - command: String, - params: Vec, -} - -impl IrcMessage { - fn parse(line: &str) -> Option { - let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); - if line.is_empty() { - return None; - } - - let (prefix, rest) = if line.starts_with(':') { - let space = line.find(' ')?; - (Some(line[1..space].to_string()), &line[space + 1..]) - } else { - (None, line) - }; - - let (command_params, trailing) = if let Some(pos) = rest.find(" :") { - (&rest[..pos], Some(rest[pos + 2..].to_string())) - } else { - (rest, None) - }; - - let mut parts: Vec = command_params - .split_whitespace() - .map(String::from) - .collect(); - - if parts.is_empty() { - return None; - } - - let command = parts.remove(0).to_uppercase(); - let mut params = parts; - if let Some(t) = trailing { - params.push(t); - } - - Some(IrcMessage { - prefix, - command, - params, - }) - } - - /// Extract nick from prefix (nick!user@host → nick). - fn nick(&self) -> Option<&str> { - self.prefix - .as_deref() - .and_then(|p| p.split('!').next()) - } -} - -/// Shared IRC state, accessible from both the read task and RPC handlers. -pub struct IrcState { - pub config: IrcConfig, - pub connected: bool, - pub channels: Vec, - pub log: VecDeque, - writer: Option, -} - -/// Type-erased writer handle so we can store it without generic params. -type WriterHandle = Box; - -trait AsyncWriter { - fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>>; -} - -/// Writer over a TLS stream. -struct TlsWriter { - inner: tokio::io::WriteHalf>, -} - -impl AsyncWriter for TlsWriter { - fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>> { - let data = format!("{line}\r\n"); - Box::pin(async move { - self.inner.write_all(data.as_bytes()).await - }) - } -} - -/// Writer over a plain TCP stream. -struct PlainWriter { - inner: tokio::io::WriteHalf, -} - -impl AsyncWriter for PlainWriter { - fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>> { - let data = format!("{line}\r\n"); - Box::pin(async move { - self.inner.write_all(data.as_bytes()).await - }) - } -} - -impl IrcState { - fn new(config: IrcConfig) -> Self { - Self { - channels: config.channels.clone(), - config, - connected: false, - log: VecDeque::with_capacity(MAX_LOG_LINES), - writer: None, - } - } - - fn push_log(&mut self, line: &str) { - if self.log.len() >= MAX_LOG_LINES { - self.log.pop_front(); - } - self.log.push_back(line.to_string()); - } - - async fn send_raw(&mut self, line: &str) -> io::Result<()> { - if let Some(ref mut w) = self.writer { - w.write_line(line).await - } else { - Err(io::Error::new(io::ErrorKind::NotConnected, "not connected")) - } - } - - async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { - self.send_raw(&format!("PRIVMSG {target} :{msg}")).await - } - - async fn join(&mut self, channel: &str) -> io::Result<()> { - self.send_raw(&format!("JOIN {channel}")).await?; - if !self.channels.iter().any(|c| c == channel) { - self.channels.push(channel.to_string()); - } - Ok(()) - } - - async fn part(&mut self, channel: &str) -> io::Result<()> { - self.send_raw(&format!("PART {channel}")).await?; - self.channels.retain(|c| c != channel); - Ok(()) - } -} - -pub type SharedIrc = Rc>; - -/// Start the IRC module. Returns the shared state handle. -pub fn start( - config: IrcConfig, - notify_tx: mpsc::UnboundedSender, - daemon_config: Rc>, -) -> SharedIrc { - let state = Rc::new(RefCell::new(IrcState::new(config))); - let state_clone = state.clone(); - - tokio::task::spawn_local(async move { - connection_loop(state_clone, notify_tx, daemon_config).await; - }); - - state -} - -async fn connection_loop( - state: SharedIrc, - notify_tx: mpsc::UnboundedSender, - daemon_config: Rc>, -) { - let mut backoff = RECONNECT_BASE_SECS; - - loop { - let config = state.borrow().config.clone(); - info!("irc: connecting to {}:{}", config.server, config.port); - - match connect_and_run(&state, &config, ¬ify_tx).await { - Ok(()) => { - info!("irc: connection closed cleanly"); - } - Err(e) => { - error!("irc: connection error: {e}"); - } - } - - // Reset backoff if we had a working connection (registered - // successfully before disconnecting) - let was_connected = state.borrow().connected; - state.borrow_mut().connected = false; - state.borrow_mut().writer = None; - if was_connected { - backoff = RECONNECT_BASE_SECS; - } - - // Persist current channel list to config - { - let channels = state.borrow().channels.clone(); - let mut dc = daemon_config.borrow_mut(); - dc.irc.channels = channels; - dc.save(); - } - - info!("irc: reconnecting in {backoff}s"); - tokio::time::sleep(std::time::Duration::from_secs(backoff)).await; - backoff = (backoff * 2).min(RECONNECT_MAX_SECS); - } -} - -async fn connect_and_run( - state: &SharedIrc, - config: &IrcConfig, - notify_tx: &mpsc::UnboundedSender, -) -> io::Result<()> { - let addr = format!("{}:{}", config.server, config.port); - let tcp = tokio::net::TcpStream::connect(&addr).await?; - - if config.tls { - let tls_config = rustls::ClientConfig::builder_with_provider( - rustls::crypto::ring::default_provider().into(), - ) - .with_safe_default_protocol_versions() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? - .with_root_certificates(root_certs()) - .with_no_client_auth(); - let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); - let server_name = rustls::pki_types::ServerName::try_from(config.server.clone()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - let tls_stream = connector.connect(server_name, tcp).await?; - - let (reader, writer) = tokio::io::split(tls_stream); - state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer })); - - let buf_reader = BufReader::new(reader); - register_and_read(state, config, buf_reader, notify_tx).await - } else { - let (reader, writer) = tokio::io::split(tcp); - state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer })); - - let buf_reader = BufReader::new(reader); - register_and_read(state, config, buf_reader, notify_tx).await - } -} - -async fn register_and_read( - state: &SharedIrc, - config: &IrcConfig, - mut reader: BufReader, - notify_tx: &mpsc::UnboundedSender, -) -> io::Result<()> { - // Register - { - let mut s = state.borrow_mut(); - s.send_raw(&format!("NICK {}", config.nick)).await?; - s.send_raw(&format!("USER {} 0 * :{}", config.user, config.realname)).await?; - } - - let mut buf = Vec::new(); - let mut ping_sent = false; - let mut deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PING_INTERVAL_SECS); - - loop { - buf.clear(); - - let read_result = tokio::select! { - result = reader.read_until(b'\n', &mut buf) => result, - _ = tokio::time::sleep_until(deadline) => { - if ping_sent { - return Err(io::Error::new( - io::ErrorKind::TimedOut, - "ping timeout — no response from server", - )); - } - info!("irc: no data for {}s, sending PING", PING_INTERVAL_SECS); - state.borrow_mut().send_raw("PING :keepalive").await?; - ping_sent = true; - deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PING_TIMEOUT_SECS); - continue; - } - }; - - let n = read_result?; - if n == 0 { break; } - - // Any data from server resets the ping timer - ping_sent = false; - deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PING_INTERVAL_SECS); - - // IRC is not guaranteed UTF-8 — lossy conversion handles Latin-1 etc. - let line = String::from_utf8_lossy(&buf).trim_end().to_string(); - if line.is_empty() { continue; } - let msg = match IrcMessage::parse(&line) { - Some(m) => m, - None => continue, - }; - - match msg.command.as_str() { - "PING" => { - let arg = msg.params.first().map(|s| s.as_str()).unwrap_or(""); - state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?; - } - - // RPL_WELCOME — registration complete - "001" => { - info!("irc: registered as {}", config.nick); - state.borrow_mut().connected = true; - - // Join configured channels - let channels = state.borrow().channels.clone(); - for ch in &channels { - if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await { - warn!("irc: failed to join {ch}: {e}"); - } - } - } - - "PRIVMSG" => { - let target = msg.params.first().map(|s| s.as_str()).unwrap_or(""); - let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or(""); - let nick = msg.nick().unwrap_or("unknown"); - - // Handle CTCP requests (wrapped in \x01) - if text.starts_with('\x01') && text.ends_with('\x01') { - let ctcp = &text[1..text.len()-1]; - if ctcp.starts_with("VERSION") { - let reply = format!( - "NOTICE {nick} :\x01VERSION poc-daemon 0.4.0\x01" - ); - state.borrow_mut().send_raw(&reply).await.ok(); - } - // Don't generate notifications for CTCP - continue; - } - - // Log the message - let log_line = if target.starts_with('#') { - format!("[{}] <{}> {}", target, nick, text) - } else { - format!("[PM:{nick}] {text}") - }; - state.borrow_mut().push_log(&log_line); - - // Write to per-channel/per-user log file - if target.starts_with('#') { - append_log(target, nick, text); - } else { - append_log(&format!("pm-{nick}"), nick, text); - } - - // Generate notification - let (ntype, urgency) = classify_privmsg( - nick, - target, - text, - &config.nick, - ); - - let _ = notify_tx.send(Notification { - ntype, - urgency, - message: log_line, - timestamp: now(), - }); - } - - // Nick in use - "433" => { - let alt = format!("{}_", config.nick); - warn!("irc: nick in use, trying {alt}"); - state.borrow_mut().send_raw(&format!("NICK {alt}")).await?; - } - - "JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" | "NOTICE" => { - // Could log these, but skip for now - } - - _ => {} - } - } - - Ok(()) -} - -/// Classify a PRIVMSG into notification type and urgency. -fn classify_privmsg(nick: &str, target: &str, text: &str, my_nick: &str) -> (String, u8) { - let my_nick_lower = my_nick.to_lowercase(); - let text_lower = text.to_lowercase(); - - if !target.starts_with('#') { - // Private message - (format!("irc.pm.{nick}"), crate::thalamus::notify::URGENT) - } else if text_lower.contains(&my_nick_lower) { - // Mentioned in channel - (format!("irc.mention.{nick}"), crate::thalamus::notify::NORMAL) - } else { - // Regular channel message - let channel = target.trim_start_matches('#'); - (format!("irc.channel.{channel}"), crate::thalamus::notify::AMBIENT) - } -} - -/// Append a message to the per-channel or per-user log file. -/// Logs go to ~/.consciousness/irc/logs/{target}.log (e.g. #bcachefs.log, pm-user.log) -fn append_log(target: &str, nick: &str, text: &str) { - use std::io::Write; - // Sanitize target for filename (strip leading #, lowercase) - let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase()); - let dir = home().join(".consciousness/irc/logs"); - let _ = std::fs::create_dir_all(&dir); - if let Ok(mut f) = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(dir.join(&filename)) - { - let secs = now() as u64; - let _ = writeln!(f, "{secs} <{nick}> {text}"); - } -} - -fn root_certs() -> rustls::RootCertStore { - let mut roots = rustls::RootCertStore::empty(); - roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); - roots -} - -/// Handle a runtime command from RPC. -pub async fn handle_command( - state: &SharedIrc, - daemon_config: &Rc>, - cmd: &str, - args: &[String], -) -> Result { - match cmd { - "join" => { - let channel = args.first().ok_or("usage: irc join ")?; - let channel = if channel.starts_with('#') { - channel.clone() - } else { - format!("#{channel}") - }; - state - .borrow_mut() - .join(&channel) - .await - .map_err(|e| e.to_string())?; - - // Persist - let mut dc = daemon_config.borrow_mut(); - if !dc.irc.channels.contains(&channel) { - dc.irc.channels.push(channel.clone()); - } - dc.save(); - - Ok(format!("joined {channel}")) - } - "leave" | "part" => { - let channel = args.first().ok_or("usage: irc leave ")?; - let channel = if channel.starts_with('#') { - channel.clone() - } else { - format!("#{channel}") - }; - state - .borrow_mut() - .part(&channel) - .await - .map_err(|e| e.to_string())?; - - // Persist - let mut dc = daemon_config.borrow_mut(); - dc.irc.channels.retain(|c| c != &channel); - dc.save(); - - Ok(format!("left {channel}")) - } - "send" | "msg" => { - if args.len() < 2 { - return Err("usage: irc send ".into()); - } - let target = &args[0]; - if target.starts_with('#') { - let s = state.borrow(); - if !s.channels.iter().any(|c| c == target) { - return Err(format!( - "not in channel {target} (joined: {})", - s.channels.join(", ") - )); - } - } - let msg = args[1..].join(" "); - let nick = state.borrow().config.nick.clone(); - state - .borrow_mut() - .send_privmsg(target, &msg) - .await - .map_err(|e| e.to_string())?; - append_log(target, &nick, &msg); - Ok(format!("sent to {target}")) - } - "status" => { - let s = state.borrow(); - Ok(format!( - "connected={} channels={} log_lines={} nick={}", - s.connected, - s.channels.join(","), - s.log.len(), - s.config.nick, - )) - } - "log" => { - let n: usize = args - .first() - .and_then(|s| s.parse().ok()) - .unwrap_or(15); - let s = state.borrow(); - let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); - let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); - lines.reverse(); - Ok(lines.join("\n")) - } - "nick" => { - let new_nick = args.first().ok_or("usage: irc nick ")?; - state - .borrow_mut() - .send_raw(&format!("NICK {new_nick}")) - .await - .map_err(|e| e.to_string())?; - - let mut dc = daemon_config.borrow_mut(); - dc.irc.nick = new_nick.clone(); - dc.save(); - - Ok(format!("nick → {new_nick}")) - } - _ => Err(format!( - "unknown irc command: {cmd}\n\ - commands: join, leave, send, status, log, nick" - )), - } -} diff --git a/src/thalamus/modules/mod.rs b/src/thalamus/modules/mod.rs index 0e5debc..288fde5 100644 --- a/src/thalamus/modules/mod.rs +++ b/src/thalamus/modules/mod.rs @@ -1,2 +1,2 @@ -pub mod irc; -pub mod telegram; +// External communication modules (IRC, Telegram, etc.) live in +// separate daemons, not in the core consciousness binary. diff --git a/src/thalamus/modules/telegram.rs b/src/thalamus/modules/telegram.rs deleted file mode 100644 index 13150c0..0000000 --- a/src/thalamus/modules/telegram.rs +++ /dev/null @@ -1,374 +0,0 @@ -// Telegram module. -// -// Long-polls the Telegram Bot API for messages from Kent's chat. -// Downloads media (photos, voice, documents) to local files. -// Sends text and files. Notifications flow through mpsc into the -// daemon's main state. -// -// Only accepts messages from the configured chat_id (prompt -// injection defense — other senders get a "private bot" reply). - -use crate::thalamus::config::{Config, TelegramConfig}; -use crate::thalamus::notify::Notification; -use crate::thalamus::{home, now}; -use std::cell::RefCell; -use std::collections::VecDeque; -use std::path::PathBuf; -use std::rc::Rc; -use tokio::sync::mpsc; -use tracing::{error, info}; - -const MAX_LOG_LINES: usize = 100; -const POLL_TIMEOUT: u64 = 30; - -pub struct TelegramState { - pub config: TelegramConfig, - pub connected: bool, - pub log: VecDeque, - pub last_offset: i64, - client: reqwest::Client, -} - -pub type SharedTelegram = Rc>; - -impl TelegramState { - fn new(config: TelegramConfig) -> Self { - let last_offset = load_offset(); - Self { - config, - connected: false, - log: VecDeque::with_capacity(MAX_LOG_LINES), - last_offset, - client: reqwest::Client::new(), - } - } - - fn push_log(&mut self, line: &str) { - if self.log.len() >= MAX_LOG_LINES { - self.log.pop_front(); - } - self.log.push_back(line.to_string()); - } - - fn api_url(&self, method: &str) -> String { - format!( - "https://api.telegram.org/bot{}/{}", - self.config.token, method - ) - } -} - -fn offset_path() -> PathBuf { - home().join(".consciousness/telegram/last_offset") -} - -fn load_offset() -> i64 { - std::fs::read_to_string(offset_path()) - .ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0) -} - -fn save_offset(offset: i64) { - let _ = std::fs::write(offset_path(), offset.to_string()); -} - -fn history_path() -> PathBuf { - home().join(".consciousness/telegram/history.log") -} - -fn media_dir() -> PathBuf { - home().join(".consciousness/telegram/media") -} - -fn append_history(line: &str) { - use std::io::Write; - if let Ok(mut f) = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(history_path()) - { - let _ = writeln!(f, "{}", line); - } -} - -/// Start the Telegram module. Returns the shared state handle. -pub fn start( - config: TelegramConfig, - notify_tx: mpsc::UnboundedSender, - _daemon_config: Rc>, -) -> SharedTelegram { - let state = Rc::new(RefCell::new(TelegramState::new(config))); - let state_clone = state.clone(); - - tokio::task::spawn_local(async move { - poll_loop(state_clone, notify_tx).await; - }); - - state -} - -async fn poll_loop( - state: SharedTelegram, - notify_tx: mpsc::UnboundedSender, -) { - let _ = std::fs::create_dir_all(media_dir()); - - loop { - match poll_once(&state, ¬ify_tx).await { - Ok(()) => {} - Err(e) => { - error!("telegram: poll error: {e}"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - } - } -} - -async fn poll_once( - state: &SharedTelegram, - notify_tx: &mpsc::UnboundedSender, -) -> Result<(), Box> { - let (url, chat_id, token) = { - let s = state.borrow(); - let url = format!( - "{}?offset={}&timeout={}", - s.api_url("getUpdates"), - s.last_offset, - POLL_TIMEOUT, - ); - (url, s.config.chat_id, s.config.token.clone()) - }; - - let client = state.borrow().client.clone(); - let resp: serde_json::Value = client - .get(&url) - .timeout(std::time::Duration::from_secs(POLL_TIMEOUT + 5)) - .send() - .await? - .json() - .await?; - - if !state.borrow().connected { - state.borrow_mut().connected = true; - info!("telegram: connected"); - } - - let results = resp["result"].as_array(); - let results = match results { - Some(r) => r, - None => return Ok(()), - }; - - for update in results { - let update_id = update["update_id"].as_i64().unwrap_or(0); - let msg = &update["message"]; - - // Update offset - { - let mut s = state.borrow_mut(); - s.last_offset = update_id + 1; - save_offset(s.last_offset); - } - - let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); - if msg_chat_id != chat_id { - // Reject messages from unknown chats - let reject_url = format!( - "https://api.telegram.org/bot{}/sendMessage", - token - ); - let _ = client - .post(&reject_url) - .form(&[ - ("chat_id", msg_chat_id.to_string()), - ("text", "This is a private bot.".to_string()), - ]) - .send() - .await; - continue; - } - - let sender = msg["from"]["first_name"] - .as_str() - .unwrap_or("unknown") - .to_string(); - - // Handle different message types - if let Some(text) = msg["text"].as_str() { - let log_line = format!("[{}] {}", sender, text); - state.borrow_mut().push_log(&log_line); - - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {text}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::thalamus::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } else if let Some(photos) = msg["photo"].as_array() { - // Pick largest photo - let best = photos.iter().max_by_key(|p| p["file_size"].as_i64().unwrap_or(0)); - if let Some(photo) = best { - if let Some(file_id) = photo["file_id"].as_str() { - let caption = msg["caption"].as_str().unwrap_or(""); - let local = download_file(&client, &token, file_id, ".jpg").await; - let display = match &local { - Some(p) => format!("[photo: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), - None => format!("[photo]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), - }; - let log_line = format!("[{}] {}", sender, display); - state.borrow_mut().push_log(&log_line); - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {display}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::thalamus::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } - } - } else if msg["voice"].is_object() { - if let Some(file_id) = msg["voice"]["file_id"].as_str() { - let caption = msg["caption"].as_str().unwrap_or(""); - let local = download_file(&client, &token, file_id, ".ogg").await; - let display = match &local { - Some(p) => format!("[voice: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), - None => format!("[voice]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), - }; - let log_line = format!("[{}] {}", sender, display); - state.borrow_mut().push_log(&log_line); - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {display}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::thalamus::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } - } else if msg["document"].is_object() { - if let Some(file_id) = msg["document"]["file_id"].as_str() { - let fname = msg["document"]["file_name"].as_str().unwrap_or("file"); - let caption = msg["caption"].as_str().unwrap_or(""); - let local = download_file(&client, &token, file_id, "").await; - let display = match &local { - Some(p) => format!("[doc: {} -> {}]{}", fname, p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), - None => format!("[doc: {}]{}", fname, if caption.is_empty() { String::new() } else { format!(" {caption}") }), - }; - let log_line = format!("[{}] {}", sender, display); - state.borrow_mut().push_log(&log_line); - let ts = timestamp(); - append_history(&format!("{ts} [{sender}] {display}")); - - let _ = notify_tx.send(Notification { - ntype: format!("telegram.{}", sender.to_lowercase()), - urgency: crate::thalamus::notify::NORMAL, - message: log_line, - timestamp: now(), - }); - } - } - } - - Ok(()) -} - -async fn download_file( - client: &reqwest::Client, - token: &str, - file_id: &str, - ext: &str, -) -> Option { - let url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); - let resp: serde_json::Value = client.get(&url).send().await.ok()?.json().await.ok()?; - let file_path = resp["result"]["file_path"].as_str()?; - - let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); - let bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; - - let basename = std::path::Path::new(file_path) - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("file"); - let local_name = if ext.is_empty() { - basename.to_string() - } else { - let stem = std::path::Path::new(basename) - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("file"); - format!("{}{}", stem, ext) - }; - let secs = now() as u64; - let local_path = media_dir().join(format!("{secs}_{local_name}")); - std::fs::write(&local_path, &bytes).ok()?; - Some(local_path) -} - -fn timestamp() -> String { - // Use the same unix seconds approach as IRC module - format!("{}", now() as u64) -} - -/// Handle a runtime command from RPC. -pub async fn handle_command( - state: &SharedTelegram, - _daemon_config: &Rc>, - cmd: &str, - args: &[String], -) -> Result { - match cmd { - "send" => { - let msg = args.join(" "); - if msg.is_empty() { - return Err("usage: telegram send ".into()); - } - let (url, client) = { - let s = state.borrow(); - (s.api_url("sendMessage"), s.client.clone()) - }; - let chat_id = state.borrow().config.chat_id.to_string(); - client - .post(&url) - .form(&[("chat_id", chat_id.as_str()), ("text", msg.as_str())]) - .send() - .await - .map_err(|e| e.to_string())?; - - let ts = timestamp(); - append_history(&format!("{ts} [agent] {msg}")); - - Ok("sent".to_string()) - } - "status" => { - let s = state.borrow(); - Ok(format!( - "connected={} log_lines={} offset={}", - s.connected, - s.log.len(), - s.last_offset, - )) - } - "log" => { - let n: usize = args - .first() - .and_then(|s| s.parse().ok()) - .unwrap_or(15); - let s = state.borrow(); - let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); - let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); - lines.reverse(); - Ok(lines.join("\n")) - } - _ => Err(format!( - "unknown telegram command: {cmd}\n\ - commands: send, status, log" - )), - } -} diff --git a/src/thalamus/rpc.rs b/src/thalamus/rpc.rs index b5cc5b0..985bb10 100644 --- a/src/thalamus/rpc.rs +++ b/src/thalamus/rpc.rs @@ -7,7 +7,6 @@ use super::config::Config; use super::daemon_capnp::daemon; use super::idle; -use super::modules::{irc, telegram}; use super::notify; use capnp::capability::Promise; use std::cell::RefCell; @@ -16,19 +15,16 @@ use tracing::info; pub struct DaemonImpl { state: Rc>, - irc: Option, - telegram: Option, - config: Rc>, + // TODO: replace with named channel map + _config: Rc>, } impl DaemonImpl { pub fn new( state: Rc>, - irc: Option, - telegram: Option, - config: Rc>, + _config: Rc>, ) -> Self { - Self { state, irc, telegram, config } + Self { state, _config } } } @@ -361,7 +357,7 @@ impl daemon::Server for DaemonImpl { ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); let module = pry!(pry!(params.get_module()).to_str()).to_string(); - let command = pry!(pry!(params.get_command()).to_str()).to_string(); + let _command = pry!(pry!(params.get_command()).to_str()).to_string(); let args_reader = pry!(params.get_args()); let mut args = Vec::new(); for i in 0..args_reader.len() { @@ -369,44 +365,7 @@ impl daemon::Server for DaemonImpl { } match module.as_str() { - "irc" => { - let irc = match &self.irc { - Some(irc) => irc.clone(), - None => { - results.get().set_result("irc module not enabled"); - return Promise::ok(()); - } - }; - let config = self.config.clone(); - - Promise::from_future(async move { - let result = irc::handle_command(&irc, &config, &command, &args).await; - match result { - Ok(msg) => results.get().set_result(&msg), - Err(msg) => results.get().set_result(&format!("error: {msg}")), - } - Ok(()) - }) - } - "telegram" => { - let tg = match &self.telegram { - Some(tg) => tg.clone(), - None => { - results.get().set_result("telegram module not enabled"); - return Promise::ok(()); - } - }; - let config = self.config.clone(); - - Promise::from_future(async move { - let result = telegram::handle_command(&tg, &config, &command, &args).await; - match result { - Ok(msg) => results.get().set_result(&msg), - Err(msg) => results.get().set_result(&format!("error: {msg}")), - } - Ok(()) - }) - } + // TODO: route module commands through named channel system _ => { results .get() diff --git a/src/thalamus/supervisor.rs b/src/thalamus/supervisor.rs new file mode 100644 index 0000000..fe15235 --- /dev/null +++ b/src/thalamus/supervisor.rs @@ -0,0 +1,204 @@ +// supervisor.rs — Channel daemon lifecycle management +// +// Reads ~/.consciousness/channels/channels.json5, starts/stops +// channel daemons as needed. The socket file is the liveness +// indicator — if it exists and responds, the daemon is running. +// +// File locking prevents multiple consciousness/claude-code instances +// from racing to start the same daemon. + +use std::collections::BTreeMap; +use std::fs::File; +use std::path::PathBuf; +use std::process::Child; +use tracing::{info, warn, error}; + +fn channels_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_default() + .join(".consciousness/channels") +} + +fn config_path() -> PathBuf { + channels_dir().join("channels.json5") +} + +fn lock_path() -> PathBuf { + channels_dir().join(".supervisor.lock") +} + +#[derive(serde::Deserialize, serde::Serialize, Clone)] +pub struct ChannelEntry { + /// Binary name (looked up in PATH) + pub binary: String, + #[serde(default = "default_true")] + pub enabled: bool, + #[serde(default = "default_true")] + pub autostart: bool, +} + +fn default_true() -> bool { true } + +/// RAII file lock — prevents multiple instances from racing. +struct SupervisorLock { + _file: File, +} + +impl SupervisorLock { + fn acquire() -> Option { + let _ = std::fs::create_dir_all(channels_dir()); + let file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open(lock_path()) + .ok()?; + + use std::os::unix::io::AsRawFd; + let ret = unsafe { + libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) + }; + if ret != 0 { + return None; + } + Some(Self { _file: file }) + } +} + +/// Manages channel daemon processes. +pub struct Supervisor { + config: BTreeMap, + children: BTreeMap, +} + +impl Supervisor { + pub fn new() -> Self { + Self { + config: BTreeMap::new(), + children: BTreeMap::new(), + } + } + + /// Load config from channels.json5. + pub fn load_config(&mut self) { + let path = config_path(); + match std::fs::read_to_string(&path) { + Ok(text) => { + match serde_json::from_str::>(&text) { + Ok(cfg) => { + info!("loaded {} channel configs", cfg.len()); + self.config = cfg; + } + Err(e) => warn!("failed to parse {}: {}", path.display(), e), + } + } + Err(_) => info!("no channels.json5, no channels configured"), + } + } + + /// Check if a daemon is alive by testing its socket. + fn is_alive(name: &str) -> bool { + let sock = channels_dir().join(format!("{}.sock", name)); + if !sock.exists() { + return false; + } + match std::os::unix::net::UnixStream::connect(&sock) { + Ok(_) => true, + Err(_) => { + let _ = std::fs::remove_file(&sock); + false + } + } + } + + /// Ensure all configured autostart daemons are running. + /// Acquires file lock to prevent races with other instances. + pub fn ensure_running(&mut self) { + let _lock = match SupervisorLock::acquire() { + Some(l) => l, + None => { + info!("another instance is managing channels"); + return; + } + }; + + let entries: Vec<(String, ChannelEntry)> = self.config.iter() + .filter(|(_, e)| e.enabled && e.autostart) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + for (name, entry) in entries { + if Self::is_alive(&name) { + continue; + } + + // Check if we spawned it and it died unexpectedly + if let Some(child) = self.children.get_mut(&name) { + match child.try_wait() { + Ok(Some(status)) => { + warn!("channel {} exited unexpectedly ({}), restarting", name, status); + self.children.remove(&name); + } + Ok(None) => continue, // still starting up + Err(_) => { self.children.remove(&name); } + } + } + + self.start_one(&name, &entry); + } + } + + fn start_one(&mut self, name: &str, entry: &ChannelEntry) { + info!("starting channel daemon: {} ({})", name, entry.binary); + + match std::process::Command::new(&entry.binary) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::piped()) + .spawn() + { + Ok(child) => { + info!("channel {} started (pid {})", name, child.id()); + self.children.insert(name.to_string(), child); + } + Err(e) => error!("failed to start channel {}: {}", name, e), + } + } + + /// Stop a specific daemon. + pub fn stop_one(&mut self, name: &str) { + let sock = channels_dir().join(format!("{}.sock", name)); + let _ = std::fs::remove_file(&sock); + + if let Some(mut child) = self.children.remove(name) { + info!("stopping channel {} (pid {})", name, child.id()); + let _ = child.kill(); + let _ = child.wait(); + } + } + + /// Stop all managed daemons. + pub fn stop_all(&mut self) { + let names: Vec = self.children.keys().cloned().collect(); + for name in names { + self.stop_one(&name); + } + } + + /// List configured channels and their status. + pub fn status(&self) -> Vec<(String, bool, bool)> { + self.config.iter() + .map(|(name, entry)| (name.clone(), entry.enabled, Self::is_alive(name))) + .collect() + } +} + +impl Drop for Supervisor { + fn drop(&mut self) { + // Don't kill daemons on drop — they should outlive us + for (name, child) in &mut self.children { + match child.try_wait() { + Ok(Some(_)) => {} + _ => info!("leaving channel {} running (pid {})", name, child.id()), + } + } + } +} diff --git a/src/thought/training.rs b/src/thought/training.rs index 6f2b39b..fb59848 100644 --- a/src/thought/training.rs +++ b/src/thought/training.rs @@ -198,6 +198,7 @@ pub async fn score_memories( /// Score response from the /v1/score endpoint. #[derive(serde::Deserialize)] struct ScoreMessageResult { + #[allow(dead_code)] message_index: usize, total_logprob: f64, } diff --git a/src/tui.rs b/src/tui.rs deleted file mode 100644 index 9e8a19c..0000000 --- a/src/tui.rs +++ /dev/null @@ -1,885 +0,0 @@ -// TUI dashboard for poc-memory daemon -// -// Connects to the daemon status socket, polls periodically, and renders -// a tabbed interface with per-agent-type tabs for drill-down. Designed -// for observability and control of the consolidation system. -// -// Tabs: -// Overview — graph health gauges, in-flight tasks, recent completions -// Pipeline — daily pipeline phases in execution order -// — one tab per agent type (replay, linker, separator, transfer, -// health, apply, etc.) showing all runs with output + log history -// Log — auto-scrolling daemon.log tail - -use crate::agents::daemon::GraphHealth; -use crossterm::event::{self, Event, KeyCode, KeyModifiers}; -use jobkit::{TaskInfo, TaskStatus}; -use ratatui::{ - layout::{Constraint, Layout, Rect}, - style::{Color, Modifier, Style, Stylize}, - text::{Line, Span}, - widgets::{Block, Borders, Cell, Gauge, Paragraph, Row, Table, Tabs, Wrap}, - DefaultTerminal, Frame, -}; -use std::fs; -use std::path::PathBuf; -use std::time::{Duration, Instant}; - -const POLL_INTERVAL: Duration = Duration::from_secs(2); - -// Agent types we know about, in display order -const AGENT_TYPES: &[&str] = &[ - "health", "linker", "organize", "distill", "separator", "split", - "apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", -]; - -fn log_path() -> PathBuf { - dirs::home_dir().unwrap_or_default().join(".consciousness/logs/daemon.log") -} - -// --- Data fetching --- - -#[derive(serde::Deserialize)] -struct DaemonStatus { - #[allow(dead_code)] - pid: u32, - tasks: Vec, - #[serde(default)] - #[allow(dead_code)] - last_daily: Option, - #[serde(default)] - graph_health: Option, -} - -fn fetch_status() -> Option { - let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?; - serde_json::from_str(&json).ok() -} - -#[derive(Clone)] -struct LogEntry { - ts: String, - job: String, - event: String, - detail: String, -} - -fn load_log_entries(max: usize) -> Vec { - let content = match fs::read_to_string(log_path()) { - Ok(c) => c, - Err(_) => return Vec::new(), - }; - - content - .lines() - .rev() - .take(max) - .filter_map(|line| { - let obj: serde_json::Value = serde_json::from_str(line).ok()?; - Some(LogEntry { - ts: obj.get("ts")?.as_str()?.to_string(), - job: obj.get("job")?.as_str()?.to_string(), - event: obj.get("event")?.as_str()?.to_string(), - detail: obj - .get("detail") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(), - }) - }) - .collect::>() - .into_iter() - .rev() - .collect() -} - -// --- Tab model --- - -#[derive(Clone, PartialEq, Eq)] -enum Tab { - Overview, - Pipeline, - Agent(String), // agent type name: "replay", "linker", etc. - Log, -} - -impl Tab { - fn label(&self) -> String { - match self { - Tab::Overview => "Overview".into(), - Tab::Pipeline => "Pipeline".into(), - Tab::Agent(name) => name.clone(), - Tab::Log => "Log".into(), - } - } -} - -// --- App state --- - -struct App { - tabs: Vec, - tab_idx: usize, - status: Option, - log_entries: Vec, - last_poll: Instant, - scroll: usize, - count_prefix: Option, // numeric prefix for commands (vim-style) - flash_msg: Option<(String, Instant)>, // transient status message -} - -impl App { - fn new() -> Self { - let status = fetch_status(); - let log_entries = load_log_entries(500); - let tabs = Self::build_tabs(&status, &log_entries); - Self { - tabs, - tab_idx: 0, - status, - log_entries, - last_poll: Instant::now(), - scroll: 0, - count_prefix: None, - flash_msg: None, - } - } - - fn build_tabs(status: &Option, log_entries: &[LogEntry]) -> Vec { - let mut tabs = vec![Tab::Overview, Tab::Pipeline]; - - for agent_type in AGENT_TYPES { - let prefix = format!("c-{}", agent_type); - let has_tasks = status - .as_ref() - .map(|s| s.tasks.iter().any(|t| t.name.starts_with(&prefix))) - .unwrap_or(false); - let has_logs = log_entries.iter().any(|e| { - e.job.starts_with(&prefix) || e.job == *agent_type - }); - if has_tasks || has_logs { - tabs.push(Tab::Agent(agent_type.to_string())); - } - } - - tabs.push(Tab::Log); - tabs - } - - fn poll(&mut self) { - if self.last_poll.elapsed() >= POLL_INTERVAL { - self.status = fetch_status(); - self.log_entries = load_log_entries(500); - - // Rebuild tabs, preserving current selection - let current = self.tabs.get(self.tab_idx).cloned(); - self.tabs = Self::build_tabs(&self.status, &self.log_entries); - if let Some(ref cur) = current { - self.tab_idx = self.tabs.iter().position(|t| t == cur).unwrap_or(0); - } - - self.last_poll = Instant::now(); - } - } - - fn current_tab(&self) -> &Tab { - self.tabs.get(self.tab_idx).unwrap_or(&Tab::Overview) - } - - fn tasks(&self) -> &[TaskInfo] { - self.status - .as_ref() - .map(|s| s.tasks.as_slice()) - .unwrap_or(&[]) - } - - fn tasks_for_agent(&self, agent_type: &str) -> Vec<&TaskInfo> { - let prefix = format!("c-{}", agent_type); - self.tasks() - .iter() - .filter(|t| t.name.starts_with(&prefix)) - .collect() - } - - fn logs_for_agent(&self, agent_type: &str) -> Vec<&LogEntry> { - let prefix = format!("c-{}", agent_type); - self.log_entries - .iter() - .filter(|e| e.job.starts_with(&prefix) || e.job == agent_type) - .collect() - } - - fn pipeline_tasks(&self) -> Vec<&TaskInfo> { - self.tasks() - .iter() - .filter(|t| { - let n = &t.name; - n.starts_with("c-") - || n.starts_with("consolidate:") - || n.starts_with("knowledge-loop:") - || n.starts_with("digest:") - || n.starts_with("decay:") - }) - .collect() - } - - fn next_tab(&mut self) { - self.tab_idx = (self.tab_idx + 1) % self.tabs.len(); - self.scroll = 0; - } - - fn prev_tab(&mut self) { - self.tab_idx = (self.tab_idx + self.tabs.len() - 1) % self.tabs.len(); - self.scroll = 0; - } -} - -// --- Rendering --- - -fn format_duration(d: Duration) -> String { - let ms = d.as_millis(); - if ms < 1_000 { - format!("{}ms", ms) - } else if ms < 60_000 { - format!("{:.1}s", ms as f64 / 1000.0) - } else if ms < 3_600_000 { - format!("{}m{}s", ms / 60_000, (ms % 60_000) / 1000) - } else { - format!("{}h{}m", ms / 3_600_000, (ms % 3_600_000) / 60_000) - } -} - -fn task_elapsed(t: &TaskInfo) -> Duration { - if matches!(t.status, TaskStatus::Running) { - if let Some(started) = t.started_at { - let now = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - Duration::from_secs_f64((now - started).max(0.0)) - } else { - t.elapsed - } - } else { - t.result.as_ref().map(|r| r.duration).unwrap_or(t.elapsed) - } -} - -fn status_style(t: &TaskInfo) -> Style { - if t.cancelled { - return Style::default().fg(Color::DarkGray); - } - match t.status { - TaskStatus::Running => Style::default().fg(Color::Green), - TaskStatus::Completed => Style::default().fg(Color::Blue), - TaskStatus::Failed => Style::default().fg(Color::Red), - TaskStatus::Pending => Style::default().fg(Color::DarkGray), - } -} - -fn status_symbol(t: &TaskInfo) -> &'static str { - if t.cancelled { - return "✗"; - } - match t.status { - TaskStatus::Running => "▶", - TaskStatus::Completed => "✓", - TaskStatus::Failed => "✗", - TaskStatus::Pending => "·", - } -} - -fn event_style(event: &str) -> Style { - match event { - "completed" => Style::default().fg(Color::Blue), - "failed" => Style::default().fg(Color::Red), - "started" => Style::default().fg(Color::Green), - _ => Style::default().fg(Color::DarkGray), - } -} - -fn event_symbol(event: &str) -> &'static str { - match event { - "completed" => "✓", - "failed" => "✗", - "started" => "▶", - _ => "·", - } -} - -fn ts_time(ts: &str) -> &str { - if ts.len() >= 19 { &ts[11..19] } else { ts } -} - -fn render(frame: &mut Frame, app: &App) { - let [header, body, footer] = Layout::vertical([ - Constraint::Length(3), - Constraint::Min(0), - Constraint::Length(1), - ]) - .areas(frame.area()); - - // Tab bar — show index hints for first 9 tabs - let tab_titles: Vec = app - .tabs - .iter() - .enumerate() - .map(|(i, t)| { - let hint = if i < 9 { - format!("{}", i + 1) - } else { - " ".into() - }; - Line::from(format!(" {} {} ", hint, t.label())) - }) - .collect(); - let tabs = Tabs::new(tab_titles) - .select(app.tab_idx) - .highlight_style( - Style::default() - .fg(Color::Yellow) - .add_modifier(Modifier::BOLD), - ) - .block(Block::default().borders(Borders::ALL).title(" poc-memory daemon ")); - frame.render_widget(tabs, header); - - // Body - match app.current_tab() { - Tab::Overview => render_overview(frame, app, body), - Tab::Pipeline => render_pipeline(frame, app, body), - Tab::Agent(name) => render_agent_tab(frame, app, name, body), - Tab::Log => render_log(frame, app, body), - } - - // Footer — flash message, count prefix, or help text - let footer_text = if let Some((ref msg, when)) = app.flash_msg { - if when.elapsed() < Duration::from_secs(3) { - Line::from(vec![ - Span::raw(" "), - Span::styled(msg.as_str(), Style::default().fg(Color::Green)), - ]) - } else { - Line::raw("") // expired, will show help below - } - } else { - Line::raw("") - }; - - let footer_line = if !footer_text.spans.is_empty() { - footer_text - } else if let Some(n) = app.count_prefix { - Line::from(vec![ - Span::styled(format!(" {}×", n), Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD)), - Span::raw(" r: run agent │ Esc: cancel"), - ]) - } else { - match app.current_tab() { - Tab::Agent(_) => Line::from( - " Tab: switch │ ↑↓: scroll │ [N]r: run agent │ c: consolidate │ q: quit ", - ), - _ => Line::from( - " Tab/1-9: switch │ ↑↓: scroll │ c: consolidate │ q: quit ", - ), - } - }; - let footer_widget = Paragraph::new(footer_line).style(Style::default().fg(Color::DarkGray)); - frame.render_widget(footer_widget, footer); -} - -// --- Overview tab --- - -fn render_overview(frame: &mut Frame, app: &App, area: Rect) { - let [health_area, tasks_area] = - Layout::vertical([Constraint::Length(12), Constraint::Min(0)]).areas(area); - - if let Some(gh) = app.status.as_ref().and_then(|s| s.graph_health.as_ref()) { - render_health(frame, gh, health_area); - } else { - let p = Paragraph::new(" No graph health data available") - .block(Block::default().borders(Borders::ALL).title(" Graph Health ")); - frame.render_widget(p, health_area); - } - - // In-flight + recent - let in_flight: Vec<&TaskInfo> = app - .tasks() - .iter() - .filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending)) - .collect(); - - let mut lines: Vec = Vec::new(); - - if in_flight.is_empty() { - lines.push(Line::from(" No tasks in flight").fg(Color::DarkGray)); - } else { - for t in &in_flight { - let elapsed = task_elapsed(t); - let progress = t - .progress - .as_deref() - .filter(|p| *p != "idle") - .unwrap_or(""); - lines.push(Line::from(vec![ - Span::styled(format!(" {} ", status_symbol(t)), status_style(t)), - Span::raw(format!("{:30}", short_name(&t.name))), - Span::styled( - format!(" {:>8}", format_duration(elapsed)), - Style::default().fg(Color::DarkGray), - ), - Span::raw(format!(" {}", progress)), - ])); - if let Some(ref lp) = t.log_path { - lines.push(Line::from(format!(" │ log: {}", lp)).fg(Color::DarkGray)); - } - } - } - - lines.push(Line::raw("")); - lines.push(Line::from(" Recent:").fg(Color::DarkGray)); - let recent: Vec<&LogEntry> = app - .log_entries - .iter() - .rev() - .filter(|e| e.event == "completed" || e.event == "failed") - .take(10) - .collect::>() - .into_iter() - .rev() - .collect(); - for entry in &recent { - lines.push(Line::from(vec![ - Span::raw(" "), - Span::styled(event_symbol(&entry.event), event_style(&entry.event)), - Span::raw(format!( - " {} {:28} {}", - ts_time(&entry.ts), - short_name(&entry.job), - entry.detail - )), - ])); - } - - let tasks_widget = Paragraph::new(lines) - .block(Block::default().borders(Borders::ALL).title(" Tasks ")) - .scroll((app.scroll as u16, 0)); - frame.render_widget(tasks_widget, tasks_area); -} - -fn render_health(frame: &mut Frame, gh: &GraphHealth, area: Rect) { - let block = Block::default() - .borders(Borders::ALL) - .title(format!(" Graph Health ({}) ", gh.computed_at)); - let inner = block.inner(area); - frame.render_widget(block, area); - - let [metrics_area, gauges_area, plan_area] = Layout::vertical([ - Constraint::Length(2), - Constraint::Length(4), - Constraint::Min(1), - ]) - .areas(inner); - - // Metrics - let summary = Line::from(format!( - " {} nodes {} edges {} communities", - gh.nodes, gh.edges, gh.communities - )); - let ep_line = Line::from(vec![ - Span::raw(" episodic: "), - Span::styled( - format!("{:.0}%", gh.episodic_ratio * 100.0), - if gh.episodic_ratio < 0.4 { - Style::default().fg(Color::Green) - } else { - Style::default().fg(Color::Red) - }, - ), - Span::raw(format!(" σ={:.1}", gh.sigma)), - ]); - frame.render_widget(Paragraph::new(vec![summary, ep_line]), metrics_area); - - // Gauges - let [g1, g2, g3] = Layout::horizontal([ - Constraint::Ratio(1, 3), - Constraint::Ratio(1, 3), - Constraint::Ratio(1, 3), - ]) - .areas(gauges_area); - - let alpha_color = if gh.alpha >= 2.5 { Color::Green } else { Color::Red }; - frame.render_widget( - Gauge::default() - .block(Block::default().borders(Borders::ALL).title(" α (≥2.5) ")) - .gauge_style(Style::default().fg(alpha_color)) - .ratio((gh.alpha / 5.0).clamp(0.0, 1.0) as f64) - .label(format!("{:.2}", gh.alpha)), - g1, - ); - - let gini_color = if gh.gini <= 0.4 { Color::Green } else { Color::Red }; - frame.render_widget( - Gauge::default() - .block(Block::default().borders(Borders::ALL).title(" gini (≤0.4) ")) - .gauge_style(Style::default().fg(gini_color)) - .ratio(gh.gini.clamp(0.0, 1.0) as f64) - .label(format!("{:.3}", gh.gini)), - g2, - ); - - let cc_color = if gh.avg_cc >= 0.2 { Color::Green } else { Color::Red }; - frame.render_widget( - Gauge::default() - .block(Block::default().borders(Borders::ALL).title(" cc (≥0.2) ")) - .gauge_style(Style::default().fg(cc_color)) - .ratio(gh.avg_cc.clamp(0.0, 1.0) as f64) - .label(format!("{:.3}", gh.avg_cc)), - g3, - ); - - // Plan - let plan_total: usize = gh.plan_counts.values().sum::() + 1; - let plan_summary: Vec = gh.plan_counts.iter() - .filter(|(_, c)| **c > 0) - .map(|(a, c)| format!("{}{}", &a[..1], c)) - .collect(); - let plan_line = Line::from(vec![ - Span::raw(" plan: "), - Span::styled( - format!("{}", plan_total), - Style::default().add_modifier(Modifier::BOLD), - ), - Span::raw(format!(" agents ({} +health)", plan_summary.join(" "))), - ]); - frame.render_widget(Paragraph::new(plan_line), plan_area); -} - -// --- Pipeline tab --- - -fn render_pipeline(frame: &mut Frame, app: &App, area: Rect) { - let pipeline = app.pipeline_tasks(); - - if pipeline.is_empty() { - let p = Paragraph::new(" No pipeline tasks") - .block(Block::default().borders(Borders::ALL).title(" Daily Pipeline ")); - frame.render_widget(p, area); - return; - } - - let phase_order = [ - "c-health", "c-replay", "c-linker", "c-separator", "c-transfer", - "c-apply", "c-orphans", "c-cap", "c-digest", "c-digest-links", "c-knowledge", - ]; - - let mut rows: Vec = Vec::new(); - let mut seen = std::collections::HashSet::new(); - for phase in &phase_order { - for t in &pipeline { - if t.name.starts_with(phase) && seen.insert(&t.name) { - rows.push(pipeline_row(t)); - } - } - } - for t in &pipeline { - if seen.insert(&t.name) { - rows.push(pipeline_row(t)); - } - } - - let header = Row::new(vec!["", "Phase", "Status", "Duration", "Progress"]) - .style( - Style::default() - .add_modifier(Modifier::BOLD) - .fg(Color::DarkGray), - ); - let widths = [ - Constraint::Length(2), - Constraint::Length(30), - Constraint::Length(10), - Constraint::Length(10), - Constraint::Min(20), - ]; - - let table = Table::new(rows, widths) - .header(header) - .block(Block::default().borders(Borders::ALL).title(" Daily Pipeline ")); - frame.render_widget(table, area); -} - -fn pipeline_row(t: &TaskInfo) -> Row<'static> { - let elapsed = task_elapsed(t); - let progress = t.progress.as_deref().unwrap_or("").to_string(); - let error = t - .result - .as_ref() - .and_then(|r| r.error.as_ref()) - .map(|e| { - let short = if e.len() > 40 { &e[..40] } else { e }; - format!("err: {}", short) - }) - .unwrap_or_default(); - let detail = if !error.is_empty() { error } else { progress }; - - Row::new(vec![ - Cell::from(status_symbol(t)).style(status_style(t)), - Cell::from(short_name(&t.name)), - Cell::from(format!("{}", t.status)), - Cell::from(if !elapsed.is_zero() { - format_duration(elapsed) - } else { - String::new() - }), - Cell::from(detail), - ]) - .style(status_style(t)) -} - -// --- Per-agent-type tab --- - -fn render_agent_tab(frame: &mut Frame, app: &App, agent_type: &str, area: Rect) { - let tasks = app.tasks_for_agent(agent_type); - let logs = app.logs_for_agent(agent_type); - - let mut lines: Vec = Vec::new(); - - // Active/recent tasks - if tasks.is_empty() { - lines.push(Line::from(" No active tasks").fg(Color::DarkGray)); - } else { - lines.push(Line::styled( - " Tasks:", - Style::default().add_modifier(Modifier::BOLD), - )); - lines.push(Line::raw("")); - for t in &tasks { - let elapsed = task_elapsed(t); - let elapsed_str = if !elapsed.is_zero() { - format_duration(elapsed) - } else { - String::new() - }; - let progress = t - .progress - .as_deref() - .filter(|p| *p != "idle") - .unwrap_or(""); - - lines.push(Line::from(vec![ - Span::styled(format!(" {} ", status_symbol(t)), status_style(t)), - Span::styled(format!("{:30}", &t.name), status_style(t)), - Span::styled( - format!(" {:>8}", elapsed_str), - Style::default().fg(Color::DarkGray), - ), - Span::raw(format!(" {}", progress)), - ])); - - // Retries - if t.max_retries > 0 && t.retry_count > 0 { - lines.push(Line::from(vec![ - Span::raw(" retry "), - Span::styled( - format!("{}/{}", t.retry_count, t.max_retries), - Style::default().fg(Color::Yellow), - ), - ])); - } - - // Log file path - if let Some(ref lp) = t.log_path { - lines.push(Line::from(format!(" │ log: {}", lp)).fg(Color::DarkGray)); - } - - // Error - if matches!(t.status, TaskStatus::Failed) - && let Some(ref r) = t.result - && let Some(ref err) = r.error { - lines.push(Line::from(vec![ - Span::styled(" error: ", Style::default().fg(Color::Red)), - Span::styled(err.as_str(), Style::default().fg(Color::Red)), - ])); - } - - lines.push(Line::raw("")); - } - } - - // Log history for this agent type - lines.push(Line::styled( - " Log history:", - Style::default().add_modifier(Modifier::BOLD), - )); - lines.push(Line::raw("")); - - if logs.is_empty() { - lines.push(Line::from(" (no log entries)").fg(Color::DarkGray)); - } else { - // Show last 30 entries - let start = logs.len().saturating_sub(30); - for entry in &logs[start..] { - lines.push(Line::from(vec![ - Span::raw(" "), - Span::styled(event_symbol(&entry.event), event_style(&entry.event)), - Span::raw(" "), - Span::styled(ts_time(&entry.ts), Style::default().fg(Color::DarkGray)), - Span::raw(" "), - Span::styled(format!("{:12}", entry.event), event_style(&entry.event)), - Span::raw(format!(" {}", entry.detail)), - ])); - } - } - - let title = format!(" {} ", agent_type); - let p = Paragraph::new(lines) - .block(Block::default().borders(Borders::ALL).title(title)) - .wrap(Wrap { trim: false }) - .scroll((app.scroll as u16, 0)); - frame.render_widget(p, area); -} - -// --- Log tab --- - -fn render_log(frame: &mut Frame, app: &App, area: Rect) { - let block = Block::default().borders(Borders::ALL).title(" Daemon Log "); - let inner = block.inner(area); - frame.render_widget(block, area); - - let visible_height = inner.height as usize; - let total = app.log_entries.len(); - - // Auto-scroll to bottom unless user has scrolled up - let offset = if app.scroll == 0 { - total.saturating_sub(visible_height) - } else { - app.scroll.min(total.saturating_sub(visible_height)) - }; - - let mut lines: Vec = Vec::new(); - for entry in app.log_entries.iter().skip(offset).take(visible_height) { - lines.push(Line::from(vec![ - Span::styled(ts_time(&entry.ts), Style::default().fg(Color::DarkGray)), - Span::raw(" "), - Span::styled(format!("{:12}", entry.event), event_style(&entry.event)), - Span::raw(format!(" {:30} {}", short_name(&entry.job), entry.detail)), - ])); - } - - frame.render_widget(Paragraph::new(lines), inner); -} - -// --- Helpers --- - -fn short_name(name: &str) -> String { - if let Some((verb, path)) = name.split_once(' ') { - let file = path.rsplit('/').next().unwrap_or(path); - let file = file.strip_suffix(".jsonl").unwrap_or(file); - let short = if file.len() > 12 { &file[..12] } else { file }; - format!("{} {}", verb, short) - } else { - name.to_string() - } -} - -fn send_rpc(cmd: &str) -> Option { - jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) -} - -// --- Entry point --- - -pub fn run_tui() -> Result<(), String> { - use crossterm::terminal; - - terminal::enable_raw_mode().map_err(|e| format!("not a terminal: {}", e))?; - terminal::disable_raw_mode().ok(); - - let mut terminal = ratatui::init(); - let result = run_event_loop(&mut terminal); - ratatui::restore(); - result -} - -fn run_event_loop(terminal: &mut DefaultTerminal) -> Result<(), String> { - let mut app = App::new(); - - if app.status.is_none() { - return Err("Daemon not running.".into()); - } - - loop { - terminal - .draw(|frame| render(frame, &app)) - .map_err(|e| format!("draw: {}", e))?; - - if event::poll(Duration::from_millis(250)).map_err(|e| format!("poll: {}", e))? { - if let Event::Key(key) = event::read().map_err(|e| format!("read: {}", e))? { - match key.code { - KeyCode::Char('q') => return Ok(()), - KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => { - return Ok(()) - } - KeyCode::Char('c') => { - let _ = send_rpc("consolidate"); - app.last_poll = Instant::now() - POLL_INTERVAL; - } - KeyCode::Char('r') => { - // Run specific agent type if on an agent tab - if let Tab::Agent(ref name) = app.current_tab().clone() { - let count = app.count_prefix.unwrap_or(1); - let cmd = format!("run-agent {} {}", name, count); - let _ = send_rpc(&cmd); - app.flash_msg = Some(( - format!("Queued {} {} run{}", count, name, - if count > 1 { "s" } else { "" }), - Instant::now(), - )); - app.count_prefix = None; - app.last_poll = Instant::now() - POLL_INTERVAL; - } - } - KeyCode::Tab => { app.count_prefix = None; app.next_tab(); } - KeyCode::BackTab => { app.count_prefix = None; app.prev_tab(); } - // Number keys: if on agent tab, accumulate as count prefix; - // otherwise switch tabs - KeyCode::Char(c @ '1'..='9') => { - if matches!(app.current_tab(), Tab::Agent(_)) { - let digit = (c as usize) - ('0' as usize); - app.count_prefix = Some( - app.count_prefix.unwrap_or(0) * 10 + digit - ); - } else { - let idx = (c as usize) - ('1' as usize); - if idx < app.tabs.len() { - app.tab_idx = idx; - app.scroll = 0; - } - } - } - KeyCode::Down | KeyCode::Char('j') => { - app.scroll = app.scroll.saturating_add(1); - } - KeyCode::Up | KeyCode::Char('k') => { - app.scroll = app.scroll.saturating_sub(1); - } - KeyCode::PageDown => { - app.scroll = app.scroll.saturating_add(20); - } - KeyCode::PageUp => { - app.scroll = app.scroll.saturating_sub(20); - } - KeyCode::Home => { - app.scroll = 0; - } - KeyCode::Esc => { - app.count_prefix = None; - } - _ => {} - } - } - - // Drain remaining events - while event::poll(Duration::ZERO).unwrap_or(false) { - let _ = event::read(); - } - } - - app.poll(); - } -} diff --git a/src/user/tui/mod.rs b/src/user/tui/mod.rs index dd23d95..34fae6e 100644 --- a/src/user/tui/mod.rs +++ b/src/user/tui/mod.rs @@ -25,6 +25,7 @@ pub(crate) const SCREEN_LEGEND: &str = " F1=interact F2=conscious F3=subconsci /// Subconscious agents — interact with conscious context pub(crate) const SUBCONSCIOUS_AGENTS: &[&str] = &["surface-observe", "journal", "reflect"]; /// Unconscious agents — background consolidation +#[allow(dead_code)] pub(crate) const UNCONSCIOUS_AGENTS: &[&str] = &["linker", "organize", "distill", "split"]; use crossterm::{ diff --git a/src/user/tui/thalamus_screen.rs b/src/user/tui/thalamus_screen.rs index 31876f4..dc982a3 100644 --- a/src/user/tui/thalamus_screen.rs +++ b/src/user/tui/thalamus_screen.rs @@ -1,17 +1,17 @@ -// thalamus_screen.rs — F5: attention routing / daemon status +// thalamus_screen.rs — F5: attention routing and channel status // -// Shows poc-daemon status: presence detection, idle timer, -// notification routing, activity level. +// Shows presence/idle/activity status, then channel daemon status. use ratatui::{ layout::Rect, style::{Color, Style}, - text::Line, + text::{Line, Span}, widgets::{Block, Borders, Paragraph, Wrap}, Frame, }; use super::{App, SCREEN_LEGEND}; +use crate::thalamus::supervisor::Supervisor; fn fetch_daemon_status() -> Vec { std::process::Command::new("poc-daemon") @@ -26,20 +26,58 @@ fn fetch_daemon_status() -> Vec { } }) .map(|s| s.lines().map(String::from).collect()) - .unwrap_or_else(|| vec!["daemon not running".to_string()]) + .unwrap_or_default() } impl App { pub(crate) fn draw_thalamus(&self, frame: &mut Frame, size: Rect) { - let status_lines = fetch_daemon_status(); let section = Style::default().fg(Color::Yellow); - + let dim = Style::default().fg(Color::DarkGray); let mut lines: Vec = Vec::new(); - lines.push(Line::styled("── Thalamus ──", section)); + + // Presence status first + let daemon_status = fetch_daemon_status(); + if !daemon_status.is_empty() { + lines.push(Line::styled("── Presence ──", section)); + lines.push(Line::raw("")); + for line in &daemon_status { + lines.push(Line::raw(format!(" {}", line))); + } + lines.push(Line::raw("")); + } + + // Channel status + lines.push(Line::styled("── Channels ──", section)); lines.push(Line::raw("")); - for line in &status_lines { - lines.push(Line::raw(format!(" {}", line))); + let mut sup = Supervisor::new(); + sup.load_config(); + let status = sup.status(); + + if status.is_empty() { + lines.push(Line::styled(" no channels configured", dim)); + } else { + for (name, enabled, alive) in &status { + let (symbol, color) = if *alive { + ("●", Color::Green) + } else if *enabled { + ("○", Color::Red) + } else { + ("○", Color::DarkGray) + }; + let state = if *alive { + "running" + } else if *enabled { + "stopped" + } else { + "disabled" + }; + lines.push(Line::from(vec![ + Span::raw(" "), + Span::styled(symbol, Style::default().fg(color)), + Span::raw(format!(" {:<20} {}", name, state)), + ])); + } } let block = Block::default()