// Telegram module. // // Long-polls the Telegram Bot API for messages from Kent's chat. // Downloads media (photos, voice, documents) to local files. // Sends text and files. Notifications flow through mpsc into the // daemon's main state. // // Only accepts messages from the configured chat_id (prompt // injection defense — other senders get a "private bot" reply). use crate::config::{Config, TelegramConfig}; use crate::notify::Notification; use crate::{home, now}; use std::cell::RefCell; use std::collections::VecDeque; use std::path::PathBuf; use std::rc::Rc; use tokio::sync::mpsc; use tracing::{error, info}; const MAX_LOG_LINES: usize = 100; const POLL_TIMEOUT: u64 = 30; pub struct TelegramState { pub config: TelegramConfig, pub connected: bool, pub log: VecDeque, pub last_offset: i64, client: reqwest::Client, } pub type SharedTelegram = Rc>; impl TelegramState { fn new(config: TelegramConfig) -> Self { let last_offset = load_offset(); Self { config, connected: false, log: VecDeque::with_capacity(MAX_LOG_LINES), last_offset, client: reqwest::Client::new(), } } fn push_log(&mut self, line: &str) { if self.log.len() >= MAX_LOG_LINES { self.log.pop_front(); } self.log.push_back(line.to_string()); } fn api_url(&self, method: &str) -> String { format!( "https://api.telegram.org/bot{}/{}", self.config.token, method ) } } fn offset_path() -> PathBuf { home().join(".claude/telegram/last_offset") } fn load_offset() -> i64 { std::fs::read_to_string(offset_path()) .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0) } fn save_offset(offset: i64) { let _ = std::fs::write(offset_path(), offset.to_string()); } fn history_path() -> PathBuf { home().join(".claude/telegram/history.log") } fn media_dir() -> PathBuf { home().join(".claude/telegram/media") } fn append_history(line: &str) { use std::io::Write; if let Ok(mut f) = std::fs::OpenOptions::new() .create(true) .append(true) .open(history_path()) { let _ = writeln!(f, "{}", line); } } /// Start the Telegram module. Returns the shared state handle. pub fn start( config: TelegramConfig, notify_tx: mpsc::UnboundedSender, _daemon_config: Rc>, ) -> SharedTelegram { let state = Rc::new(RefCell::new(TelegramState::new(config))); let state_clone = state.clone(); tokio::task::spawn_local(async move { poll_loop(state_clone, notify_tx).await; }); state } async fn poll_loop( state: SharedTelegram, notify_tx: mpsc::UnboundedSender, ) { let _ = std::fs::create_dir_all(media_dir()); loop { match poll_once(&state, ¬ify_tx).await { Ok(()) => {} Err(e) => { error!("telegram: poll error: {e}"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } } } async fn poll_once( state: &SharedTelegram, notify_tx: &mpsc::UnboundedSender, ) -> Result<(), Box> { let (url, chat_id, token) = { let s = state.borrow(); let url = format!( "{}?offset={}&timeout={}", s.api_url("getUpdates"), s.last_offset, POLL_TIMEOUT, ); (url, s.config.chat_id, s.config.token.clone()) }; let client = state.borrow().client.clone(); let resp: serde_json::Value = client .get(&url) .timeout(std::time::Duration::from_secs(POLL_TIMEOUT + 5)) .send() .await? .json() .await?; if !state.borrow().connected { state.borrow_mut().connected = true; info!("telegram: connected"); } let results = resp["result"].as_array(); let results = match results { Some(r) => r, None => return Ok(()), }; for update in results { let update_id = update["update_id"].as_i64().unwrap_or(0); let msg = &update["message"]; // Update offset { let mut s = state.borrow_mut(); s.last_offset = update_id + 1; save_offset(s.last_offset); } let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); if msg_chat_id != chat_id { // Reject messages from unknown chats let reject_url = format!( "https://api.telegram.org/bot{}/sendMessage", token ); let _ = client .post(&reject_url) .form(&[ ("chat_id", msg_chat_id.to_string()), ("text", "This is a private bot.".to_string()), ]) .send() .await; continue; } let sender = msg["from"]["first_name"] .as_str() .unwrap_or("unknown") .to_string(); // Handle different message types if let Some(text) = msg["text"].as_str() { let log_line = format!("[{}] {}", sender, text); state.borrow_mut().push_log(&log_line); let ts = timestamp(); append_history(&format!("{ts} [{sender}] {text}")); let _ = notify_tx.send(Notification { ntype: format!("telegram.{}", sender.to_lowercase()), urgency: crate::notify::NORMAL, message: log_line, timestamp: now(), }); } else if let Some(photos) = msg["photo"].as_array() { // Pick largest photo let best = photos.iter().max_by_key(|p| p["file_size"].as_i64().unwrap_or(0)); if let Some(photo) = best { if let Some(file_id) = photo["file_id"].as_str() { let caption = msg["caption"].as_str().unwrap_or(""); let local = download_file(&client, &token, file_id, ".jpg").await; let display = match &local { Some(p) => format!("[photo: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), None => format!("[photo]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), }; let log_line = format!("[{}] {}", sender, display); state.borrow_mut().push_log(&log_line); let ts = timestamp(); append_history(&format!("{ts} [{sender}] {display}")); let _ = notify_tx.send(Notification { ntype: format!("telegram.{}", sender.to_lowercase()), urgency: crate::notify::NORMAL, message: log_line, timestamp: now(), }); } } } else if msg["voice"].is_object() { if let Some(file_id) = msg["voice"]["file_id"].as_str() { let caption = msg["caption"].as_str().unwrap_or(""); let local = download_file(&client, &token, file_id, ".ogg").await; let display = match &local { Some(p) => format!("[voice: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), None => format!("[voice]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), }; let log_line = format!("[{}] {}", sender, display); state.borrow_mut().push_log(&log_line); let ts = timestamp(); append_history(&format!("{ts} [{sender}] {display}")); let _ = notify_tx.send(Notification { ntype: format!("telegram.{}", sender.to_lowercase()), urgency: crate::notify::NORMAL, message: log_line, timestamp: now(), }); } } else if msg["document"].is_object() { if let Some(file_id) = msg["document"]["file_id"].as_str() { let fname = msg["document"]["file_name"].as_str().unwrap_or("file"); let caption = msg["caption"].as_str().unwrap_or(""); let local = download_file(&client, &token, file_id, "").await; let display = match &local { Some(p) => format!("[doc: {} -> {}]{}", fname, p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), None => format!("[doc: {}]{}", fname, if caption.is_empty() { String::new() } else { format!(" {caption}") }), }; let log_line = format!("[{}] {}", sender, display); state.borrow_mut().push_log(&log_line); let ts = timestamp(); append_history(&format!("{ts} [{sender}] {display}")); let _ = notify_tx.send(Notification { ntype: format!("telegram.{}", sender.to_lowercase()), urgency: crate::notify::NORMAL, message: log_line, timestamp: now(), }); } } } Ok(()) } async fn download_file( client: &reqwest::Client, token: &str, file_id: &str, ext: &str, ) -> Option { let url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); let resp: serde_json::Value = client.get(&url).send().await.ok()?.json().await.ok()?; let file_path = resp["result"]["file_path"].as_str()?; let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); let bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; let basename = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("file"); let local_name = if ext.is_empty() { basename.to_string() } else { let stem = std::path::Path::new(basename) .file_stem() .and_then(|s| s.to_str()) .unwrap_or("file"); format!("{}{}", stem, ext) }; let secs = now() as u64; let local_path = media_dir().join(format!("{secs}_{local_name}")); std::fs::write(&local_path, &bytes).ok()?; Some(local_path) } fn timestamp() -> String { // Use the same unix seconds approach as IRC module format!("{}", now() as u64) } /// Handle a runtime command from RPC. pub async fn handle_command( state: &SharedTelegram, _daemon_config: &Rc>, cmd: &str, args: &[String], ) -> Result { match cmd { "send" => { let msg = args.join(" "); if msg.is_empty() { return Err("usage: telegram send ".into()); } let (url, client) = { let s = state.borrow(); (s.api_url("sendMessage"), s.client.clone()) }; let chat_id = state.borrow().config.chat_id.to_string(); client .post(&url) .form(&[("chat_id", chat_id.as_str()), ("text", msg.as_str())]) .send() .await .map_err(|e| e.to_string())?; let ts = timestamp(); append_history(&format!("{ts} [ProofOfConcept] {msg}")); Ok("sent".to_string()) } "status" => { let s = state.borrow(); Ok(format!( "connected={} log_lines={} offset={}", s.connected, s.log.len(), s.last_offset, )) } "log" => { let n: usize = args .first() .and_then(|s| s.parse().ok()) .unwrap_or(15); let s = state.borrow(); let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); lines.reverse(); Ok(lines.join("\n")) } _ => Err(format!( "unknown telegram command: {cmd}\n\ commands: send, status, log" )), } }