forked from kent/consciousness
tmux channel: - Track connected state per-pane (shows true channel availability) - Persist pane config on add/remove (survives restarts) - Remove cleanup_pipes on exit (unnecessary with persisted config) - Reorder PaneConfig fields for consistency telegram channel: - Use json5 crate for config parsing (matches tmux) Co-Authored-By: Proof of Concept <poc@bcachefs.org>
457 lines
16 KiB
Rust
457 lines
16 KiB
Rust
// channel-telegram — Standalone Telegram channel daemon
|
|
//
|
|
// Long-polls the Telegram Bot API, stores messages, and serves
|
|
// them over the channel.capnp protocol on a Unix socket at
|
|
// ~/.consciousness/channels/telegram.sock.
|
|
//
|
|
// Runs independently of the consciousness binary so restarts
|
|
// don't kill the Telegram connection.
|
|
|
|
use std::cell::RefCell;
|
|
use std::path::PathBuf;
|
|
use std::rc::Rc;
|
|
|
|
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
|
|
use futures::AsyncReadExt;
|
|
use tokio::net::UnixListener;
|
|
use tokio_util::compat::TokioAsyncReadCompatExt;
|
|
use log::{info, error};
|
|
|
|
use consciousness::channel_capnp::{channel_client, channel_server};
|
|
|
|
// ── Config ──────────────────────────────────────────────────────
|
|
|
|
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
|
struct Config {
|
|
#[serde(default, skip_serializing)]
|
|
token: String,
|
|
#[serde(default)]
|
|
chat_ids: std::collections::BTreeMap<String, i64>,
|
|
}
|
|
|
|
fn channels_dir() -> PathBuf {
|
|
dirs::home_dir()
|
|
.unwrap_or_default()
|
|
.join(".consciousness/channels")
|
|
}
|
|
|
|
fn load_config() -> Config {
|
|
let dir = channels_dir();
|
|
let config_path = dir.join("telegram.json5");
|
|
let text = std::fs::read_to_string(&config_path)
|
|
.unwrap_or_else(|_| panic!("failed to read {}", config_path.display()));
|
|
let mut config: Config = json5::from_str(&text)
|
|
.unwrap_or_else(|e| panic!("failed to parse {}: {}", config_path.display(), e));
|
|
|
|
// Read token from secrets file
|
|
let token_path = dir.join("telegram.secrets/token");
|
|
if let Ok(token) = std::fs::read_to_string(&token_path) {
|
|
config.token = token.trim().to_string();
|
|
}
|
|
if config.token.is_empty() {
|
|
panic!("no telegram token — set it in {}", token_path.display());
|
|
}
|
|
config
|
|
}
|
|
|
|
// ── State ───────────────────────────────────────────────────────
|
|
|
|
use consciousness::thalamus::channel_log::{self, ChannelLog};
|
|
|
|
struct State {
|
|
config: Config,
|
|
/// Per-channel message logs (keyed by channel path, e.g. "telegram.kent")
|
|
channel_logs: std::collections::BTreeMap<String, ChannelLog>,
|
|
/// Telegram API offset
|
|
last_offset: i64,
|
|
connected: bool,
|
|
client: consciousness::agent::api::http::HttpClient,
|
|
/// Registered notification callbacks
|
|
subscribers: Vec<channel_client::Client>,
|
|
}
|
|
|
|
/// Reference-counted, interior-mutable handle to the daemon [`State`].
type SharedState = Rc<RefCell<State>>;
|
|
|
|
impl State {
|
|
fn new(config: Config) -> Self {
|
|
let last_offset = load_offset();
|
|
|
|
// Load existing sub-channel logs from disk
|
|
let mut channel_logs = std::collections::BTreeMap::new();
|
|
let log_path = log_dir();
|
|
if let Ok(entries) = std::fs::read_dir(&log_path) {
|
|
for entry in entries.flatten() {
|
|
let name = entry.file_name().to_string_lossy().to_string();
|
|
if let Some(target) = name.strip_suffix(".log") {
|
|
let key = format!("telegram.{}", target);
|
|
channel_logs.insert(
|
|
key,
|
|
channel_log::load_disk_log(&log_path, target),
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
Self {
|
|
config,
|
|
channel_logs,
|
|
last_offset,
|
|
connected: false,
|
|
client: consciousness::agent::api::http::HttpClient::new(),
|
|
subscribers: Vec::new(),
|
|
}
|
|
}
|
|
|
|
fn push_message(&mut self, line: String, urgency: u8, channel: &str) {
|
|
let target = channel_to_target(channel);
|
|
self.channel_logs
|
|
.entry(channel.to_string())
|
|
.or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target))
|
|
.push(line.clone());
|
|
|
|
// Notify all subscribers
|
|
let preview = line.chars().take(80).collect::<String>();
|
|
for sub in &self.subscribers {
|
|
let mut req = sub.notify_request();
|
|
let mut list = req.get().init_notifications(1);
|
|
let mut n = list.reborrow().get(0);
|
|
n.set_channel(channel);
|
|
n.set_urgency(urgency);
|
|
n.set_preview(&preview);
|
|
n.set_count(1);
|
|
// Fire and forget — if client is gone, we'll clean up later
|
|
tokio::task::spawn_local(async move {
|
|
let _ = req.send().promise.await;
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── Persistence ─────────────────────────────────────────────────
|
|
|
|
fn log_dir() -> PathBuf {
|
|
channel_log::log_dir("telegram")
|
|
}
|
|
|
|
fn load_offset() -> i64 {
|
|
std::fs::read_to_string(log_dir().join("last_offset"))
|
|
.ok()
|
|
.and_then(|s| s.trim().parse().ok())
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
fn save_offset(offset: i64) {
|
|
let _ = std::fs::create_dir_all(log_dir());
|
|
let _ = std::fs::write(log_dir().join("last_offset"), offset.to_string());
|
|
}
|
|
|
|
/// Map a channel path onto its Telegram target name by dropping a leading
/// `telegram.` prefix, e.g. "telegram.kent" -> "kent".
///
/// Input without that prefix is returned unchanged.
fn channel_to_target(channel: &str) -> String {
    match channel.strip_prefix("telegram.") {
        Some(target) => target.to_owned(),
        None => channel.to_owned(),
    }
}
|
|
|
|
fn config_path() -> PathBuf {
|
|
channels_dir().join("telegram.json5")
|
|
}
|
|
|
|
fn save_config(config: &Config) {
|
|
if let Ok(json) = serde_json::to_string_pretty(config) {
|
|
let _ = std::fs::write(config_path(), json);
|
|
}
|
|
}
|
|
|
|
// ── Telegram API ────────────────────────────────────────────────
|
|
//
|
|
// NOTE: The current HttpClient opens a new TCP+TLS connection per request.
|
|
// Telegram's API supports HTTP/2, which would allow multiplexing getUpdates
|
|
// and sendMessage on a single connection. To use HTTP/2:
|
|
// - Replace HttpClient with hyper_util::client::legacy::Client using
|
|
// a Connector that enables HTTP/2 (hyper_util::client::legacy::connect::HttpConnector
|
|
// + hyper_rustls with ALPN h2).
|
|
// - Or use reqwest with the "http2" feature, which handles connection pooling
|
|
// and HTTP/2 negotiation automatically.
|
|
// - The API functions below would then share a single pooled client, and
|
|
// concurrent requests (poll + send) would multiplex over one connection.
|
|
|
|
use consciousness::agent::api::http::HttpClient;
|
|
|
|
/// A text message extracted from one Telegram update.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TelegramMessage {
    /// Identifier of the update that carried the message.
    update_id: i64,
    /// Chat the message was posted in.
    chat_id: i64,
    /// First name of the sender.
    sender: String,
    /// Message text.
    text: String,
}
|
|
|
|
/// Fetch and parse pending updates from Telegram via long polling.
|
|
async fn get_updates(
|
|
client: &HttpClient,
|
|
token: &str,
|
|
offset: i64,
|
|
) -> Result<Vec<TelegramMessage>, Box<dyn std::error::Error>> {
|
|
let url = format!(
|
|
"https://api.telegram.org/bot{}/getUpdates?offset={}&timeout=30",
|
|
token, offset,
|
|
);
|
|
let response = client.get(&url).await?;
|
|
let body = response.text().await?;
|
|
let resp: serde_json::Value = serde_json::from_str(&body)
|
|
.map_err(|e| format!("getUpdates JSON parse error: {e}\nbody: {}", &body[..body.len().min(500)]))?;
|
|
|
|
let mut messages = Vec::new();
|
|
if let Some(results) = resp["result"].as_array() {
|
|
for update in results {
|
|
let update_id = update["update_id"].as_i64().unwrap_or(0);
|
|
let msg = &update["message"];
|
|
let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string();
|
|
let chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
|
|
|
|
if let Some(text) = msg["text"].as_str() {
|
|
messages.push(TelegramMessage {
|
|
update_id,
|
|
chat_id,
|
|
sender,
|
|
text: text.to_string(),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
Ok(messages)
|
|
}
|
|
|
|
/// Send a text message to a Telegram chat.
|
|
async fn send_message(
|
|
client: &HttpClient,
|
|
token: &str,
|
|
chat_id: i64,
|
|
text: &str,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let url = format!(
|
|
"https://api.telegram.org/bot{}/sendMessage",
|
|
token,
|
|
);
|
|
let response = client.post_form(&url, &[
|
|
("chat_id", &chat_id.to_string()),
|
|
("text", text),
|
|
]).await?;
|
|
let status = response.status();
|
|
if !status.is_success() {
|
|
let body = response.text().await.unwrap_or_default();
|
|
return Err(format!("sendMessage failed: {} — {}", status, &body[..body.len().min(500)]).into());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
// ── ChannelServer Implementation ────────────────────────────────
|
|
|
|
struct ChannelServerImpl {
|
|
state: SharedState,
|
|
}
|
|
|
|
/// `?`-like early return for functions that hand back a ready future.
///
/// Evaluates to the `Ok` value of the given `Result`; on `Err` the enclosing
/// function returns `std::future::ready(Err(..))` with the error converted
/// through `Into`.
macro_rules! pry {
    ($result:expr) => {
        match $result {
            Ok(value) => value,
            Err(err) => return std::future::ready(Err(err.into())),
        }
    };
}
|
|
|
|
impl channel_server::Server for ChannelServerImpl {
|
|
fn recv(
|
|
self: Rc<Self>,
|
|
params: channel_server::RecvParams,
|
|
mut results: channel_server::RecvResults,
|
|
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
|
|
let params = pry!(params.get());
|
|
let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
|
|
let all_new = params.get_all_new();
|
|
let min_count = params.get_min_count() as usize;
|
|
|
|
let mut s = self.state.borrow_mut();
|
|
let text = match s.channel_logs.get_mut(&channel) {
|
|
Some(log) => {
|
|
if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) }
|
|
}
|
|
None => String::new(),
|
|
};
|
|
|
|
results.get().set_text(&text);
|
|
std::future::ready(Ok(()))
|
|
}
|
|
|
|
fn send(
|
|
self: Rc<Self>,
|
|
params: channel_server::SendParams,
|
|
_results: channel_server::SendResults,
|
|
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
|
|
let state = self.state.clone();
|
|
async move {
|
|
let params = params.get()?;
|
|
let channel = params.get_channel()?.to_str()?.to_string();
|
|
let message = params.get_message()?.to_str()?.to_string();
|
|
let target = channel_to_target(&channel);
|
|
|
|
let (token, client, chat_id) = {
|
|
let s = state.borrow();
|
|
let chat_id = s.config.chat_ids.get(&target).copied()
|
|
.ok_or_else(|| capnp::Error::failed(
|
|
format!("no chat_id known for {target}")))?;
|
|
(s.config.token.clone(), s.client.clone(), chat_id)
|
|
};
|
|
|
|
send_message(&client, &token, chat_id, &message).await
|
|
.map_err(|e| capnp::Error::failed(format!("send_message: {e}")))?;
|
|
|
|
channel_log::append_disk_log(&log_dir(), &target, "PoC", &message);
|
|
state.borrow_mut().channel_logs
|
|
.entry(channel)
|
|
.or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target))
|
|
.push_own(format!("[PoC] {}", message));
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn subscribe(
|
|
self: Rc<Self>,
|
|
params: channel_server::SubscribeParams,
|
|
_results: channel_server::SubscribeResults,
|
|
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
|
|
let callback = pry!(pry!(params.get()).get_callback());
|
|
self.state.borrow_mut().subscribers.push(callback);
|
|
info!("client subscribed for notifications");
|
|
std::future::ready(Ok(()))
|
|
}
|
|
|
|
fn list(
|
|
self: Rc<Self>,
|
|
_params: channel_server::ListParams,
|
|
mut results: channel_server::ListResults,
|
|
) -> impl std::future::Future<Output = Result<(), capnp::Error>> {
|
|
let s = self.state.borrow();
|
|
let connected = s.connected;
|
|
|
|
let names: Vec<String> = s.channel_logs.keys().cloned().collect();
|
|
let mut list = results.get().init_channels(names.len() as u32);
|
|
for (i, name) in names.iter().enumerate() {
|
|
let mut entry = list.reborrow().get(i as u32);
|
|
entry.set_name(name);
|
|
entry.set_connected(connected);
|
|
entry.set_unread(
|
|
s.channel_logs.get(name).map_or(0, |l| l.unread())
|
|
);
|
|
}
|
|
|
|
std::future::ready(Ok(()))
|
|
}
|
|
}
|
|
|
|
// ── Main ────────────────────────────────────────────────────────
|
|
|
|
async fn poll_once(
|
|
token: &str,
|
|
client: &HttpClient,
|
|
state: &SharedState,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let offset = state.borrow().last_offset;
|
|
let messages = get_updates(client, token, offset).await?;
|
|
|
|
if !state.borrow().connected {
|
|
state.borrow_mut().connected = true;
|
|
info!("telegram: connected");
|
|
}
|
|
|
|
let mut max_offset = offset;
|
|
|
|
for msg in &messages {
|
|
max_offset = max_offset.max(msg.update_id + 1);
|
|
let sender_lower = msg.sender.to_lowercase();
|
|
let channel = format!("telegram.{}", sender_lower);
|
|
|
|
channel_log::append_disk_log(&log_dir(), &sender_lower, &msg.sender, &msg.text);
|
|
|
|
let mut s = state.borrow_mut();
|
|
s.config.chat_ids.insert(sender_lower, msg.chat_id);
|
|
let line = format!("[{}] {}", msg.sender, msg.text);
|
|
s.push_message(line, 2, &channel);
|
|
}
|
|
|
|
if max_offset > offset {
|
|
let mut s = state.borrow_mut();
|
|
s.last_offset = max_offset;
|
|
save_offset(max_offset);
|
|
save_config(&s.config);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::main(flavor = "current_thread")]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
env_logger::init();
|
|
|
|
let config = load_config();
|
|
let token = config.token.clone();
|
|
let state = Rc::new(RefCell::new(State::new(config)));
|
|
|
|
let sock_dir = dirs::home_dir()
|
|
.unwrap_or_default()
|
|
.join(".consciousness/channels");
|
|
std::fs::create_dir_all(&sock_dir)?;
|
|
let sock_path = sock_dir.join("telegram.sock");
|
|
let _ = std::fs::remove_file(&sock_path);
|
|
let _ = std::fs::create_dir_all(log_dir().join("media"));
|
|
|
|
info!("telegram channel daemon starting on {}", sock_path.display());
|
|
|
|
tokio::task::LocalSet::new()
|
|
.run_until(async move {
|
|
// Start Telegram polling
|
|
let poll_state = state.clone();
|
|
let poll_client = state.borrow().client.clone();
|
|
tokio::task::spawn_local(async move {
|
|
loop {
|
|
if let Err(e) = poll_once(&token, &poll_client, &poll_state).await {
|
|
error!("telegram poll error: {e}");
|
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Listen for channel protocol connections
|
|
let listener = UnixListener::bind(&sock_path)?;
|
|
state.borrow_mut().connected = true;
|
|
|
|
info!("listening on socket {}", sock_path.display());
|
|
|
|
loop {
|
|
let (stream, _) = listener.accept().await?;
|
|
let (reader, writer) = stream.compat().split();
|
|
let network = twoparty::VatNetwork::new(
|
|
futures::io::BufReader::new(reader),
|
|
futures::io::BufWriter::new(writer),
|
|
rpc_twoparty_capnp::Side::Server,
|
|
Default::default(),
|
|
);
|
|
|
|
let server = ChannelServerImpl {
|
|
state: state.clone(),
|
|
};
|
|
let client: channel_server::Client =
|
|
capnp_rpc::new_client(server);
|
|
|
|
let rpc_system = RpcSystem::new(
|
|
Box::new(network),
|
|
Some(client.client),
|
|
);
|
|
|
|
tokio::task::spawn_local(rpc_system);
|
|
info!("channel client connected");
|
|
}
|
|
|
|
#[allow(unreachable_code)]
|
|
Ok::<(), Box<dyn std::error::Error>>(())
|
|
})
|
|
.await
|
|
}
|