consciousness/channels/telegram/src/main.rs
Kent Overstreet 1cf4f504c0 Kill reqwest — minimal HTTP client on raw hyper + tokio-rustls
New src/agent/api/http.rs: ~240 lines, supports GET/POST, JSON/form
bodies, SSE streaming via chunk(), TLS via rustls. No tracing dep.

Removes reqwest from the main crate and telegram channel crate.
Cargo.lock drops ~900 lines of transitive dependencies.

tracing now only pulled in by tui-markdown.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-07 12:50:40 -04:00

385 lines
13 KiB
Rust

// channel-telegram — Standalone Telegram channel daemon
//
// Long-polls the Telegram Bot API, stores messages, and serves
// them over the channel.capnp protocol on a Unix socket at
// ~/.consciousness/channels/telegram.sock.
//
// Runs independently of the consciousness binary so restarts
// don't kill the Telegram connection.
use std::cell::RefCell;
use std::path::PathBuf;
use std::rc::Rc;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio::net::UnixListener;
use tokio_util::compat::TokioAsyncReadCompatExt;
use log::{info, error};
use poc_memory::channel_capnp::{channel_client, channel_server};
// ── Config ──────────────────────────────────────────────────────
/// Daemon configuration, deserialized from the `telegram.json5` file read by
/// `load_config`.
#[derive(Clone, serde::Deserialize)]
struct Config {
/// Bot API token. May be omitted from the config file (it then defaults to
/// an empty string); `load_config` overwrites it with the contents of the
/// secrets token file when that file is readable.
#[serde(default)]
token: String,
/// The one chat this bot talks to: incoming messages from any other chat
/// are rejected, and outgoing messages are sent to this chat.
chat_id: i64,
}
/// Directory holding channel configuration and sockets:
/// `<home>/.consciousness/channels`.
///
/// When the home directory cannot be determined the path is built from an
/// empty `PathBuf`, i.e. it becomes relative.
fn channels_dir() -> PathBuf {
    let home = dirs::home_dir().unwrap_or_default();
    home.join(".consciousness/channels")
}
/// Load the daemon configuration from `telegram.json5` in the channels
/// directory, then overlay the bot token from `telegram.secrets/token`.
///
/// The token file, when readable, always replaces whatever token the config
/// file carried (after trimming surrounding whitespace).
///
/// # Panics
/// Panics when the config file cannot be read or parsed, or when no token is
/// available from either source. This runs once at startup, where aborting
/// with a clear message is the intended failure mode.
fn load_config() -> Config {
    let dir = channels_dir();
    let config_path = dir.join("telegram.json5");

    // Include the underlying I/O error so "file missing" can be told apart
    // from "permission denied" and similar.
    let text = std::fs::read_to_string(&config_path)
        .unwrap_or_else(|e| panic!("failed to read {}: {}", config_path.display(), e));

    // NOTE(review): the file is named .json5 but is parsed with strict
    // serde_json, so JSON5-only syntax (comments, trailing commas, unquoted
    // keys) will be rejected — confirm this is intended.
    let mut config: Config = serde_json::from_str(&text)
        .unwrap_or_else(|e| panic!("failed to parse {}: {}", config_path.display(), e));

    // Read token from secrets file
    let token_path = dir.join("telegram.secrets/token");
    if let Ok(token) = std::fs::read_to_string(&token_path) {
        config.token = token.trim().to_string();
    }

    if config.token.is_empty() {
        panic!("no telegram token — set it in {}", token_path.display());
    }
    config
}
// ── State ───────────────────────────────────────────────────────
use poc_memory::thalamus::channel_log::ChannelLog;
/// Mutable daemon state shared between the Telegram poll loop and the
/// channel RPC handlers.
struct State {
/// Configuration loaded at startup (bot token and allowed chat id).
config: Config,
/// Per-channel message logs (keyed by channel path, e.g. "telegram.kent")
channel_logs: std::collections::BTreeMap<String, ChannelLog>,
/// Telegram API offset
last_offset: i64,
/// Set to `true` by the poll code once a `getUpdates` response has been
/// received; reported to clients by the `list` RPC.
connected: bool,
/// HTTP client used for all Telegram Bot API requests; cloned out of the
/// state before each request.
client: poc_memory::agent::api::http::HttpClient,
/// Registered notification callbacks
subscribers: Vec<channel_client::Client>,
}
/// Single-threaded shared handle to the daemon state (`Rc<RefCell<_>>`);
/// the daemon runs on a current-thread runtime with local tasks.
type SharedState = Rc<RefCell<State>>;
impl State {
    /// Build the initial state: no channel logs, no subscribers, not yet
    /// connected, and the polling offset restored from disk via
    /// `load_offset`.
    fn new(config: Config) -> Self {
        Self {
            config,
            channel_logs: std::collections::BTreeMap::new(),
            last_offset: load_offset(),
            connected: false,
            client: poc_memory::agent::api::http::HttpClient::new(),
            subscribers: Vec::new(),
        }
    }

    /// Append `line` to the log for `channel` (creating the log on first
    /// use) and send a notification to every registered subscriber.
    ///
    /// Each notification carries the channel name, `urgency`, a count of 1
    /// and a preview made of the first 80 characters of `line`.
    fn push_message(&mut self, line: String, urgency: u8, channel: &str) {
        // Take the preview before handing ownership of the line to the log.
        let preview: String = line.chars().take(80).collect();

        self.channel_logs
            .entry(channel.to_string())
            .or_insert_with(ChannelLog::new)
            .push(line);

        // Notify all subscribers
        for subscriber in &self.subscribers {
            let mut request = subscriber.notify_request();
            {
                let mut notifications = request.get().init_notifications(1);
                let mut entry = notifications.reborrow().get(0);
                entry.set_channel(channel);
                entry.set_urgency(urgency);
                entry.set_preview(&preview);
                entry.set_count(1);
            }
            // Fire and forget: the outcome of the call is discarded. Note
            // that nothing in this file removes subscribers whose
            // connection has gone away.
            tokio::task::spawn_local(async move {
                let _ = request.send().promise.await;
            });
        }
    }

    /// Full Bot API URL for `method`, with the configured token embedded.
    fn api_url(&self, method: &str) -> String {
        format!("https://api.telegram.org/bot{}/{}", self.config.token, method)
    }
}
// ── Persistence ─────────────────────────────────────────────────
/// Directory for persisted daemon data (offset file, history log, media):
/// `<home>/.consciousness/channels/telegram.logs`.
fn data_dir() -> PathBuf {
    let home = dirs::home_dir().unwrap_or_default();
    home.join(".consciousness/channels/telegram.logs")
}
/// Read the persisted `getUpdates` offset from `last_offset` in the data
/// directory.
///
/// Returns 0 when the file is missing, unreadable, or does not contain a
/// valid integer.
fn load_offset() -> i64 {
    let path = data_dir().join("last_offset");
    match std::fs::read_to_string(path) {
        Ok(contents) => contents.trim().parse::<i64>().unwrap_or(0),
        Err(_) => 0,
    }
}
fn save_offset(offset: i64) {
let _ = std::fs::create_dir_all(data_dir());
let _ = std::fs::write(data_dir().join("last_offset"), offset.to_string());
}
/// Append `line` plus a newline to `history.log` in the data directory.
///
/// Best effort: the file is created if missing, but the directory is not;
/// open and write failures are silently ignored.
fn append_history(line: &str) {
    use std::io::Write;

    let path = data_dir().join("history.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(path);
    if let Ok(mut file) = opened {
        let _ = writeln!(file, "{}", line);
    }
}
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns 0.0 if the system clock reports a time before the epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
// ── Telegram Polling ────────────────────────────────────────────
/// Poll Telegram forever, one `poll_once` call per iteration.
///
/// Ensures the `media` directory under the data directory exists before
/// starting (best effort). When a poll fails, the error is logged, the
/// shared `connected` flag is cleared so the `list` RPC stops reporting a
/// live connection, and the loop waits five seconds before retrying.
/// Successful polls loop immediately; the request itself is a long poll.
async fn poll_loop(state: SharedState) {
    let _ = std::fs::create_dir_all(data_dir().join("media"));
    loop {
        if let Err(e) = poll_once(&state).await {
            error!("telegram poll error: {e}");
            // The flag is set again by `poll_once` on the next successful
            // response. The borrow is released before the sleep below.
            state.borrow_mut().connected = false;
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
    }
}
/// Issue one long-poll `getUpdates` request and process every update in the
/// reply.
///
/// For each update the polling offset is advanced to `update_id + 1` and
/// persisted. Messages from a chat other than the configured `chat_id` get a
/// "This is a private bot." reply and are dropped. Text messages from the
/// configured chat are appended to the history log and pushed to the
/// `telegram.<lowercased first name>` channel with urgency 2.
///
/// # Errors
/// Returns an error when the HTTP request or JSON decoding fails, or when
/// the reply has no `result` array (which is how the Bot API reports a
/// failed call, e.g. a bad token). Returning an error here lets `poll_loop`
/// back off instead of re-issuing the failing request in a tight loop.
async fn poll_once(state: &SharedState) -> Result<(), Box<dyn std::error::Error>> {
    // Snapshot everything needed from the shared state up front so that no
    // `RefCell` borrow is held across an `.await`.
    let (url, chat_id, token, client) = {
        let s = state.borrow();
        let url = format!(
            "{}?offset={}&timeout=30",
            s.api_url("getUpdates"),
            s.last_offset,
        );
        (url, s.config.chat_id, s.config.token.clone(), s.client.clone())
    };

    let resp: serde_json::Value = client.get(&url).await?.json().await?;

    // A failed API call carries `"ok": false` and a `description` instead of
    // a `result` array; surface it as an error rather than treating it as an
    // empty batch.
    let Some(results) = resp["result"].as_array() else {
        let description = resp["description"]
            .as_str()
            .unwrap_or("response has no `result` array");
        return Err(format!("telegram getUpdates failed: {description}").into());
    };

    // Only a well-formed reply counts as being connected.
    if !state.borrow().connected {
        state.borrow_mut().connected = true;
        info!("telegram: connected");
    }

    for update in results {
        // Without an id the update cannot be acknowledged; skip it rather
        // than overwriting the persisted offset with a bogus value.
        let Some(update_id) = update["update_id"].as_i64() else {
            continue;
        };
        {
            let mut s = state.borrow_mut();
            s.last_offset = update_id + 1;
            save_offset(s.last_offset);
        }

        let msg = &update["message"];

        // Updates that are not plain messages (edits, channel posts,
        // callback queries, ...) have no `message.chat.id`; there is nothing
        // to process and nobody to send a rejection to.
        let Some(msg_chat_id) = msg["chat"]["id"].as_i64() else {
            continue;
        };

        if msg_chat_id != chat_id {
            // Best effort: tell the stranger this bot is private, ignoring
            // any failure to deliver that reply.
            let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage");
            let _ = client.post_form(&reject_url, &[
                ("chat_id", &msg_chat_id.to_string()),
                ("text", "This is a private bot."),
            ]).await;
            continue;
        }

        let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string();
        let channel = format!("telegram.{}", sender.to_lowercase());

        if let Some(text) = msg["text"].as_str() {
            let line = format!("[{}] {}", sender, text);
            let ts = now() as u64;
            append_history(&format!("{ts} {line}"));
            state.borrow_mut().push_message(line, 2, &channel); // NORMAL urgency
        }
        // TODO: handle photos, voice, documents (same as original module)
    }
    Ok(())
}
// ── ChannelServer Implementation ────────────────────────────────
/// Implementation of the `channel_server` RPC interface; one instance is
/// created per accepted socket connection, all sharing the same state.
struct ChannelServerImpl {
/// Handle to the daemon state shared with the poll loop.
state: SharedState,
}
/// `?`-like helper for RPC methods that return a ready future instead of
/// being `async`: evaluates to the `Ok` value, or returns early from the
/// enclosing function with `std::future::ready(Err(e.into()))`.
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
// Convert the error into the method's error type and return it as an
// already-completed future.
Err(e) => return std::future::ready(Err(e.into())),
}
};
}
impl channel_server::Server for ChannelServerImpl {
fn recv(
self: Rc<Self>,
params: channel_server::RecvParams,
mut results: channel_server::RecvResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let params = pry!(params.get());
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
let all_new = params.get_all_new();
let min_count = params.get_min_count() as usize;
let mut s = self.state.borrow_mut();
let text = match s.channel_logs.get_mut(&channel) {
Some(log) => {
if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) }
}
None => String::new(),
};
results.get().set_text(&text);
std::future::ready(Ok(()))
}
fn send(
self: Rc<Self>,
params: channel_server::SendParams,
_results: channel_server::SendResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let state = self.state.clone();
async move {
let params = params.get()?;
let _channel = params.get_channel()?.to_str()?.to_string();
let message = params.get_message()?.to_str()?.to_string();
let (url, client, chat_id) = {
let s = state.borrow();
(s.api_url("sendMessage"), s.client.clone(), s.config.chat_id)
};
let _ = client.post_form(&url, &[
("chat_id", &chat_id.to_string()),
("text", &message),
]).await;
let ts = now() as u64;
append_history(&format!("{ts} [agent] {message}"));
{
let channel = "telegram.agent".to_string();
state.borrow_mut().channel_logs
.entry(channel)
.or_insert_with(ChannelLog::new)
.push_own(format!("[agent] {}", message));
}
Ok(())
}
}
fn subscribe(
self: Rc<Self>,
params: channel_server::SubscribeParams,
_results: channel_server::SubscribeResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let callback = pry!(pry!(params.get()).get_callback());
self.state.borrow_mut().subscribers.push(callback);
info!("client subscribed for notifications");
std::future::ready(Ok(()))
}
fn list(
self: Rc<Self>,
_params: channel_server::ListParams,
mut results: channel_server::ListResults,
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
let s = self.state.borrow();
let connected = s.connected;
let names: Vec<String> = s.channel_logs.keys().cloned().collect();
let mut list = results.get().init_channels(names.len() as u32);
for (i, name) in names.iter().enumerate() {
let mut entry = list.reborrow().get(i as u32);
entry.set_name(name);
entry.set_connected(connected);
entry.set_unread(
s.channel_logs.get(name).map_or(0, |l| l.unread())
);
}
std::future::ready(Ok(()))
}
}
// ── Main ────────────────────────────────────────────────────────
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let config = load_config();
let state = Rc::new(RefCell::new(State::new(config)));
let sock_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels");
std::fs::create_dir_all(&sock_dir)?;
let sock_path = sock_dir.join("telegram.sock");
let _ = std::fs::remove_file(&sock_path);
info!("telegram channel daemon starting on {}", sock_path.display());
tokio::task::LocalSet::new()
.run_until(async move {
// Start Telegram polling
let poll_state = state.clone();
tokio::task::spawn_local(async move {
poll_loop(poll_state).await;
});
// Listen for channel protocol connections
let listener = UnixListener::bind(&sock_path)?;
loop {
let (stream, _) = listener.accept().await?;
let (reader, writer) = stream.compat().split();
let network = twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let server = ChannelServerImpl {
state: state.clone(),
};
let client: channel_server::Client =
capnp_rpc::new_client(server);
let rpc_system = RpcSystem::new(
Box::new(network),
Some(client.client),
);
tokio::task::spawn_local(rpc_system);
info!("channel client connected");
}
#[allow(unreachable_code)]
Ok::<(), Box<dyn std::error::Error>>(())
})
.await
}