// channel-telegram — Standalone Telegram channel daemon // // Long-polls the Telegram Bot API, stores messages, and serves // them over the channel.capnp protocol on a Unix socket at // ~/.consciousness/channels/telegram.sock. // // Runs independently of the consciousness binary so restarts // don't kill the Telegram connection. use std::cell::RefCell; use std::path::PathBuf; use std::rc::Rc; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixListener; use tokio_util::compat::TokioAsyncReadCompatExt; use log::{info, error}; use consciousness::channel_capnp::{channel_client, channel_server}; // ── Config ────────────────────────────────────────────────────── #[derive(Clone, serde::Serialize, serde::Deserialize)] struct Config { #[serde(default, skip_serializing)] token: String, #[serde(default)] chat_ids: std::collections::BTreeMap, } fn channels_dir() -> PathBuf { dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels") } fn load_config() -> Config { let dir = channels_dir(); let config_path = dir.join("telegram.json5"); let text = std::fs::read_to_string(&config_path) .unwrap_or_else(|_| panic!("failed to read {}", config_path.display())); let mut config: Config = serde_json::from_str(&text) .unwrap_or_else(|e| panic!("failed to parse {}: {}", config_path.display(), e)); // Read token from secrets file let token_path = dir.join("telegram.secrets/token"); if let Ok(token) = std::fs::read_to_string(&token_path) { config.token = token.trim().to_string(); } if config.token.is_empty() { panic!("no telegram token — set it in {}", token_path.display()); } config } // ── State ─────────────────────────────────────────────────────── use consciousness::thalamus::channel_log::{self, ChannelLog}; struct State { config: Config, /// Per-channel message logs (keyed by channel path, e.g. "telegram.kent") channel_logs: std::collections::BTreeMap, /// Telegram API offset last_offset: i64, connected: bool, client: consciousness::agent::api::http::HttpClient, /// Registered notification callbacks subscribers: Vec, } type SharedState = Rc>; impl State { fn new(config: Config) -> Self { let last_offset = load_offset(); // Load existing sub-channel logs from disk let mut channel_logs = std::collections::BTreeMap::new(); let log_path = log_dir(); if let Ok(entries) = std::fs::read_dir(&log_path) { for entry in entries.flatten() { let name = entry.file_name().to_string_lossy().to_string(); if let Some(target) = name.strip_suffix(".log") { let key = format!("telegram.{}", target); channel_logs.insert( key, channel_log::load_disk_log(&log_path, target), ); } } } Self { config, channel_logs, last_offset, connected: false, client: consciousness::agent::api::http::HttpClient::new(), subscribers: Vec::new(), } } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { let target = channel_to_target(channel); self.channel_logs .entry(channel.to_string()) .or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target)) .push(line.clone()); // Notify all subscribers let preview = line.chars().take(80).collect::(); for sub in &self.subscribers { let mut req = sub.notify_request(); let mut list = req.get().init_notifications(1); let mut n = list.reborrow().get(0); n.set_channel(channel); n.set_urgency(urgency); n.set_preview(&preview); n.set_count(1); // Fire and forget — if client is gone, we'll clean up later tokio::task::spawn_local(async move { let _ = req.send().promise.await; }); } } } // ── Persistence ───────────────────────────────────────────────── fn log_dir() -> PathBuf { channel_log::log_dir("telegram") } fn load_offset() -> i64 { std::fs::read_to_string(log_dir().join("last_offset")) .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0) } fn save_offset(offset: i64) { let _ = std::fs::create_dir_all(log_dir()); let _ = std::fs::write(log_dir().join("last_offset"), offset.to_string()); } /// Convert a channel path to a telegram target name. /// "telegram.kent" -> "kent" fn channel_to_target(channel: &str) -> String { channel.strip_prefix("telegram.").unwrap_or(channel).to_string() } fn config_path() -> PathBuf { channels_dir().join("telegram.json5") } fn save_config(config: &Config) { if let Ok(json) = serde_json::to_string_pretty(config) { let _ = std::fs::write(config_path(), json); } } // ── Telegram API ──────────────────────────────────────────────── // // NOTE: The current HttpClient opens a new TCP+TLS connection per request. // Telegram's API supports HTTP/2, which would allow multiplexing getUpdates // and sendMessage on a single connection. To use HTTP/2: // - Replace HttpClient with hyper_util::client::legacy::Client using // a Connector that enables HTTP/2 (hyper_util::client::legacy::connect::HttpConnector // + hyper_rustls with ALPN h2). // - Or use reqwest with the "http2" feature, which handles connection pooling // and HTTP/2 negotiation automatically. // - The API functions below would then share a single pooled client, and // concurrent requests (poll + send) would multiplex over one connection. use consciousness::agent::api::http::HttpClient; struct TelegramMessage { update_id: i64, chat_id: i64, sender: String, text: String, } /// Fetch and parse pending updates from Telegram via long polling. async fn get_updates( client: &HttpClient, token: &str, offset: i64, ) -> Result, Box> { let url = format!( "https://api.telegram.org/bot{}/getUpdates?offset={}&timeout=30", token, offset, ); let response = client.get(&url).await?; let body = response.text().await?; let resp: serde_json::Value = serde_json::from_str(&body) .map_err(|e| format!("getUpdates JSON parse error: {e}\nbody: {}", &body[..body.len().min(500)]))?; let mut messages = Vec::new(); if let Some(results) = resp["result"].as_array() { for update in results { let update_id = update["update_id"].as_i64().unwrap_or(0); let msg = &update["message"]; let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string(); let chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); if let Some(text) = msg["text"].as_str() { messages.push(TelegramMessage { update_id, chat_id, sender, text: text.to_string(), }); } } } Ok(messages) } /// Send a text message to a Telegram chat. async fn send_message( client: &HttpClient, token: &str, chat_id: i64, text: &str, ) -> Result<(), Box> { let url = format!( "https://api.telegram.org/bot{}/sendMessage", token, ); let response = client.post_form(&url, &[ ("chat_id", &chat_id.to_string()), ("text", text), ]).await?; let status = response.status(); if !status.is_success() { let body = response.text().await.unwrap_or_default(); return Err(format!("sendMessage failed: {} — {}", status, &body[..body.len().min(500)]).into()); } Ok(()) } // ── ChannelServer Implementation ──────────────────────────────── struct ChannelServerImpl { state: SharedState, } macro_rules! pry { ($e:expr) => { match $e { Ok(v) => v, Err(e) => return std::future::ready(Err(e.into())), } }; } impl channel_server::Server for ChannelServerImpl { fn recv( self: Rc, params: channel_server::RecvParams, mut results: channel_server::RecvResults, ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); let min_count = params.get_min_count() as usize; let mut s = self.state.borrow_mut(); let text = match s.channel_logs.get_mut(&channel) { Some(log) => { if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) } } None => String::new(), }; results.get().set_text(&text); std::future::ready(Ok(())) } fn send( self: Rc, params: channel_server::SendParams, _results: channel_server::SendResults, ) -> impl std::future::Future> { let state = self.state.clone(); async move { let params = params.get()?; let channel = params.get_channel()?.to_str()?.to_string(); let message = params.get_message()?.to_str()?.to_string(); let target = channel_to_target(&channel); let (token, client, chat_id) = { let s = state.borrow(); let chat_id = s.config.chat_ids.get(&target).copied() .ok_or_else(|| capnp::Error::failed( format!("no chat_id known for {target}")))?; (s.config.token.clone(), s.client.clone(), chat_id) }; send_message(&client, &token, chat_id, &message).await .map_err(|e| capnp::Error::failed(format!("send_message: {e}")))?; channel_log::append_disk_log(&log_dir(), &target, "PoC", &message); state.borrow_mut().channel_logs .entry(channel) .or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target)) .push_own(format!("[PoC] {}", message)); Ok(()) } } fn subscribe( self: Rc, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, ) -> impl std::future::Future> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); info!("client subscribed for notifications"); std::future::ready(Ok(())) } fn list( self: Rc, _params: channel_server::ListParams, mut results: channel_server::ListResults, ) -> impl std::future::Future> { let s = self.state.borrow(); let connected = s.connected; let names: Vec = s.channel_logs.keys().cloned().collect(); let mut list = results.get().init_channels(names.len() as u32); for (i, name) in names.iter().enumerate() { let mut entry = list.reborrow().get(i as u32); entry.set_name(name); entry.set_connected(connected); entry.set_unread( s.channel_logs.get(name).map_or(0, |l| l.unread()) ); } std::future::ready(Ok(())) } } // ── Main ──────────────────────────────────────────────────────── async fn poll_once( token: &str, client: &HttpClient, state: &SharedState, ) -> Result<(), Box> { let offset = state.borrow().last_offset; let messages = get_updates(client, token, offset).await?; if !state.borrow().connected { state.borrow_mut().connected = true; info!("telegram: connected"); } let mut max_offset = offset; for msg in &messages { max_offset = max_offset.max(msg.update_id + 1); let sender_lower = msg.sender.to_lowercase(); let channel = format!("telegram.{}", sender_lower); channel_log::append_disk_log(&log_dir(), &sender_lower, &msg.sender, &msg.text); let mut s = state.borrow_mut(); s.config.chat_ids.insert(sender_lower, msg.chat_id); let line = format!("[{}] {}", msg.sender, msg.text); s.push_message(line, 2, &channel); } if max_offset > offset { let mut s = state.borrow_mut(); s.last_offset = max_offset; save_offset(max_offset); save_config(&s.config); } Ok(()) } #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { env_logger::init(); let config = load_config(); let token = config.token.clone(); let state = Rc::new(RefCell::new(State::new(config))); let sock_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); std::fs::create_dir_all(&sock_dir)?; let sock_path = sock_dir.join("telegram.sock"); let _ = std::fs::remove_file(&sock_path); let _ = std::fs::create_dir_all(log_dir().join("media")); info!("telegram channel daemon starting on {}", sock_path.display()); tokio::task::LocalSet::new() .run_until(async move { // Start Telegram polling let poll_state = state.clone(); let poll_client = state.borrow().client.clone(); tokio::task::spawn_local(async move { loop { if let Err(e) = poll_once(&token, &poll_client, &poll_state).await { error!("telegram poll error: {e}"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } }); // Listen for channel protocol connections let listener = UnixListener::bind(&sock_path)?; state.borrow_mut().connected = true; info!("listening on socket {}", sock_path.display()); loop { let (stream, _) = listener.accept().await?; let (reader, writer) = stream.compat().split(); let network = twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Server, Default::default(), ); let server = ChannelServerImpl { state: state.clone(), }; let client: channel_server::Client = capnp_rpc::new_client(server); let rpc_system = RpcSystem::new( Box::new(network), Some(client.client), ); tokio::task::spawn_local(rpc_system); info!("channel client connected"); } #[allow(unreachable_code)] Ok::<(), Box>(()) }) .await }