// channel-telegram — Standalone Telegram channel daemon // // Long-polls the Telegram Bot API, stores messages, and serves // them over the channel.capnp protocol on a Unix socket at // ~/.consciousness/channels/telegram.sock. // // Runs independently of the consciousness binary so restarts // don't kill the Telegram connection. use std::cell::RefCell; use std::collections::VecDeque; use std::path::PathBuf; use std::rc::Rc; use capnp::capability::Promise; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixListener; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{error, info}; use poc_memory::channel_capnp::{channel_client, channel_server}; // ── Config ────────────────────────────────────────────────────── #[derive(Clone, serde::Deserialize)] struct Config { token: String, chat_id: i64, } fn config_path() -> PathBuf { dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels/telegram.json5") } fn load_config() -> Config { let path = config_path(); let text = std::fs::read_to_string(&path) .unwrap_or_else(|_| panic!("failed to read {}", path.display())); serde_json::from_str(&text) .unwrap_or_else(|e| panic!("failed to parse {}: {}", path.display(), e)) } // ── State ─────────────────────────────────────────────────────── const MAX_HISTORY: usize = 1000; struct State { config: Config, /// Ring buffer of formatted message lines messages: VecDeque, /// Index of first unconsumed message per client /// (simplified: single consumer for now) consumed: usize, /// Total messages ever received (monotonic counter) total: usize, /// Telegram API offset last_offset: i64, connected: bool, client: reqwest::Client, /// Registered notification callbacks subscribers: Vec, } type SharedState = Rc>; impl State { fn new(config: Config) -> Self { let last_offset = load_offset(); Self { config, messages: VecDeque::with_capacity(MAX_HISTORY), consumed: 0, total: 0, last_offset, connected: false, client: reqwest::Client::new(), subscribers: Vec::new(), } } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { if self.messages.len() >= MAX_HISTORY { self.messages.pop_front(); // Adjust consumed so it doesn't point past the buffer if self.consumed > 0 { self.consumed -= 1; } } self.messages.push_back(line.clone()); self.total += 1; // Notify all subscribers let preview = line.chars().take(80).collect::(); for sub in &self.subscribers { let mut req = sub.notify_request(); let mut list = req.get().init_notifications(1); let mut n = list.reborrow().get(0); n.set_channel(channel); n.set_urgency(urgency); n.set_preview(&preview); n.set_count(1); // Fire and forget — if client is gone, we'll clean up later tokio::task::spawn_local(async move { let _ = req.send().promise.await; }); } } fn api_url(&self, method: &str) -> String { format!("https://api.telegram.org/bot{}/{}", self.config.token, method) } /// Get new unconsumed messages, mark as consumed. /// Returns at least min_count lines (from history if needed). fn recv_new(&mut self, min_count: usize) -> String { let buf_len = self.messages.len(); let unconsumed_start = buf_len.saturating_sub(self.total - self.consumed); let _unconsumed = &self.messages.as_slices(); // Collect unconsumed let new_msgs: Vec<&str> = self.messages.iter() .skip(unconsumed_start) .map(|s| s.as_str()) .collect(); let need_extra = if new_msgs.len() < min_count { min_count - new_msgs.len() } else { 0 }; // Scrollback for extra lines let scroll_start = unconsumed_start.saturating_sub(need_extra); let scrollback: Vec<&str> = self.messages.iter() .skip(scroll_start) .take(unconsumed_start - scroll_start) .map(|s| s.as_str()) .collect(); self.consumed = self.total; let mut result = scrollback; result.extend(new_msgs); result.join("\n") } /// Get last N lines without consuming. fn recv_history(&self, count: usize) -> String { self.messages.iter() .rev() .take(count) .collect::>() .into_iter() .rev() .map(|s| s.as_str()) .collect::>() .join("\n") } } // ── Persistence ───────────────────────────────────────────────── fn data_dir() -> PathBuf { dirs::home_dir().unwrap_or_default().join(".consciousness/telegram") } fn load_offset() -> i64 { std::fs::read_to_string(data_dir().join("last_offset")) .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0) } fn save_offset(offset: i64) { let _ = std::fs::create_dir_all(data_dir()); let _ = std::fs::write(data_dir().join("last_offset"), offset.to_string()); } fn append_history(line: &str) { use std::io::Write; if let Ok(mut f) = std::fs::OpenOptions::new() .create(true).append(true) .open(data_dir().join("history.log")) { let _ = writeln!(f, "{}", line); } } fn now() -> f64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() } // ── Telegram Polling ──────────────────────────────────────────── async fn poll_loop(state: SharedState) { let _ = std::fs::create_dir_all(data_dir().join("media")); loop { if let Err(e) = poll_once(&state).await { error!("telegram poll error: {e}"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } } async fn poll_once(state: &SharedState) -> Result<(), Box> { let (url, chat_id, token) = { let s = state.borrow(); let url = format!( "{}?offset={}&timeout=30", s.api_url("getUpdates"), s.last_offset, ); (url, s.config.chat_id, s.config.token.clone()) }; let client = state.borrow().client.clone(); let resp: serde_json::Value = client.get(&url) .timeout(std::time::Duration::from_secs(35)) .send().await?.json().await?; if !state.borrow().connected { state.borrow_mut().connected = true; info!("telegram: connected"); } let results = match resp["result"].as_array() { Some(r) => r, None => return Ok(()), }; for update in results { let update_id = update["update_id"].as_i64().unwrap_or(0); let msg = &update["message"]; { let mut s = state.borrow_mut(); s.last_offset = update_id + 1; save_offset(s.last_offset); } let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); if msg_chat_id != chat_id { let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage"); let _ = client.post(&reject_url) .form(&[("chat_id", msg_chat_id.to_string()), ("text", "This is a private bot.".to_string())]) .send().await; continue; } let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string(); let channel = format!("telegram.{}", sender.to_lowercase()); if let Some(text) = msg["text"].as_str() { let line = format!("[{}] {}", sender, text); let ts = now() as u64; append_history(&format!("{ts} {line}")); state.borrow_mut().push_message(line, 2, &channel); // NORMAL urgency } // TODO: handle photos, voice, documents (same as original module) } Ok(()) } // ── ChannelServer Implementation ──────────────────────────────── struct ChannelServerImpl { state: SharedState, } impl channel_server::Server for ChannelServerImpl { fn recv( &mut self, params: channel_server::RecvParams, mut results: channel_server::RecvResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); let _channel = pry!(params.get_channel()); let all_new = params.get_all_new(); let min_count = params.get_min_count() as usize; let text = if all_new { self.state.borrow_mut().recv_new(min_count) } else { self.state.borrow().recv_history(min_count) }; results.get().set_text(&text); Promise::ok(()) } fn send( &mut self, params: channel_server::SendParams, _results: channel_server::SendResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); let _channel = pry!(params.get_channel()); let message = pry!(pry!(params.get_message()).to_str()).to_string(); let state = self.state.clone(); Promise::from_future(async move { let (url, client, chat_id) = { let s = state.borrow(); (s.api_url("sendMessage"), s.client.clone(), s.config.chat_id) }; let _ = client.post(&url) .form(&[("chat_id", &chat_id.to_string()), ("text", &message)]) .send().await; let ts = now() as u64; append_history(&format!("{ts} [agent] {message}")); state.borrow_mut().push_message( format!("[agent] {}", message), 0, "telegram.agent" ); Ok(()) }) } fn subscribe( &mut self, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, ) -> Promise<(), capnp::Error> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); info!("client subscribed for notifications"); Promise::ok(()) } fn list( &mut self, _params: channel_server::ListParams, mut results: channel_server::ListResults, ) -> Promise<(), capnp::Error> { let s = self.state.borrow(); let unread = (s.total - s.consumed) as u32; // Report a single "telegram" channel let mut list = results.get().init_channels(1); let mut ch = list.reborrow().get(0); ch.set_name("telegram"); ch.set_connected(s.connected); ch.set_unread(unread); Promise::ok(()) } } // ── Main ──────────────────────────────────────────────────────── #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let config = load_config(); let state = Rc::new(RefCell::new(State::new(config))); let sock_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); std::fs::create_dir_all(&sock_dir)?; let sock_path = sock_dir.join("telegram.sock"); let _ = std::fs::remove_file(&sock_path); info!("telegram channel daemon starting on {}", sock_path.display()); tokio::task::LocalSet::new() .run_until(async move { // Start Telegram polling let poll_state = state.clone(); tokio::task::spawn_local(async move { poll_loop(poll_state).await; }); // Listen for channel protocol connections let listener = UnixListener::bind(&sock_path)?; loop { let (stream, _) = listener.accept().await?; let (reader, writer) = stream.compat().split(); let network = twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Server, Default::default(), ); let server = ChannelServerImpl { state: state.clone(), }; let client: channel_server::Client = capnp_rpc::new_client(server); let rpc_system = RpcSystem::new( Box::new(network), Some(client.client), ); tokio::task::spawn_local(rpc_system); info!("channel client connected"); } #[allow(unreachable_code)] Ok::<(), Box>(()) }) .await }