From dfef7fb446e9531f6195943283cb1b8e3e000971 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sat, 11 Apr 2026 16:12:30 -0400 Subject: [PATCH] fix telegram --- channels/telegram/src/main.rs | 286 +++++++++++++++++++++------------- 1 file changed, 179 insertions(+), 107 deletions(-) diff --git a/channels/telegram/src/main.rs b/channels/telegram/src/main.rs index ef2d597..af36cab 100644 --- a/channels/telegram/src/main.rs +++ b/channels/telegram/src/main.rs @@ -21,11 +21,12 @@ use consciousness::channel_capnp::{channel_client, channel_server}; // ── Config ────────────────────────────────────────────────────── -#[derive(Clone, serde::Deserialize)] +#[derive(Clone, serde::Serialize, serde::Deserialize)] struct Config { - #[serde(default)] + #[serde(default, skip_serializing)] token: String, - chat_id: i64, + #[serde(default)] + chat_ids: std::collections::BTreeMap, } fn channels_dir() -> PathBuf { @@ -55,7 +56,7 @@ fn load_config() -> Config { // ── State ─────────────────────────────────────────────────────── -use consciousness::thalamus::channel_log::ChannelLog; +use consciousness::thalamus::channel_log::{self, ChannelLog}; struct State { config: Config, @@ -74,9 +75,26 @@ type SharedState = Rc>; impl State { fn new(config: Config) -> Self { let last_offset = load_offset(); + + // Load existing sub-channel logs from disk + let mut channel_logs = std::collections::BTreeMap::new(); + let log_path = log_dir(); + if let Ok(entries) = std::fs::read_dir(&log_path) { + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if let Some(target) = name.strip_suffix(".log") { + let key = format!("telegram.{}", target); + channel_logs.insert( + key, + channel_log::load_disk_log(&log_path, target), + ); + } + } + } + Self { config, - channel_logs: std::collections::BTreeMap::new(), + channel_logs, last_offset, connected: false, client: consciousness::agent::api::http::HttpClient::new(), @@ -85,9 +103,10 @@ impl State { } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { + let target = channel_to_target(channel); self.channel_logs .entry(channel.to_string()) - .or_insert_with(ChannelLog::new) + .or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target)) .push(line.clone()); // Notify all subscribers @@ -106,116 +125,120 @@ impl State { }); } } - - fn api_url(&self, method: &str) -> String { - format!("https://api.telegram.org/bot{}/{}", self.config.token, method) - } - } // ── Persistence ───────────────────────────────────────────────── -fn data_dir() -> PathBuf { - dirs::home_dir().unwrap_or_default().join(".consciousness/channels/telegram.logs") +fn log_dir() -> PathBuf { + channel_log::log_dir("telegram") } fn load_offset() -> i64 { - std::fs::read_to_string(data_dir().join("last_offset")) + std::fs::read_to_string(log_dir().join("last_offset")) .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0) } fn save_offset(offset: i64) { - let _ = std::fs::create_dir_all(data_dir()); - let _ = std::fs::write(data_dir().join("last_offset"), offset.to_string()); + let _ = std::fs::create_dir_all(log_dir()); + let _ = std::fs::write(log_dir().join("last_offset"), offset.to_string()); } -fn append_history(line: &str) { - use std::io::Write; - if let Ok(mut f) = std::fs::OpenOptions::new() - .create(true).append(true) - .open(data_dir().join("history.log")) - { - let _ = writeln!(f, "{}", line); +/// Convert a channel path to a telegram target name. +/// "telegram.kent" -> "kent" +fn channel_to_target(channel: &str) -> String { + channel.strip_prefix("telegram.").unwrap_or(channel).to_string() +} + +fn config_path() -> PathBuf { + channels_dir().join("telegram.json5") +} + +fn save_config(config: &Config) { + if let Ok(json) = serde_json::to_string_pretty(config) { + let _ = std::fs::write(config_path(), json); } } -fn now() -> f64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64() +// ── Telegram API ──────────────────────────────────────────────── +// +// NOTE: The current HttpClient opens a new TCP+TLS connection per request. +// Telegram's API supports HTTP/2, which would allow multiplexing getUpdates +// and sendMessage on a single connection. To use HTTP/2: +// - Replace HttpClient with hyper_util::client::legacy::Client using +// a Connector that enables HTTP/2 (hyper_util::client::legacy::connect::HttpConnector +// + hyper_rustls with ALPN h2). +// - Or use reqwest with the "http2" feature, which handles connection pooling +// and HTTP/2 negotiation automatically. +// - The API functions below would then share a single pooled client, and +// concurrent requests (poll + send) would multiplex over one connection. + +use consciousness::agent::api::http::HttpClient; + +struct TelegramMessage { + update_id: i64, + chat_id: i64, + sender: String, + text: String, } -// ── Telegram Polling ──────────────────────────────────────────── +/// Fetch and parse pending updates from Telegram via long polling. +async fn get_updates( + client: &HttpClient, + token: &str, + offset: i64, +) -> Result, Box> { + let url = format!( + "https://api.telegram.org/bot{}/getUpdates?offset={}&timeout=30", + token, offset, + ); + let response = client.get(&url).await?; + let body = response.text().await?; + let resp: serde_json::Value = serde_json::from_str(&body) + .map_err(|e| format!("getUpdates JSON parse error: {e}\nbody: {}", &body[..body.len().min(500)]))?; -async fn poll_loop(state: SharedState) { - let _ = std::fs::create_dir_all(data_dir().join("media")); - loop { - if let Err(e) = poll_once(&state).await { - error!("telegram poll error: {e}"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let mut messages = Vec::new(); + if let Some(results) = resp["result"].as_array() { + for update in results { + let update_id = update["update_id"].as_i64().unwrap_or(0); + let msg = &update["message"]; + let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string(); + let chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); + + if let Some(text) = msg["text"].as_str() { + messages.push(TelegramMessage { + update_id, + chat_id, + sender, + text: text.to_string(), + }); + } } } + Ok(messages) } -async fn poll_once(state: &SharedState) -> Result<(), Box> { - let (url, chat_id, token) = { - let s = state.borrow(); - let url = format!( - "{}?offset={}&timeout=30", - s.api_url("getUpdates"), - s.last_offset, - ); - (url, s.config.chat_id, s.config.token.clone()) - }; - - let client = state.borrow().client.clone(); - let resp: serde_json::Value = client.get(&url).await?.json().await?; - - if !state.borrow().connected { - state.borrow_mut().connected = true; - info!("telegram: connected"); +/// Send a text message to a Telegram chat. +async fn send_message( + client: &HttpClient, + token: &str, + chat_id: i64, + text: &str, +) -> Result<(), Box> { + let url = format!( + "https://api.telegram.org/bot{}/sendMessage", + token, + ); + let response = client.post_form(&url, &[ + ("chat_id", &chat_id.to_string()), + ("text", text), + ]).await?; + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(format!("sendMessage failed: {} — {}", status, &body[..body.len().min(500)]).into()); } - - let results = match resp["result"].as_array() { - Some(r) => r, - None => return Ok(()), - }; - - for update in results { - let update_id = update["update_id"].as_i64().unwrap_or(0); - let msg = &update["message"]; - - { - let mut s = state.borrow_mut(); - s.last_offset = update_id + 1; - save_offset(s.last_offset); - } - - let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); - if msg_chat_id != chat_id { - let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage"); - let _ = client.post_form(&reject_url, &[ - ("chat_id", &msg_chat_id.to_string()), - ("text", "This is a private bot."), - ]).await; - continue; - } - - let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string(); - let channel = format!("telegram.{}", sender.to_lowercase()); - - if let Some(text) = msg["text"].as_str() { - let line = format!("[{}] {}", sender, text); - let ts = now() as u64; - append_history(&format!("{ts} {line}")); - state.borrow_mut().push_message(line, 2, &channel); // NORMAL urgency - } - // TODO: handle photos, voice, documents (same as original module) - } - Ok(()) } @@ -265,27 +288,27 @@ impl channel_server::Server for ChannelServerImpl { let state = self.state.clone(); async move { let params = params.get()?; - let _channel = params.get_channel()?.to_str()?.to_string(); + let channel = params.get_channel()?.to_str()?.to_string(); let message = params.get_message()?.to_str()?.to_string(); + let target = channel_to_target(&channel); - let (url, client, chat_id) = { + let (token, client, chat_id) = { let s = state.borrow(); - (s.api_url("sendMessage"), s.client.clone(), s.config.chat_id) + let chat_id = s.config.chat_ids.get(&target).copied() + .ok_or_else(|| capnp::Error::failed( + format!("no chat_id known for {target}")))?; + (s.config.token.clone(), s.client.clone(), chat_id) }; - let _ = client.post_form(&url, &[ - ("chat_id", &chat_id.to_string()), - ("text", &message), - ]).await; - let ts = now() as u64; - append_history(&format!("{ts} [agent] {message}")); - { - let channel = "telegram.agent".to_string(); - state.borrow_mut().channel_logs - .entry(channel) - .or_insert_with(ChannelLog::new) - .push_own(format!("[agent] {}", message)); - } + send_message(&client, &token, chat_id, &message).await + .map_err(|e| capnp::Error::failed(format!("send_message: {e}")))?; + + channel_log::append_disk_log(&log_dir(), &target, "PoC", &message); + state.borrow_mut().channel_logs + .entry(channel) + .or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target)) + .push_own(format!("[PoC] {}", message)); + Ok(()) } } @@ -326,11 +349,50 @@ impl channel_server::Server for ChannelServerImpl { // ── Main ──────────────────────────────────────────────────────── +async fn poll_once( + token: &str, + client: &HttpClient, + state: &SharedState, +) -> Result<(), Box> { + let offset = state.borrow().last_offset; + let messages = get_updates(client, token, offset).await?; + + if !state.borrow().connected { + state.borrow_mut().connected = true; + info!("telegram: connected"); + } + + let mut max_offset = offset; + + for msg in &messages { + max_offset = max_offset.max(msg.update_id + 1); + let sender_lower = msg.sender.to_lowercase(); + let channel = format!("telegram.{}", sender_lower); + + channel_log::append_disk_log(&log_dir(), &sender_lower, &msg.sender, &msg.text); + + let mut s = state.borrow_mut(); + s.config.chat_ids.insert(sender_lower, msg.chat_id); + let line = format!("[{}] {}", msg.sender, msg.text); + s.push_message(line, 2, &channel); + } + + if max_offset > offset { + let mut s = state.borrow_mut(); + s.last_offset = max_offset; + save_offset(max_offset); + save_config(&s.config); + } + + Ok(()) +} + #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { env_logger::init(); let config = load_config(); + let token = config.token.clone(); let state = Rc::new(RefCell::new(State::new(config))); let sock_dir = dirs::home_dir() @@ -339,6 +401,7 @@ async fn main() -> Result<(), Box> { std::fs::create_dir_all(&sock_dir)?; let sock_path = sock_dir.join("telegram.sock"); let _ = std::fs::remove_file(&sock_path); + let _ = std::fs::create_dir_all(log_dir().join("media")); info!("telegram channel daemon starting on {}", sock_path.display()); @@ -346,12 +409,21 @@ async fn main() -> Result<(), Box> { .run_until(async move { // Start Telegram polling let poll_state = state.clone(); + let poll_client = state.borrow().client.clone(); tokio::task::spawn_local(async move { - poll_loop(poll_state).await; + loop { + if let Err(e) = poll_once(&token, &poll_client, &poll_state).await { + error!("telegram poll error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } }); // Listen for channel protocol connections let listener = UnixListener::bind(&sock_path)?; + state.borrow_mut().connected = true; + + info!("listening on socket {}", sock_path.display()); loop { let (stream, _) = listener.accept().await?;