split into workspace: poc-memory and poc-daemon subcrates
poc-daemon (notification routing, idle timer, IRC, Telegram) was already fully self-contained with no imports from the poc-memory library. Now it's a proper separate crate with its own Cargo.toml and capnp schema. poc-memory retains the store, graph, search, neuro, knowledge, and the jobkit-based memory maintenance daemon (daemon.rs). Co-Authored-By: ProofOfConcept <poc@bcachefs.org>
This commit is contained in:
parent
488fd5a0aa
commit
fc48ac7c7f
53 changed files with 108 additions and 76 deletions
569
poc-daemon/src/modules/irc.rs
Normal file
569
poc-daemon/src/modules/irc.rs
Normal file
|
|
@ -0,0 +1,569 @@
|
|||
// IRC module.
|
||||
//
|
||||
// Maintains a persistent connection to an IRC server. Parses incoming
|
||||
// messages into notifications, supports sending messages and runtime
|
||||
// commands (join, leave, etc.). Config changes persist to daemon.toml.
|
||||
//
|
||||
// Runs as a spawned local task on the daemon's LocalSet. Notifications
|
||||
// flow through an mpsc channel into the main state. Reconnects
|
||||
// automatically with exponential backoff.
|
||||
|
||||
use crate::config::{Config, IrcConfig};
|
||||
use crate::notify::Notification;
|
||||
use crate::{home, now};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const MAX_LOG_LINES: usize = 200;
|
||||
const RECONNECT_BASE_SECS: u64 = 5;
|
||||
const RECONNECT_MAX_SECS: u64 = 300;
|
||||
const PING_INTERVAL_SECS: u64 = 120;
|
||||
const PING_TIMEOUT_SECS: u64 = 30;
|
||||
|
||||
/// Parsed IRC message.
|
||||
struct IrcMessage {
|
||||
prefix: Option<String>, // nick!user@host
|
||||
command: String,
|
||||
params: Vec<String>,
|
||||
}
|
||||
|
||||
impl IrcMessage {
|
||||
fn parse(line: &str) -> Option<Self> {
|
||||
let line = line.trim_end_matches(|c| c == '\r' || c == '\n');
|
||||
if line.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let (prefix, rest) = if line.starts_with(':') {
|
||||
let space = line.find(' ')?;
|
||||
(Some(line[1..space].to_string()), &line[space + 1..])
|
||||
} else {
|
||||
(None, line)
|
||||
};
|
||||
|
||||
let (command_params, trailing) = if let Some(pos) = rest.find(" :") {
|
||||
(&rest[..pos], Some(rest[pos + 2..].to_string()))
|
||||
} else {
|
||||
(rest, None)
|
||||
};
|
||||
|
||||
let mut parts: Vec<String> = command_params
|
||||
.split_whitespace()
|
||||
.map(String::from)
|
||||
.collect();
|
||||
|
||||
if parts.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let command = parts.remove(0).to_uppercase();
|
||||
let mut params = parts;
|
||||
if let Some(t) = trailing {
|
||||
params.push(t);
|
||||
}
|
||||
|
||||
Some(IrcMessage {
|
||||
prefix,
|
||||
command,
|
||||
params,
|
||||
})
|
||||
}
|
||||
|
||||
/// Extract nick from prefix (nick!user@host → nick).
|
||||
fn nick(&self) -> Option<&str> {
|
||||
self.prefix
|
||||
.as_deref()
|
||||
.and_then(|p| p.split('!').next())
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared IRC state, accessible from both the read task and RPC handlers.
|
||||
pub struct IrcState {
|
||||
pub config: IrcConfig,
|
||||
pub connected: bool,
|
||||
pub channels: Vec<String>,
|
||||
pub log: VecDeque<String>,
|
||||
writer: Option<WriterHandle>,
|
||||
}
|
||||
|
||||
/// Type-erased writer handle so we can store it without generic params.
|
||||
type WriterHandle = Box<dyn AsyncWriter>;
|
||||
|
||||
trait AsyncWriter {
|
||||
fn write_line(&mut self, line: &str) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>>;
|
||||
}
|
||||
|
||||
/// Writer over a TLS stream.
|
||||
struct TlsWriter {
|
||||
inner: tokio::io::WriteHalf<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>,
|
||||
}
|
||||
|
||||
impl AsyncWriter for TlsWriter {
|
||||
fn write_line(&mut self, line: &str) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>> {
|
||||
let data = format!("{line}\r\n");
|
||||
Box::pin(async move {
|
||||
self.inner.write_all(data.as_bytes()).await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Writer over a plain TCP stream.
|
||||
struct PlainWriter {
|
||||
inner: tokio::io::WriteHalf<tokio::net::TcpStream>,
|
||||
}
|
||||
|
||||
impl AsyncWriter for PlainWriter {
|
||||
fn write_line(&mut self, line: &str) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>> {
|
||||
let data = format!("{line}\r\n");
|
||||
Box::pin(async move {
|
||||
self.inner.write_all(data.as_bytes()).await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl IrcState {
|
||||
fn new(config: IrcConfig) -> Self {
|
||||
Self {
|
||||
channels: config.channels.clone(),
|
||||
config,
|
||||
connected: false,
|
||||
log: VecDeque::with_capacity(MAX_LOG_LINES),
|
||||
writer: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn push_log(&mut self, line: &str) {
|
||||
if self.log.len() >= MAX_LOG_LINES {
|
||||
self.log.pop_front();
|
||||
}
|
||||
self.log.push_back(line.to_string());
|
||||
}
|
||||
|
||||
async fn send_raw(&mut self, line: &str) -> io::Result<()> {
|
||||
if let Some(ref mut w) = self.writer {
|
||||
w.write_line(line).await
|
||||
} else {
|
||||
Err(io::Error::new(io::ErrorKind::NotConnected, "not connected"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> {
|
||||
self.send_raw(&format!("PRIVMSG {target} :{msg}")).await
|
||||
}
|
||||
|
||||
async fn join(&mut self, channel: &str) -> io::Result<()> {
|
||||
self.send_raw(&format!("JOIN {channel}")).await?;
|
||||
if !self.channels.iter().any(|c| c == channel) {
|
||||
self.channels.push(channel.to_string());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn part(&mut self, channel: &str) -> io::Result<()> {
|
||||
self.send_raw(&format!("PART {channel}")).await?;
|
||||
self.channels.retain(|c| c != channel);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub type SharedIrc = Rc<RefCell<IrcState>>;
|
||||
|
||||
/// Start the IRC module. Returns the shared state handle.
|
||||
pub fn start(
|
||||
config: IrcConfig,
|
||||
notify_tx: mpsc::UnboundedSender<Notification>,
|
||||
daemon_config: Rc<RefCell<Config>>,
|
||||
) -> SharedIrc {
|
||||
let state = Rc::new(RefCell::new(IrcState::new(config)));
|
||||
let state_clone = state.clone();
|
||||
|
||||
tokio::task::spawn_local(async move {
|
||||
connection_loop(state_clone, notify_tx, daemon_config).await;
|
||||
});
|
||||
|
||||
state
|
||||
}
|
||||
|
||||
async fn connection_loop(
|
||||
state: SharedIrc,
|
||||
notify_tx: mpsc::UnboundedSender<Notification>,
|
||||
daemon_config: Rc<RefCell<Config>>,
|
||||
) {
|
||||
let mut backoff = RECONNECT_BASE_SECS;
|
||||
|
||||
loop {
|
||||
let config = state.borrow().config.clone();
|
||||
info!("irc: connecting to {}:{}", config.server, config.port);
|
||||
|
||||
match connect_and_run(&state, &config, ¬ify_tx).await {
|
||||
Ok(()) => {
|
||||
info!("irc: connection closed cleanly");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("irc: connection error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
// Reset backoff if we had a working connection (registered
|
||||
// successfully before disconnecting)
|
||||
let was_connected = state.borrow().connected;
|
||||
state.borrow_mut().connected = false;
|
||||
state.borrow_mut().writer = None;
|
||||
if was_connected {
|
||||
backoff = RECONNECT_BASE_SECS;
|
||||
}
|
||||
|
||||
// Persist current channel list to config
|
||||
{
|
||||
let channels = state.borrow().channels.clone();
|
||||
let mut dc = daemon_config.borrow_mut();
|
||||
dc.irc.channels = channels;
|
||||
dc.save();
|
||||
}
|
||||
|
||||
info!("irc: reconnecting in {backoff}s");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
|
||||
backoff = (backoff * 2).min(RECONNECT_MAX_SECS);
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_and_run(
|
||||
state: &SharedIrc,
|
||||
config: &IrcConfig,
|
||||
notify_tx: &mpsc::UnboundedSender<Notification>,
|
||||
) -> io::Result<()> {
|
||||
let addr = format!("{}:{}", config.server, config.port);
|
||||
let tcp = tokio::net::TcpStream::connect(&addr).await?;
|
||||
|
||||
if config.tls {
|
||||
let tls_config = rustls::ClientConfig::builder_with_provider(
|
||||
rustls::crypto::ring::default_provider().into(),
|
||||
)
|
||||
.with_safe_default_protocol_versions()
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
|
||||
.with_root_certificates(root_certs())
|
||||
.with_no_client_auth();
|
||||
let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
|
||||
let server_name = rustls::pki_types::ServerName::try_from(config.server.clone())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
|
||||
let tls_stream = connector.connect(server_name, tcp).await?;
|
||||
|
||||
let (reader, writer) = tokio::io::split(tls_stream);
|
||||
state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer }));
|
||||
|
||||
let buf_reader = BufReader::new(reader);
|
||||
register_and_read(state, config, buf_reader, notify_tx).await
|
||||
} else {
|
||||
let (reader, writer) = tokio::io::split(tcp);
|
||||
state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer }));
|
||||
|
||||
let buf_reader = BufReader::new(reader);
|
||||
register_and_read(state, config, buf_reader, notify_tx).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn register_and_read<R: tokio::io::AsyncRead + Unpin>(
|
||||
state: &SharedIrc,
|
||||
config: &IrcConfig,
|
||||
mut reader: BufReader<R>,
|
||||
notify_tx: &mpsc::UnboundedSender<Notification>,
|
||||
) -> io::Result<()> {
|
||||
// Register
|
||||
{
|
||||
let mut s = state.borrow_mut();
|
||||
s.send_raw(&format!("NICK {}", config.nick)).await?;
|
||||
s.send_raw(&format!("USER {} 0 * :{}", config.user, config.realname)).await?;
|
||||
}
|
||||
|
||||
let mut buf = Vec::new();
|
||||
let mut ping_sent = false;
|
||||
let mut deadline = tokio::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(PING_INTERVAL_SECS);
|
||||
|
||||
loop {
|
||||
buf.clear();
|
||||
|
||||
let read_result = tokio::select! {
|
||||
result = reader.read_until(b'\n', &mut buf) => result,
|
||||
_ = tokio::time::sleep_until(deadline) => {
|
||||
if ping_sent {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"ping timeout — no response from server",
|
||||
));
|
||||
}
|
||||
info!("irc: no data for {}s, sending PING", PING_INTERVAL_SECS);
|
||||
state.borrow_mut().send_raw("PING :keepalive").await?;
|
||||
ping_sent = true;
|
||||
deadline = tokio::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(PING_TIMEOUT_SECS);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let n = read_result?;
|
||||
if n == 0 { break; }
|
||||
|
||||
// Any data from server resets the ping timer
|
||||
ping_sent = false;
|
||||
deadline = tokio::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(PING_INTERVAL_SECS);
|
||||
|
||||
// IRC is not guaranteed UTF-8 — lossy conversion handles Latin-1 etc.
|
||||
let line = String::from_utf8_lossy(&buf).trim_end().to_string();
|
||||
if line.is_empty() { continue; }
|
||||
let msg = match IrcMessage::parse(&line) {
|
||||
Some(m) => m,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
match msg.command.as_str() {
|
||||
"PING" => {
|
||||
let arg = msg.params.first().map(|s| s.as_str()).unwrap_or("");
|
||||
state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?;
|
||||
}
|
||||
|
||||
// RPL_WELCOME — registration complete
|
||||
"001" => {
|
||||
info!("irc: registered as {}", config.nick);
|
||||
state.borrow_mut().connected = true;
|
||||
|
||||
// Join configured channels
|
||||
let channels = state.borrow().channels.clone();
|
||||
for ch in &channels {
|
||||
if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await {
|
||||
warn!("irc: failed to join {ch}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"PRIVMSG" => {
|
||||
let target = msg.params.first().map(|s| s.as_str()).unwrap_or("");
|
||||
let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or("");
|
||||
let nick = msg.nick().unwrap_or("unknown");
|
||||
|
||||
// Handle CTCP requests (wrapped in \x01)
|
||||
if text.starts_with('\x01') && text.ends_with('\x01') {
|
||||
let ctcp = &text[1..text.len()-1];
|
||||
if ctcp.starts_with("VERSION") {
|
||||
let reply = format!(
|
||||
"NOTICE {nick} :\x01VERSION poc-daemon 0.4.0\x01"
|
||||
);
|
||||
state.borrow_mut().send_raw(&reply).await.ok();
|
||||
}
|
||||
// Don't generate notifications for CTCP
|
||||
continue;
|
||||
}
|
||||
|
||||
// Log the message
|
||||
let log_line = if target.starts_with('#') {
|
||||
format!("[{}] <{}> {}", target, nick, text)
|
||||
} else {
|
||||
format!("[PM:{nick}] {text}")
|
||||
};
|
||||
state.borrow_mut().push_log(&log_line);
|
||||
|
||||
// Write to per-channel/per-user log file
|
||||
if target.starts_with('#') {
|
||||
append_log(target, nick, text);
|
||||
} else {
|
||||
append_log(&format!("pm-{nick}"), nick, text);
|
||||
}
|
||||
|
||||
// Generate notification
|
||||
let (ntype, urgency) = classify_privmsg(
|
||||
nick,
|
||||
target,
|
||||
text,
|
||||
&config.nick,
|
||||
);
|
||||
|
||||
let _ = notify_tx.send(Notification {
|
||||
ntype,
|
||||
urgency,
|
||||
message: log_line,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
|
||||
// Nick in use
|
||||
"433" => {
|
||||
let alt = format!("{}_", config.nick);
|
||||
warn!("irc: nick in use, trying {alt}");
|
||||
state.borrow_mut().send_raw(&format!("NICK {alt}")).await?;
|
||||
}
|
||||
|
||||
"JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" | "NOTICE" => {
|
||||
// Could log these, but skip for now
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Classify a PRIVMSG into notification type and urgency.
|
||||
fn classify_privmsg(nick: &str, target: &str, text: &str, my_nick: &str) -> (String, u8) {
|
||||
let my_nick_lower = my_nick.to_lowercase();
|
||||
let text_lower = text.to_lowercase();
|
||||
|
||||
if !target.starts_with('#') {
|
||||
// Private message
|
||||
(format!("irc.pm.{nick}"), crate::notify::URGENT)
|
||||
} else if text_lower.contains(&my_nick_lower) {
|
||||
// Mentioned in channel
|
||||
(format!("irc.mention.{nick}"), crate::notify::NORMAL)
|
||||
} else {
|
||||
// Regular channel message
|
||||
let channel = target.trim_start_matches('#');
|
||||
(format!("irc.channel.{channel}"), crate::notify::AMBIENT)
|
||||
}
|
||||
}
|
||||
|
||||
/// Append a message to the per-channel or per-user log file.
|
||||
/// Logs go to ~/.claude/irc/logs/{target}.log (e.g. #bcachefs.log, pm-kent.log)
|
||||
fn append_log(target: &str, nick: &str, text: &str) {
|
||||
use std::io::Write;
|
||||
// Sanitize target for filename (strip leading #, lowercase)
|
||||
let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase());
|
||||
let dir = home().join(".claude/irc/logs");
|
||||
let _ = std::fs::create_dir_all(&dir);
|
||||
if let Ok(mut f) = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(dir.join(&filename))
|
||||
{
|
||||
let secs = now() as u64;
|
||||
let _ = writeln!(f, "{secs} <{nick}> {text}");
|
||||
}
|
||||
}
|
||||
|
||||
fn root_certs() -> rustls::RootCertStore {
|
||||
let mut roots = rustls::RootCertStore::empty();
|
||||
roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
|
||||
roots
|
||||
}
|
||||
|
||||
/// Handle a runtime command from RPC.
|
||||
pub async fn handle_command(
|
||||
state: &SharedIrc,
|
||||
daemon_config: &Rc<RefCell<Config>>,
|
||||
cmd: &str,
|
||||
args: &[String],
|
||||
) -> Result<String, String> {
|
||||
match cmd {
|
||||
"join" => {
|
||||
let channel = args.first().ok_or("usage: irc join <channel>")?;
|
||||
let channel = if channel.starts_with('#') {
|
||||
channel.clone()
|
||||
} else {
|
||||
format!("#{channel}")
|
||||
};
|
||||
state
|
||||
.borrow_mut()
|
||||
.join(&channel)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Persist
|
||||
let mut dc = daemon_config.borrow_mut();
|
||||
if !dc.irc.channels.contains(&channel) {
|
||||
dc.irc.channels.push(channel.clone());
|
||||
}
|
||||
dc.save();
|
||||
|
||||
Ok(format!("joined {channel}"))
|
||||
}
|
||||
"leave" | "part" => {
|
||||
let channel = args.first().ok_or("usage: irc leave <channel>")?;
|
||||
let channel = if channel.starts_with('#') {
|
||||
channel.clone()
|
||||
} else {
|
||||
format!("#{channel}")
|
||||
};
|
||||
state
|
||||
.borrow_mut()
|
||||
.part(&channel)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// Persist
|
||||
let mut dc = daemon_config.borrow_mut();
|
||||
dc.irc.channels.retain(|c| c != &channel);
|
||||
dc.save();
|
||||
|
||||
Ok(format!("left {channel}"))
|
||||
}
|
||||
"send" | "msg" => {
|
||||
if args.len() < 2 {
|
||||
return Err("usage: irc send <target> <message>".into());
|
||||
}
|
||||
let target = &args[0];
|
||||
if target.starts_with('#') {
|
||||
let s = state.borrow();
|
||||
if !s.channels.iter().any(|c| c == target) {
|
||||
return Err(format!(
|
||||
"not in channel {target} (joined: {})",
|
||||
s.channels.join(", ")
|
||||
));
|
||||
}
|
||||
}
|
||||
let msg = args[1..].join(" ");
|
||||
let nick = state.borrow().config.nick.clone();
|
||||
state
|
||||
.borrow_mut()
|
||||
.send_privmsg(target, &msg)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
append_log(target, &nick, &msg);
|
||||
Ok(format!("sent to {target}"))
|
||||
}
|
||||
"status" => {
|
||||
let s = state.borrow();
|
||||
Ok(format!(
|
||||
"connected={} channels={} log_lines={} nick={}",
|
||||
s.connected,
|
||||
s.channels.join(","),
|
||||
s.log.len(),
|
||||
s.config.nick,
|
||||
))
|
||||
}
|
||||
"log" => {
|
||||
let n: usize = args
|
||||
.first()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(15);
|
||||
let s = state.borrow();
|
||||
let lines: Vec<&String> = s.log.iter().rev().take(n).collect();
|
||||
let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect();
|
||||
lines.reverse();
|
||||
Ok(lines.join("\n"))
|
||||
}
|
||||
"nick" => {
|
||||
let new_nick = args.first().ok_or("usage: irc nick <newnick>")?;
|
||||
state
|
||||
.borrow_mut()
|
||||
.send_raw(&format!("NICK {new_nick}"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let mut dc = daemon_config.borrow_mut();
|
||||
dc.irc.nick = new_nick.clone();
|
||||
dc.save();
|
||||
|
||||
Ok(format!("nick → {new_nick}"))
|
||||
}
|
||||
_ => Err(format!(
|
||||
"unknown irc command: {cmd}\n\
|
||||
commands: join, leave, send, status, log, nick"
|
||||
)),
|
||||
}
|
||||
}
|
||||
2
poc-daemon/src/modules/mod.rs
Normal file
2
poc-daemon/src/modules/mod.rs
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
pub mod irc;
|
||||
pub mod telegram;
|
||||
374
poc-daemon/src/modules/telegram.rs
Normal file
374
poc-daemon/src/modules/telegram.rs
Normal file
|
|
@ -0,0 +1,374 @@
|
|||
// Telegram module.
|
||||
//
|
||||
// Long-polls the Telegram Bot API for messages from Kent's chat.
|
||||
// Downloads media (photos, voice, documents) to local files.
|
||||
// Sends text and files. Notifications flow through mpsc into the
|
||||
// daemon's main state.
|
||||
//
|
||||
// Only accepts messages from the configured chat_id (prompt
|
||||
// injection defense — other senders get a "private bot" reply).
|
||||
|
||||
use crate::config::{Config, TelegramConfig};
|
||||
use crate::notify::Notification;
|
||||
use crate::{home, now};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::rc::Rc;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{error, info};
|
||||
|
||||
const MAX_LOG_LINES: usize = 100;
|
||||
const POLL_TIMEOUT: u64 = 30;
|
||||
|
||||
pub struct TelegramState {
|
||||
pub config: TelegramConfig,
|
||||
pub connected: bool,
|
||||
pub log: VecDeque<String>,
|
||||
pub last_offset: i64,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
pub type SharedTelegram = Rc<RefCell<TelegramState>>;
|
||||
|
||||
impl TelegramState {
|
||||
fn new(config: TelegramConfig) -> Self {
|
||||
let last_offset = load_offset();
|
||||
Self {
|
||||
config,
|
||||
connected: false,
|
||||
log: VecDeque::with_capacity(MAX_LOG_LINES),
|
||||
last_offset,
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_log(&mut self, line: &str) {
|
||||
if self.log.len() >= MAX_LOG_LINES {
|
||||
self.log.pop_front();
|
||||
}
|
||||
self.log.push_back(line.to_string());
|
||||
}
|
||||
|
||||
fn api_url(&self, method: &str) -> String {
|
||||
format!(
|
||||
"https://api.telegram.org/bot{}/{}",
|
||||
self.config.token, method
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn offset_path() -> PathBuf {
|
||||
home().join(".claude/telegram/last_offset")
|
||||
}
|
||||
|
||||
fn load_offset() -> i64 {
|
||||
std::fs::read_to_string(offset_path())
|
||||
.ok()
|
||||
.and_then(|s| s.trim().parse().ok())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn save_offset(offset: i64) {
|
||||
let _ = std::fs::write(offset_path(), offset.to_string());
|
||||
}
|
||||
|
||||
fn history_path() -> PathBuf {
|
||||
home().join(".claude/telegram/history.log")
|
||||
}
|
||||
|
||||
fn media_dir() -> PathBuf {
|
||||
home().join(".claude/telegram/media")
|
||||
}
|
||||
|
||||
fn append_history(line: &str) {
|
||||
use std::io::Write;
|
||||
if let Ok(mut f) = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(history_path())
|
||||
{
|
||||
let _ = writeln!(f, "{}", line);
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the Telegram module. Returns the shared state handle.
|
||||
pub fn start(
|
||||
config: TelegramConfig,
|
||||
notify_tx: mpsc::UnboundedSender<Notification>,
|
||||
_daemon_config: Rc<RefCell<Config>>,
|
||||
) -> SharedTelegram {
|
||||
let state = Rc::new(RefCell::new(TelegramState::new(config)));
|
||||
let state_clone = state.clone();
|
||||
|
||||
tokio::task::spawn_local(async move {
|
||||
poll_loop(state_clone, notify_tx).await;
|
||||
});
|
||||
|
||||
state
|
||||
}
|
||||
|
||||
async fn poll_loop(
|
||||
state: SharedTelegram,
|
||||
notify_tx: mpsc::UnboundedSender<Notification>,
|
||||
) {
|
||||
let _ = std::fs::create_dir_all(media_dir());
|
||||
|
||||
loop {
|
||||
match poll_once(&state, ¬ify_tx).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("telegram: poll error: {e}");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll_once(
|
||||
state: &SharedTelegram,
|
||||
notify_tx: &mpsc::UnboundedSender<Notification>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let (url, chat_id, token) = {
|
||||
let s = state.borrow();
|
||||
let url = format!(
|
||||
"{}?offset={}&timeout={}",
|
||||
s.api_url("getUpdates"),
|
||||
s.last_offset,
|
||||
POLL_TIMEOUT,
|
||||
);
|
||||
(url, s.config.chat_id, s.config.token.clone())
|
||||
};
|
||||
|
||||
let client = state.borrow().client.clone();
|
||||
let resp: serde_json::Value = client
|
||||
.get(&url)
|
||||
.timeout(std::time::Duration::from_secs(POLL_TIMEOUT + 5))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
|
||||
if !state.borrow().connected {
|
||||
state.borrow_mut().connected = true;
|
||||
info!("telegram: connected");
|
||||
}
|
||||
|
||||
let results = resp["result"].as_array();
|
||||
let results = match results {
|
||||
Some(r) => r,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
for update in results {
|
||||
let update_id = update["update_id"].as_i64().unwrap_or(0);
|
||||
let msg = &update["message"];
|
||||
|
||||
// Update offset
|
||||
{
|
||||
let mut s = state.borrow_mut();
|
||||
s.last_offset = update_id + 1;
|
||||
save_offset(s.last_offset);
|
||||
}
|
||||
|
||||
let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
|
||||
if msg_chat_id != chat_id {
|
||||
// Reject messages from unknown chats
|
||||
let reject_url = format!(
|
||||
"https://api.telegram.org/bot{}/sendMessage",
|
||||
token
|
||||
);
|
||||
let _ = client
|
||||
.post(&reject_url)
|
||||
.form(&[
|
||||
("chat_id", msg_chat_id.to_string()),
|
||||
("text", "This is a private bot.".to_string()),
|
||||
])
|
||||
.send()
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let sender = msg["from"]["first_name"]
|
||||
.as_str()
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
// Handle different message types
|
||||
if let Some(text) = msg["text"].as_str() {
|
||||
let log_line = format!("[{}] {}", sender, text);
|
||||
state.borrow_mut().push_log(&log_line);
|
||||
|
||||
let ts = timestamp();
|
||||
append_history(&format!("{ts} [{sender}] {text}"));
|
||||
|
||||
let _ = notify_tx.send(Notification {
|
||||
ntype: format!("telegram.{}", sender.to_lowercase()),
|
||||
urgency: crate::notify::NORMAL,
|
||||
message: log_line,
|
||||
timestamp: now(),
|
||||
});
|
||||
} else if let Some(photos) = msg["photo"].as_array() {
|
||||
// Pick largest photo
|
||||
let best = photos.iter().max_by_key(|p| p["file_size"].as_i64().unwrap_or(0));
|
||||
if let Some(photo) = best {
|
||||
if let Some(file_id) = photo["file_id"].as_str() {
|
||||
let caption = msg["caption"].as_str().unwrap_or("");
|
||||
let local = download_file(&client, &token, file_id, ".jpg").await;
|
||||
let display = match &local {
|
||||
Some(p) => format!("[photo: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }),
|
||||
None => format!("[photo]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }),
|
||||
};
|
||||
let log_line = format!("[{}] {}", sender, display);
|
||||
state.borrow_mut().push_log(&log_line);
|
||||
let ts = timestamp();
|
||||
append_history(&format!("{ts} [{sender}] {display}"));
|
||||
|
||||
let _ = notify_tx.send(Notification {
|
||||
ntype: format!("telegram.{}", sender.to_lowercase()),
|
||||
urgency: crate::notify::NORMAL,
|
||||
message: log_line,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if msg["voice"].is_object() {
|
||||
if let Some(file_id) = msg["voice"]["file_id"].as_str() {
|
||||
let caption = msg["caption"].as_str().unwrap_or("");
|
||||
let local = download_file(&client, &token, file_id, ".ogg").await;
|
||||
let display = match &local {
|
||||
Some(p) => format!("[voice: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }),
|
||||
None => format!("[voice]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }),
|
||||
};
|
||||
let log_line = format!("[{}] {}", sender, display);
|
||||
state.borrow_mut().push_log(&log_line);
|
||||
let ts = timestamp();
|
||||
append_history(&format!("{ts} [{sender}] {display}"));
|
||||
|
||||
let _ = notify_tx.send(Notification {
|
||||
ntype: format!("telegram.{}", sender.to_lowercase()),
|
||||
urgency: crate::notify::NORMAL,
|
||||
message: log_line,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
} else if msg["document"].is_object() {
|
||||
if let Some(file_id) = msg["document"]["file_id"].as_str() {
|
||||
let fname = msg["document"]["file_name"].as_str().unwrap_or("file");
|
||||
let caption = msg["caption"].as_str().unwrap_or("");
|
||||
let local = download_file(&client, &token, file_id, "").await;
|
||||
let display = match &local {
|
||||
Some(p) => format!("[doc: {} -> {}]{}", fname, p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }),
|
||||
None => format!("[doc: {}]{}", fname, if caption.is_empty() { String::new() } else { format!(" {caption}") }),
|
||||
};
|
||||
let log_line = format!("[{}] {}", sender, display);
|
||||
state.borrow_mut().push_log(&log_line);
|
||||
let ts = timestamp();
|
||||
append_history(&format!("{ts} [{sender}] {display}"));
|
||||
|
||||
let _ = notify_tx.send(Notification {
|
||||
ntype: format!("telegram.{}", sender.to_lowercase()),
|
||||
urgency: crate::notify::NORMAL,
|
||||
message: log_line,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_file(
|
||||
client: &reqwest::Client,
|
||||
token: &str,
|
||||
file_id: &str,
|
||||
ext: &str,
|
||||
) -> Option<PathBuf> {
|
||||
let url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}");
|
||||
let resp: serde_json::Value = client.get(&url).send().await.ok()?.json().await.ok()?;
|
||||
let file_path = resp["result"]["file_path"].as_str()?;
|
||||
|
||||
let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}");
|
||||
let bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?;
|
||||
|
||||
let basename = std::path::Path::new(file_path)
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("file");
|
||||
let local_name = if ext.is_empty() {
|
||||
basename.to_string()
|
||||
} else {
|
||||
let stem = std::path::Path::new(basename)
|
||||
.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("file");
|
||||
format!("{}{}", stem, ext)
|
||||
};
|
||||
let secs = now() as u64;
|
||||
let local_path = media_dir().join(format!("{secs}_{local_name}"));
|
||||
std::fs::write(&local_path, &bytes).ok()?;
|
||||
Some(local_path)
|
||||
}
|
||||
|
||||
fn timestamp() -> String {
|
||||
// Use the same unix seconds approach as IRC module
|
||||
format!("{}", now() as u64)
|
||||
}
|
||||
|
||||
/// Handle a runtime command from RPC.
|
||||
pub async fn handle_command(
|
||||
state: &SharedTelegram,
|
||||
_daemon_config: &Rc<RefCell<Config>>,
|
||||
cmd: &str,
|
||||
args: &[String],
|
||||
) -> Result<String, String> {
|
||||
match cmd {
|
||||
"send" => {
|
||||
let msg = args.join(" ");
|
||||
if msg.is_empty() {
|
||||
return Err("usage: telegram send <message>".into());
|
||||
}
|
||||
let (url, client) = {
|
||||
let s = state.borrow();
|
||||
(s.api_url("sendMessage"), s.client.clone())
|
||||
};
|
||||
let chat_id = state.borrow().config.chat_id.to_string();
|
||||
client
|
||||
.post(&url)
|
||||
.form(&[("chat_id", chat_id.as_str()), ("text", msg.as_str())])
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let ts = timestamp();
|
||||
append_history(&format!("{ts} [ProofOfConcept] {msg}"));
|
||||
|
||||
Ok("sent".to_string())
|
||||
}
|
||||
"status" => {
|
||||
let s = state.borrow();
|
||||
Ok(format!(
|
||||
"connected={} log_lines={} offset={}",
|
||||
s.connected,
|
||||
s.log.len(),
|
||||
s.last_offset,
|
||||
))
|
||||
}
|
||||
"log" => {
|
||||
let n: usize = args
|
||||
.first()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(15);
|
||||
let s = state.borrow();
|
||||
let lines: Vec<&String> = s.log.iter().rev().take(n).collect();
|
||||
let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect();
|
||||
lines.reverse();
|
||||
Ok(lines.join("\n"))
|
||||
}
|
||||
_ => Err(format!(
|
||||
"unknown telegram command: {cmd}\n\
|
||||
commands: send, status, log"
|
||||
)),
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue