forked from kent/consciousness
fix telegram
This commit is contained in:
parent
57bd5b6d8b
commit
dfef7fb446
1 changed files with 179 additions and 107 deletions
|
|
@ -21,11 +21,12 @@ use consciousness::channel_capnp::{channel_client, channel_server};
|
|||
|
||||
// ── Config ──────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Clone, serde::Deserialize)]
|
||||
#[derive(Clone, serde::Serialize, serde::Deserialize)]
|
||||
struct Config {
|
||||
#[serde(default)]
|
||||
#[serde(default, skip_serializing)]
|
||||
token: String,
|
||||
chat_id: i64,
|
||||
#[serde(default)]
|
||||
chat_ids: std::collections::BTreeMap<String, i64>,
|
||||
}
|
||||
|
||||
fn channels_dir() -> PathBuf {
|
||||
|
|
@ -55,7 +56,7 @@ fn load_config() -> Config {
|
|||
|
||||
// ── State ───────────────────────────────────────────────────────
|
||||
|
||||
use consciousness::thalamus::channel_log::ChannelLog;
|
||||
use consciousness::thalamus::channel_log::{self, ChannelLog};
|
||||
|
||||
struct State {
|
||||
config: Config,
|
||||
|
|
@ -74,9 +75,26 @@ type SharedState = Rc<RefCell<State>>;
|
|||
impl State {
|
||||
fn new(config: Config) -> Self {
|
||||
let last_offset = load_offset();
|
||||
|
||||
// Load existing sub-channel logs from disk
|
||||
let mut channel_logs = std::collections::BTreeMap::new();
|
||||
let log_path = log_dir();
|
||||
if let Ok(entries) = std::fs::read_dir(&log_path) {
|
||||
for entry in entries.flatten() {
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
if let Some(target) = name.strip_suffix(".log") {
|
||||
let key = format!("telegram.{}", target);
|
||||
channel_logs.insert(
|
||||
key,
|
||||
channel_log::load_disk_log(&log_path, target),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
config,
|
||||
channel_logs: std::collections::BTreeMap::new(),
|
||||
channel_logs,
|
||||
last_offset,
|
||||
connected: false,
|
||||
client: consciousness::agent::api::http::HttpClient::new(),
|
||||
|
|
@ -85,9 +103,10 @@ impl State {
|
|||
}
|
||||
|
||||
fn push_message(&mut self, line: String, urgency: u8, channel: &str) {
|
||||
let target = channel_to_target(channel);
|
||||
self.channel_logs
|
||||
.entry(channel.to_string())
|
||||
.or_insert_with(ChannelLog::new)
|
||||
.or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target))
|
||||
.push(line.clone());
|
||||
|
||||
// Notify all subscribers
|
||||
|
|
@ -106,116 +125,120 @@ impl State {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn api_url(&self, method: &str) -> String {
|
||||
format!("https://api.telegram.org/bot{}/{}", self.config.token, method)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ── Persistence ─────────────────────────────────────────────────
|
||||
|
||||
fn data_dir() -> PathBuf {
|
||||
dirs::home_dir().unwrap_or_default().join(".consciousness/channels/telegram.logs")
|
||||
fn log_dir() -> PathBuf {
|
||||
channel_log::log_dir("telegram")
|
||||
}
|
||||
|
||||
fn load_offset() -> i64 {
|
||||
std::fs::read_to_string(data_dir().join("last_offset"))
|
||||
std::fs::read_to_string(log_dir().join("last_offset"))
|
||||
.ok()
|
||||
.and_then(|s| s.trim().parse().ok())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn save_offset(offset: i64) {
|
||||
let _ = std::fs::create_dir_all(data_dir());
|
||||
let _ = std::fs::write(data_dir().join("last_offset"), offset.to_string());
|
||||
let _ = std::fs::create_dir_all(log_dir());
|
||||
let _ = std::fs::write(log_dir().join("last_offset"), offset.to_string());
|
||||
}
|
||||
|
||||
fn append_history(line: &str) {
|
||||
use std::io::Write;
|
||||
if let Ok(mut f) = std::fs::OpenOptions::new()
|
||||
.create(true).append(true)
|
||||
.open(data_dir().join("history.log"))
|
||||
{
|
||||
let _ = writeln!(f, "{}", line);
|
||||
/// Convert a channel path to a telegram target name.
|
||||
/// "telegram.kent" -> "kent"
|
||||
fn channel_to_target(channel: &str) -> String {
|
||||
channel.strip_prefix("telegram.").unwrap_or(channel).to_string()
|
||||
}
|
||||
|
||||
fn config_path() -> PathBuf {
|
||||
channels_dir().join("telegram.json5")
|
||||
}
|
||||
|
||||
fn save_config(config: &Config) {
|
||||
if let Ok(json) = serde_json::to_string_pretty(config) {
|
||||
let _ = std::fs::write(config_path(), json);
|
||||
}
|
||||
}
|
||||
|
||||
fn now() -> f64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs_f64()
|
||||
// ── Telegram API ────────────────────────────────────────────────
|
||||
//
|
||||
// NOTE: The current HttpClient opens a new TCP+TLS connection per request.
|
||||
// Telegram's API supports HTTP/2, which would allow multiplexing getUpdates
|
||||
// and sendMessage on a single connection. To use HTTP/2:
|
||||
// - Replace HttpClient with hyper_util::client::legacy::Client using
|
||||
// a Connector that enables HTTP/2 (hyper_util::client::legacy::connect::HttpConnector
|
||||
// + hyper_rustls with ALPN h2).
|
||||
// - Or use reqwest with the "http2" feature, which handles connection pooling
|
||||
// and HTTP/2 negotiation automatically.
|
||||
// - The API functions below would then share a single pooled client, and
|
||||
// concurrent requests (poll + send) would multiplex over one connection.
|
||||
|
||||
use consciousness::agent::api::http::HttpClient;
|
||||
|
||||
struct TelegramMessage {
|
||||
update_id: i64,
|
||||
chat_id: i64,
|
||||
sender: String,
|
||||
text: String,
|
||||
}
|
||||
|
||||
// ── Telegram Polling ────────────────────────────────────────────
|
||||
|
||||
async fn poll_loop(state: SharedState) {
|
||||
let _ = std::fs::create_dir_all(data_dir().join("media"));
|
||||
loop {
|
||||
if let Err(e) = poll_once(&state).await {
|
||||
error!("telegram poll error: {e}");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll_once(state: &SharedState) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let (url, chat_id, token) = {
|
||||
let s = state.borrow();
|
||||
/// Fetch and parse pending updates from Telegram via long polling.
|
||||
async fn get_updates(
|
||||
client: &HttpClient,
|
||||
token: &str,
|
||||
offset: i64,
|
||||
) -> Result<Vec<TelegramMessage>, Box<dyn std::error::Error>> {
|
||||
let url = format!(
|
||||
"{}?offset={}&timeout=30",
|
||||
s.api_url("getUpdates"),
|
||||
s.last_offset,
|
||||
"https://api.telegram.org/bot{}/getUpdates?offset={}&timeout=30",
|
||||
token, offset,
|
||||
);
|
||||
(url, s.config.chat_id, s.config.token.clone())
|
||||
};
|
||||
|
||||
let client = state.borrow().client.clone();
|
||||
let resp: serde_json::Value = client.get(&url).await?.json().await?;
|
||||
|
||||
if !state.borrow().connected {
|
||||
state.borrow_mut().connected = true;
|
||||
info!("telegram: connected");
|
||||
}
|
||||
|
||||
let results = match resp["result"].as_array() {
|
||||
Some(r) => r,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let response = client.get(&url).await?;
|
||||
let body = response.text().await?;
|
||||
let resp: serde_json::Value = serde_json::from_str(&body)
|
||||
.map_err(|e| format!("getUpdates JSON parse error: {e}\nbody: {}", &body[..body.len().min(500)]))?;
|
||||
|
||||
let mut messages = Vec::new();
|
||||
if let Some(results) = resp["result"].as_array() {
|
||||
for update in results {
|
||||
let update_id = update["update_id"].as_i64().unwrap_or(0);
|
||||
let msg = &update["message"];
|
||||
|
||||
{
|
||||
let mut s = state.borrow_mut();
|
||||
s.last_offset = update_id + 1;
|
||||
save_offset(s.last_offset);
|
||||
}
|
||||
|
||||
let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
|
||||
if msg_chat_id != chat_id {
|
||||
let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage");
|
||||
let _ = client.post_form(&reject_url, &[
|
||||
("chat_id", &msg_chat_id.to_string()),
|
||||
("text", "This is a private bot."),
|
||||
]).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let sender = msg["from"]["first_name"].as_str().unwrap_or("unknown").to_string();
|
||||
let channel = format!("telegram.{}", sender.to_lowercase());
|
||||
let chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
|
||||
|
||||
if let Some(text) = msg["text"].as_str() {
|
||||
let line = format!("[{}] {}", sender, text);
|
||||
let ts = now() as u64;
|
||||
append_history(&format!("{ts} {line}"));
|
||||
state.borrow_mut().push_message(line, 2, &channel); // NORMAL urgency
|
||||
messages.push(TelegramMessage {
|
||||
update_id,
|
||||
chat_id,
|
||||
sender,
|
||||
text: text.to_string(),
|
||||
});
|
||||
}
|
||||
// TODO: handle photos, voice, documents (same as original module)
|
||||
}
|
||||
}
|
||||
Ok(messages)
|
||||
}
|
||||
|
||||
/// Send a text message to a Telegram chat.
|
||||
async fn send_message(
|
||||
client: &HttpClient,
|
||||
token: &str,
|
||||
chat_id: i64,
|
||||
text: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let url = format!(
|
||||
"https://api.telegram.org/bot{}/sendMessage",
|
||||
token,
|
||||
);
|
||||
let response = client.post_form(&url, &[
|
||||
("chat_id", &chat_id.to_string()),
|
||||
("text", text),
|
||||
]).await?;
|
||||
let status = response.status();
|
||||
if !status.is_success() {
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(format!("sendMessage failed: {} — {}", status, &body[..body.len().min(500)]).into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -265,27 +288,27 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
let state = self.state.clone();
|
||||
async move {
|
||||
let params = params.get()?;
|
||||
let _channel = params.get_channel()?.to_str()?.to_string();
|
||||
let channel = params.get_channel()?.to_str()?.to_string();
|
||||
let message = params.get_message()?.to_str()?.to_string();
|
||||
let target = channel_to_target(&channel);
|
||||
|
||||
let (url, client, chat_id) = {
|
||||
let (token, client, chat_id) = {
|
||||
let s = state.borrow();
|
||||
(s.api_url("sendMessage"), s.client.clone(), s.config.chat_id)
|
||||
let chat_id = s.config.chat_ids.get(&target).copied()
|
||||
.ok_or_else(|| capnp::Error::failed(
|
||||
format!("no chat_id known for {target}")))?;
|
||||
(s.config.token.clone(), s.client.clone(), chat_id)
|
||||
};
|
||||
let _ = client.post_form(&url, &[
|
||||
("chat_id", &chat_id.to_string()),
|
||||
("text", &message),
|
||||
]).await;
|
||||
|
||||
let ts = now() as u64;
|
||||
append_history(&format!("{ts} [agent] {message}"));
|
||||
{
|
||||
let channel = "telegram.agent".to_string();
|
||||
send_message(&client, &token, chat_id, &message).await
|
||||
.map_err(|e| capnp::Error::failed(format!("send_message: {e}")))?;
|
||||
|
||||
channel_log::append_disk_log(&log_dir(), &target, "PoC", &message);
|
||||
state.borrow_mut().channel_logs
|
||||
.entry(channel)
|
||||
.or_insert_with(ChannelLog::new)
|
||||
.push_own(format!("[agent] {}", message));
|
||||
}
|
||||
.or_insert_with(|| channel_log::load_disk_log(&log_dir(), &target))
|
||||
.push_own(format!("[PoC] {}", message));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -326,11 +349,50 @@ impl channel_server::Server for ChannelServerImpl {
|
|||
|
||||
// ── Main ────────────────────────────────────────────────────────
|
||||
|
||||
async fn poll_once(
|
||||
token: &str,
|
||||
client: &HttpClient,
|
||||
state: &SharedState,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let offset = state.borrow().last_offset;
|
||||
let messages = get_updates(client, token, offset).await?;
|
||||
|
||||
if !state.borrow().connected {
|
||||
state.borrow_mut().connected = true;
|
||||
info!("telegram: connected");
|
||||
}
|
||||
|
||||
let mut max_offset = offset;
|
||||
|
||||
for msg in &messages {
|
||||
max_offset = max_offset.max(msg.update_id + 1);
|
||||
let sender_lower = msg.sender.to_lowercase();
|
||||
let channel = format!("telegram.{}", sender_lower);
|
||||
|
||||
channel_log::append_disk_log(&log_dir(), &sender_lower, &msg.sender, &msg.text);
|
||||
|
||||
let mut s = state.borrow_mut();
|
||||
s.config.chat_ids.insert(sender_lower, msg.chat_id);
|
||||
let line = format!("[{}] {}", msg.sender, msg.text);
|
||||
s.push_message(line, 2, &channel);
|
||||
}
|
||||
|
||||
if max_offset > offset {
|
||||
let mut s = state.borrow_mut();
|
||||
s.last_offset = max_offset;
|
||||
save_offset(max_offset);
|
||||
save_config(&s.config);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::init();
|
||||
|
||||
let config = load_config();
|
||||
let token = config.token.clone();
|
||||
let state = Rc::new(RefCell::new(State::new(config)));
|
||||
|
||||
let sock_dir = dirs::home_dir()
|
||||
|
|
@ -339,6 +401,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
std::fs::create_dir_all(&sock_dir)?;
|
||||
let sock_path = sock_dir.join("telegram.sock");
|
||||
let _ = std::fs::remove_file(&sock_path);
|
||||
let _ = std::fs::create_dir_all(log_dir().join("media"));
|
||||
|
||||
info!("telegram channel daemon starting on {}", sock_path.display());
|
||||
|
||||
|
|
@ -346,12 +409,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
.run_until(async move {
|
||||
// Start Telegram polling
|
||||
let poll_state = state.clone();
|
||||
let poll_client = state.borrow().client.clone();
|
||||
tokio::task::spawn_local(async move {
|
||||
poll_loop(poll_state).await;
|
||||
loop {
|
||||
if let Err(e) = poll_once(&token, &poll_client, &poll_state).await {
|
||||
error!("telegram poll error: {e}");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for channel protocol connections
|
||||
let listener = UnixListener::bind(&sock_path)?;
|
||||
state.borrow_mut().connected = true;
|
||||
|
||||
info!("listening on socket {}", sock_path.display());
|
||||
|
||||
loop {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue