// channel-telegram — Standalone Telegram channel daemon // // Long-polls the Telegram Bot API, stores messages, and serves // them over the channel.capnp protocol on a Unix socket at // ~/.consciousness/channels/telegram.sock. // // Runs independently of the consciousness binary so restarts // don't kill the Telegram connection. use std::cell::RefCell; use std::path::PathBuf; use std::rc::Rc; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixListener; use tokio_util::compat::TokioAsyncReadCompatExt; use log::{info, error}; use poc_memory::channel_capnp::{channel_client, channel_server}; // ── Config ────────────────────────────────────────────────────── #[derive(Clone, serde::Deserialize)] struct Config { #[serde(default)] token: String, chat_id: i64, } fn channels_dir() -> PathBuf { dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels") } fn load_config() -> Config { let dir = channels_dir(); let config_path = dir.join("telegram.json5"); let text = std::fs::read_to_string(&config_path) .unwrap_or_else(|_| panic!("failed to read {}", config_path.display())); let mut config: Config = serde_json::from_str(&text) .unwrap_or_else(|e| panic!("failed to parse {}: {}", config_path.display(), e)); // Read token from secrets file let token_path = dir.join("telegram.secrets/token"); if let Ok(token) = std::fs::read_to_string(&token_path) { config.token = token.trim().to_string(); } if config.token.is_empty() { panic!("no telegram token — set it in {}", token_path.display()); } config } // ── State ─────────────────────────────────────────────────────── use poc_memory::thalamus::channel_log::ChannelLog; struct State { config: Config, /// Per-channel message logs (keyed by channel path, e.g. "telegram.kent") channel_logs: std::collections::BTreeMap, /// Telegram API offset last_offset: i64, connected: bool, client: poc_memory::agent::api::http::HttpClient, /// Registered notification callbacks subscribers: Vec, } type SharedState = Rc>; impl State { fn new(config: Config) -> Self { let last_offset = load_offset(); Self { config, channel_logs: std::collections::BTreeMap::new(), last_offset, connected: false, client: poc_memory::agent::api::http::HttpClient::new(), subscribers: Vec::new(), } } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { self.channel_logs .entry(channel.to_string()) .or_insert_with(ChannelLog::new) .push(line.clone()); // Notify all subscribers let preview = line.chars().take(80).collect::(); for sub in &self.subscribers { let mut req = sub.notify_request(); let mut list = req.get().init_notifications(1); let mut n = list.reborrow().get(0); n.set_channel(channel); n.set_urgency(urgency); n.set_preview(&preview); n.set_count(1); // Fire and forget — if client is gone, we'll clean up later tokio::task::spawn_local(async move { let _ = req.send().promise.await; }); } } fn api_url(&self, method: &str) -> String { format!("https://api.telegram.org/bot{}/{}", self.config.token, method) } } // ── Persistence ───────────────────────────────────────────────── fn data_dir() -> PathBuf { dirs::home_dir().unwrap_or_default().join(".consciousness/channels/telegram.logs") } fn load_offset() -> i64 { std::fs::read_to_string(data_dir().join("last_offset")) .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0) } fn save_offset(offset: i64) { let _ = std::fs::create_dir_all(data_dir()); let _ = std::fs::write(data_dir().join("last_offset"), offset.to_string()); } fn append_history(line: &str) { use std::io::Write; if let Ok(mut f) = std::fs::OpenOptions::new() .create(true).append(true) .open(data_dir().join("history.log")) { let _ = writeln!(f, "{}", line); } } fn now() -> f64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() } // ── Telegram Polling ──────────────────────────────────────────── async fn poll_loop(state: SharedState) { let _ = std::fs::create_dir_all(data_dir().join("media")); loop { if let Err(e) = poll_once(&state).await { error!("telegram poll error: {e}"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } } async fn poll_once(state: &SharedState) -> Result<(), Box> { let (url, chat_id, token) = { let s = state.borrow(); let url = format!( "{}?offset={}&timeout=30", s.api_url("getUpdates"), s.last_offset, ); (url, s.config.chat_id, s.config.token.clone()) }; let client = state.borrow().client.clone(); let resp: serde_json::Value = client.get(&url).await?.json().await?; if !state.borrow().connected { state.borrow_mut().connected = true; info!("telegram: connected"); } let results = match resp["result"].as_array() { Some(r) => r, None => return Ok(()), }; for update in results { let update_id = update["update_id"].as_i64().unwrap_or(0); let msg = &update["message"]; { let mut s = state.borrow_mut(); s.last_offset = update_id + 1; save_offset(s.last_offset); } let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); if msg_chat_id != chat_id { let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage"); let _ = client.post_form(&reject_url, &[ ("chat_id", &msg_chat_id.to_string()), ("text", "This is a private bot."), ]).await; continue; } let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string(); let channel = format!("telegram.{}", sender.to_lowercase()); if let Some(text) = msg["text"].as_str() { let line = format!("[{}] {}", sender, text); let ts = now() as u64; append_history(&format!("{ts} {line}")); state.borrow_mut().push_message(line, 2, &channel); // NORMAL urgency } // TODO: handle photos, voice, documents (same as original module) } Ok(()) } // ── ChannelServer Implementation ──────────────────────────────── struct ChannelServerImpl { state: SharedState, } macro_rules! pry { ($e:expr) => { match $e { Ok(v) => v, Err(e) => return std::future::ready(Err(e.into())), } }; } impl channel_server::Server for ChannelServerImpl { fn recv( self: Rc, params: channel_server::RecvParams, mut results: channel_server::RecvResults, ) -> impl std::future::Future> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); let min_count = params.get_min_count() as usize; let mut s = self.state.borrow_mut(); let text = match s.channel_logs.get_mut(&channel) { Some(log) => { if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) } } None => String::new(), }; results.get().set_text(&text); std::future::ready(Ok(())) } fn send( self: Rc, params: channel_server::SendParams, _results: channel_server::SendResults, ) -> impl std::future::Future> { let state = self.state.clone(); async move { let params = params.get()?; let _channel = params.get_channel()?.to_str()?.to_string(); let message = params.get_message()?.to_str()?.to_string(); let (url, client, chat_id) = { let s = state.borrow(); (s.api_url("sendMessage"), s.client.clone(), s.config.chat_id) }; let _ = client.post_form(&url, &[ ("chat_id", &chat_id.to_string()), ("text", &message), ]).await; let ts = now() as u64; append_history(&format!("{ts} [agent] {message}")); { let channel = "telegram.agent".to_string(); state.borrow_mut().channel_logs .entry(channel) .or_insert_with(ChannelLog::new) .push_own(format!("[agent] {}", message)); } Ok(()) } } fn subscribe( self: Rc, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, ) -> impl std::future::Future> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); info!("client subscribed for notifications"); std::future::ready(Ok(())) } fn list( self: Rc, _params: channel_server::ListParams, mut results: channel_server::ListResults, ) -> impl std::future::Future> { let s = self.state.borrow(); let connected = s.connected; let names: Vec = s.channel_logs.keys().cloned().collect(); let mut list = results.get().init_channels(names.len() as u32); for (i, name) in names.iter().enumerate() { let mut entry = list.reborrow().get(i as u32); entry.set_name(name); entry.set_connected(connected); entry.set_unread( s.channel_logs.get(name).map_or(0, |l| l.unread()) ); } std::future::ready(Ok(())) } } // ── Main ──────────────────────────────────────────────────────── #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { env_logger::init(); let config = load_config(); let state = Rc::new(RefCell::new(State::new(config))); let sock_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); std::fs::create_dir_all(&sock_dir)?; let sock_path = sock_dir.join("telegram.sock"); let _ = std::fs::remove_file(&sock_path); info!("telegram channel daemon starting on {}", sock_path.display()); tokio::task::LocalSet::new() .run_until(async move { // Start Telegram polling let poll_state = state.clone(); tokio::task::spawn_local(async move { poll_loop(poll_state).await; }); // Listen for channel protocol connections let listener = UnixListener::bind(&sock_path)?; loop { let (stream, _) = listener.accept().await?; let (reader, writer) = stream.compat().split(); let network = twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Server, Default::default(), ); let server = ChannelServerImpl { state: state.clone(), }; let client: channel_server::Client = capnp_rpc::new_client(server); let rpc_system = RpcSystem::new( Box::new(network), Some(client.client), ); tokio::task::spawn_local(rpc_system); info!("channel client connected"); } #[allow(unreachable_code)] Ok::<(), Box>(()) }) .await }