consciousness/channels/irc/src/main.rs

657 lines
22 KiB
Rust
Raw Normal View History

// channel-irc — Standalone IRC channel daemon
//
// Maintains a persistent TLS connection to an IRC server, parses
// incoming messages, and serves them over the channel.capnp protocol
// on a Unix socket at ~/.consciousness/channels/irc.sock.
//
// Runs independently of the consciousness binary so restarts don't
// kill the IRC connection. Reconnects automatically with exponential
// backoff. Supports multiple simultaneous capnp clients.
//
// Config: ~/.consciousness/channels/irc.json5
// Socket: ~/.consciousness/channels/irc.sock
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio_util::compat::TokioAsyncReadCompatExt;
use log::{info, warn, error};
use poc_memory::channel_capnp::{channel_client, channel_server};
use poc_memory::thalamus::channel_log;
// ── Constants ──────────────────────────────────────────────────
// Declared history cap; not referenced anywhere in the visible part of this file.
const MAX_HISTORY: usize = 1000;
// Initial reconnect delay in seconds; connection_loop doubles it after each wait.
const RECONNECT_BASE_SECS: u64 = 5;
// Upper bound in seconds that the doubling reconnect delay is clamped to.
const RECONNECT_MAX_SECS: u64 = 300;
// Seconds without a received line before register_and_read sends a keepalive PING.
const PING_INTERVAL_SECS: u64 = 120;
// Seconds to wait for any line after that PING before the connection is declared dead.
const PING_TIMEOUT_SECS: u64 = 30;
// Urgency levels (matching thalamus/notify.rs)
// Used for channel messages that do not contain our nick, and for NOTICEs.
const AMBIENT: u8 = 0;
// Used for channel messages whose text contains the configured nick.
const NORMAL: u8 = 2;
// Used for private messages (PRIVMSG whose target does not start with '#').
const URGENT: u8 = 3;
// ── Config ─────────────────────────────────────────────────────
/// Daemon settings, deserialized by `load_config` from the json5 config file.
#[derive(Clone, serde::Deserialize)]
struct Config {
/// Server host; combined with `port` for the TCP connect and also used as
/// the TLS server name when `tls` is set.
server: String,
/// Server TCP port.
port: u16,
/// Whether to wrap the connection in TLS; defaults to `true` when omitted.
#[serde(default = "default_true")]
tls: bool,
/// Nick sent in NICK/USER during registration and matched (case-insensitively)
/// against channel text to detect mentions.
nick: String,
/// Channels to JOIN once the server confirms registration.
channels: Vec<String>,
/// Optional server password, sent as `PASS` before registration.
#[serde(default)]
password: Option<String>,
/// Optional NickServ password, sent as `IDENTIFY` after registration.
#[serde(default)]
nickserv_pass: Option<String>,
}
/// Serde default provider for boolean fields that should be on unless the
/// config file says otherwise (used by `Config::tls`).
fn default_true() -> bool {
    true
}
/// Reads and parses `~/.consciousness/channels/irc.json5`.
///
/// If the home directory cannot be determined the path is resolved relative
/// to an empty base path.
///
/// # Panics
/// Panics with the offending path when the file cannot be read or does not
/// parse as a `Config`.
fn load_config() -> Config {
    let mut path = dirs::home_dir().unwrap_or_default();
    path.push(".consciousness/channels/irc.json5");

    let text = match std::fs::read_to_string(&path) {
        Ok(contents) => contents,
        Err(e) => panic!("failed to read {}: {e}", path.display()),
    };

    match json5::from_str::<Config>(&text) {
        Ok(config) => config,
        Err(e) => panic!("failed to parse {}: {e}", path.display()),
    }
}
// ── IRC Message Parsing ────────────────────────────────────────
/// One parsed IRC protocol line.
struct IrcMessage {
    /// Text between the leading ':' and the first space, when the line has a prefix.
    prefix: Option<String>,
    /// Command word, upper-cased.
    command: String,
    /// Middle parameters followed by the trailing parameter (if any).
    params: Vec<String>,
}

impl IrcMessage {
    /// Parses a single line (trailing CR/LF tolerated).
    ///
    /// Returns `None` for an empty line, for a prefixed line that contains no
    /// space, or when no command word is present.
    fn parse(line: &str) -> Option<Self> {
        let line = line.trim_end_matches(&['\r', '\n'][..]);
        if line.is_empty() {
            return None;
        }

        // Optional ":prefix " at the very start of the line.
        let (prefix, rest) = match line.strip_prefix(':') {
            Some(after_colon) => {
                let (source, remainder) = after_colon.split_once(' ')?;
                (Some(source.to_string()), remainder)
            }
            None => (None, line),
        };

        // Everything after the first " :" is a single trailing parameter.
        let (head, trailing) = match rest.split_once(" :") {
            Some((before, after)) => (before, Some(after)),
            None => (rest, None),
        };

        let mut words = head.split_whitespace();
        let command = words.next()?.to_uppercase();
        let mut params: Vec<String> = words.map(String::from).collect();
        params.extend(trailing.map(String::from));

        Some(IrcMessage { prefix, command, params })
    }

    /// Portion of the prefix before the first '!'; for a prefix without '!'
    /// this is the whole prefix. `None` when the message has no prefix.
    fn nick(&self) -> Option<&str> {
        let prefix = self.prefix.as_deref()?;
        prefix.split('!').next()
    }
}
// ── Writer Abstraction ─────────────────────────────────────────
/// Boxed, type-erased writer; this is what `State::writer` holds so the TLS and
/// plain-TCP write halves can be stored in the same field.
type WriterHandle = Box<dyn AsyncWriter>;
/// Minimal line-oriented async writer.
///
/// The method returns a boxed future rather than being an `async fn` so the
/// trait can be used as `dyn AsyncWriter`.
trait AsyncWriter {
/// Writes `line` to the underlying stream. Both implementations in this file
/// append "\r\n" themselves, so callers pass the line without a terminator.
fn write_line(
&mut self,
line: &str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>>;
}
/// [`AsyncWriter`] over the write half of a TLS-wrapped TCP stream.
struct TlsWriter {
    inner: tokio::io::WriteHalf<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>,
}

impl AsyncWriter for TlsWriter {
    /// Writes `line` followed by "\r\n" and flushes the TLS stream.
    ///
    /// The flush matters for TLS: a successful write only guarantees the
    /// plaintext was accepted by the TLS session, not that the resulting
    /// records reached the socket. Without it a PONG or PRIVMSG could sit in
    /// the session buffer until the next write.
    fn write_line(
        &mut self,
        line: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>> {
        let data = format!("{line}\r\n");
        Box::pin(async move {
            self.inner.write_all(data.as_bytes()).await?;
            self.inner.flush().await
        })
    }
}
/// [`AsyncWriter`] over the write half of an unencrypted TCP stream.
struct PlainWriter {
    inner: tokio::io::WriteHalf<tokio::net::TcpStream>,
}

impl AsyncWriter for PlainWriter {
    /// Writes `line` followed by "\r\n" to the socket.
    fn write_line(
        &mut self,
        line: &str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>> {
        // Build the framed line up front so the future owns its bytes and
        // does not borrow `line`.
        let mut framed = String::with_capacity(line.len() + 2);
        framed.push_str(line);
        framed.push_str("\r\n");

        let sink = &mut self.inner;
        Box::pin(async move { sink.write_all(framed.as_bytes()).await })
    }
}
// ── State ──────────────────────────────────────────────────────
use poc_memory::thalamus::channel_log::ChannelLog;
/// Daemon state shared between the IRC connection task and the RPC handlers.
struct State {
    config: Config,
    /// Per-channel message logs (keyed by channel path, e.g. "irc.#bcachefs")
    channel_logs: std::collections::BTreeMap<String, ChannelLog>,
    /// Channels to JOIN after registration; seeded from `config.channels`
    channels: Vec<String>,
    /// True once the server has confirmed registration (numeric 001)
    connected: bool,
    /// IRC writer handle (None when disconnected)
    writer: Option<WriterHandle>,
    /// Registered notification callbacks
    subscribers: Vec<channel_client::Client>,
}

/// Single-threaded shared handle to [`State`].
type SharedState = Rc<RefCell<State>>;

impl State {
    /// Creates a disconnected state with no logs and no subscribers.
    fn new(config: Config) -> Self {
        let channels = config.channels.clone();
        Self {
            config,
            channel_logs: std::collections::BTreeMap::new(),
            channels,
            connected: false,
            writer: None,
            subscribers: Vec::new(),
        }
    }

    /// Appends `line` to the log for `channel` (creating the log on first use)
    /// and sends every subscriber a notification carrying the channel, the
    /// urgency and the first 80 characters of the line.
    ///
    /// Notifications are fire-and-forget: each is sent from its own local task
    /// and the result is ignored.
    /// NOTE(review): subscribers are never removed, so callbacks of clients
    /// that have gone away keep being notified — confirm whether that is intended.
    fn push_message(&mut self, line: String, urgency: u8, channel: &str) {
        // Take the preview first so the line itself can be moved into the log
        // without cloning it.
        let preview = line.chars().take(80).collect::<String>();
        self.channel_logs
            .entry(channel.to_string())
            .or_insert_with(ChannelLog::new)
            .push(line);

        for sub in &self.subscribers {
            let mut req = sub.notify_request();
            let mut list = req.get().init_notifications(1);
            let mut n = list.reborrow().get(0);
            n.set_channel(channel);
            n.set_urgency(urgency);
            n.set_preview(&preview);
            n.set_count(1);
            tokio::task::spawn_local(async move {
                let _ = req.send().promise.await;
            });
        }
    }

    /// Sends one raw IRC line (without terminator) through the current writer.
    ///
    /// # Errors
    /// * `InvalidInput` if `line` contains CR or LF — the writer terminates
    ///   the line itself, so an embedded line break would smuggle a second
    ///   command onto the wire.
    /// * `NotConnected` if there is no writer.
    /// * Any error reported by the writer.
    async fn send_raw(&mut self, line: &str) -> io::Result<()> {
        if line.contains(|c: char| c == '\r' || c == '\n') {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "irc: line contains CR or LF",
            ));
        }
        match self.writer.as_mut() {
            Some(w) => w.write_line(line).await,
            None => Err(io::Error::new(io::ErrorKind::NotConnected, "irc: not connected")),
        }
    }

    /// Sends `msg` to `target` as PRIVMSG.
    ///
    /// A message containing line breaks is sent as one PRIVMSG per non-empty
    /// line, since an IRC message cannot span lines. A message with no
    /// non-empty line is sent as a single PRIVMSG with empty text, which is
    /// what an empty `msg` has always produced.
    ///
    /// # Errors
    /// Stops at, and returns, the first error from [`State::send_raw`].
    async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> {
        let mut parts = msg
            .split(|c: char| c == '\r' || c == '\n')
            .filter(|part| !part.is_empty())
            .peekable();

        if parts.peek().is_none() {
            return self.send_raw(&format!("PRIVMSG {target} :")).await;
        }
        for part in parts {
            self.send_raw(&format!("PRIVMSG {target} :{part}")).await?;
        }
        Ok(())
    }
}
// ── Persistence ────────────────────────────────────────────────
/// Directory for this daemon's on-disk logs, as returned by
/// `channel_log::log_dir("irc")`.
fn log_dir() -> PathBuf {
channel_log::log_dir("irc")
}
/// Records one message in the disk log named by `target`, delegating to
/// `channel_log::append_disk_log` with this daemon's `log_dir()`.
/// Callers in this file pass the channel name for channel traffic.
fn append_log(target: &str, nick: &str, text: &str) {
channel_log::append_disk_log(&log_dir(), target, nick, text);
}
/// Current wall-clock time as fractional seconds since the Unix epoch.
/// Yields 0.0 if the system clock reports a time before the epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
// ── TLS ────────────────────────────────────────────────────────
/// Builds the trust store used for TLS connections: an empty
/// `rustls::RootCertStore` extended with every entry of
/// `webpki_roots::TLS_SERVER_ROOTS`.
fn root_certs() -> rustls::RootCertStore {
let mut roots = rustls::RootCertStore::empty();
roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
roots
}
// ── IRC Connection Loop ────────────────────────────────────────
async fn connection_loop(state: SharedState) {
let _ = std::fs::create_dir_all(log_dir());
let mut backoff = RECONNECT_BASE_SECS;
loop {
let config = state.borrow().config.clone();
info!("irc: connecting to {}:{}", config.server, config.port);
match connect_and_run(&state, &config).await {
Ok(()) => info!("irc: connection closed cleanly"),
Err(e) => error!("irc: connection error: {e}"),
}
let was_connected = state.borrow().connected;
{
let mut s = state.borrow_mut();
s.connected = false;
s.writer = None;
}
if was_connected {
backoff = RECONNECT_BASE_SECS;
}
info!("irc: reconnecting in {backoff}s");
tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
backoff = (backoff * 2).min(RECONNECT_MAX_SECS);
}
}
/// Opens one connection to the configured server (TLS or plain, per
/// `config.tls`), installs the write half in `state.writer`, and then runs
/// [`register_and_read`] on the read half until the session ends.
///
/// Both the TCP connect and the TLS handshake are bounded by a timeout so a
/// server that accepts the connection but never answers cannot stall the
/// reconnect loop indefinitely.
///
/// # Errors
/// * `TimedOut` if the TCP connect or TLS handshake exceeds the timeout.
/// * `InvalidInput` if `config.server` is not a valid TLS server name.
/// * Any I/O or TLS error from connecting, and any error from
///   [`register_and_read`].
async fn connect_and_run(state: &SharedState, config: &Config) -> io::Result<()> {
    /// Upper bound for each of the TCP connect and the TLS handshake.
    const CONNECT_TIMEOUT_SECS: u64 = 30;
    let connect_timeout = std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS);

    let addr = format!("{}:{}", config.server, config.port);
    let tcp = tokio::time::timeout(connect_timeout, tokio::net::TcpStream::connect(&addr))
        .await
        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "irc: TCP connect timed out"))??;

    if config.tls {
        let tls_config = rustls::ClientConfig::builder_with_provider(
            rustls::crypto::ring::default_provider().into(),
        )
        .with_safe_default_protocol_versions()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
        .with_root_certificates(root_certs())
        .with_no_client_auth();
        let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
        let server_name = rustls::pki_types::ServerName::try_from(config.server.clone())
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
        let tls_stream = tokio::time::timeout(
            connect_timeout,
            connector.connect(server_name, tcp),
        )
        .await
        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "irc: TLS handshake timed out"))??;

        let (reader, writer) = tokio::io::split(tls_stream);
        state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer }));
        register_and_read(state, config, BufReader::new(reader)).await
    } else {
        let (reader, writer) = tokio::io::split(tcp);
        state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer }));
        register_and_read(state, config, BufReader::new(reader)).await
    }
}
/// Registers with the server and then services the connection until it ends.
///
/// Sends `PASS` (when configured), `NICK` and `USER`, then reads
/// newline-terminated lines from `reader` and reacts to them:
///
/// * `PING` is answered with `PONG`.
/// * `001` marks the state connected, identifies to NickServ (when
///   configured) and joins the configured channels.
/// * `PRIVMSG` is logged to disk and pushed to subscribers; a CTCP `VERSION`
///   request is answered and every other CTCP request is dropped.
/// * `NOTICE` is pushed to the "irc.server" channel.
/// * `433` (nick in use) retries with one more trailing underscore each time.
///
/// If no line arrives for `PING_INTERVAL_SECS` a keepalive PING is sent; if
/// nothing arrives within a further `PING_TIMEOUT_SECS` the function fails.
///
/// Returns `Ok(())` when the server closes the connection.
///
/// # Errors
/// `TimedOut` on ping timeout, otherwise any read error or any error from
/// sending a required line.
///
/// NOTE(review): every `state.borrow_mut().send_raw(..).await` keeps the
/// `RefCell` mutably borrowed while the write is pending. If a write ever has
/// to wait, an RPC handler that borrows the state in the meantime will panic.
/// Fixing that needs the writer moved out of `State`, which is beyond this
/// function.
async fn register_and_read<R: tokio::io::AsyncRead + Unpin>(
    state: &SharedState,
    config: &Config,
    mut reader: BufReader<R>,
) -> io::Result<()> {
    // Send PASS if configured
    if let Some(ref pass) = config.password {
        state.borrow_mut().send_raw(&format!("PASS {pass}")).await?;
    }
    // Register with nick and user
    {
        let mut s = state.borrow_mut();
        s.send_raw(&format!("NICK {}", config.nick)).await?;
        s.send_raw(&format!("USER {} 0 * :{}", config.nick, config.nick)).await?;
    }

    // Nick most recently requested; grows a '_' on every 433 so a taken
    // fallback is not retried forever.
    let mut current_nick = config.nick.clone();
    // Loop-invariant: lower-cased configured nick for mention detection.
    let nick_lower = config.nick.to_lowercase();

    let mut buf = Vec::new();
    let mut ping_sent = false;
    let mut deadline = tokio::time::Instant::now()
        + std::time::Duration::from_secs(PING_INTERVAL_SECS);
    loop {
        // `buf` is deliberately NOT cleared here: when the timer branch wins
        // the select, `read_until` may already have appended part of a line to
        // `buf`, and the next call must continue from those bytes.
        let read_result = tokio::select! {
            result = reader.read_until(b'\n', &mut buf) => result,
            _ = tokio::time::sleep_until(deadline) => {
                if ping_sent {
                    return Err(io::Error::new(
                        io::ErrorKind::TimedOut,
                        "ping timeout -- no response from server",
                    ));
                }
                info!("irc: no data for {PING_INTERVAL_SECS}s, sending PING");
                state.borrow_mut().send_raw("PING :keepalive").await?;
                ping_sent = true;
                deadline = tokio::time::Instant::now()
                    + std::time::Duration::from_secs(PING_TIMEOUT_SECS);
                continue;
            }
        };
        let n = read_result?;
        if n == 0 {
            break;
        }
        // Any data resets the ping timer
        ping_sent = false;
        deadline = tokio::time::Instant::now()
            + std::time::Duration::from_secs(PING_INTERVAL_SECS);
        // IRC is not guaranteed UTF-8
        let line = String::from_utf8_lossy(&buf).trim_end().to_string();
        // The complete line has been copied out; make room for the next one.
        buf.clear();
        if line.is_empty() {
            continue;
        }
        let msg = match IrcMessage::parse(&line) {
            Some(m) => m,
            None => continue,
        };
        match msg.command.as_str() {
            "PING" => {
                let arg = msg.params.first().map(|s| s.as_str()).unwrap_or("");
                state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?;
            }
            // RPL_WELCOME -- registration complete
            "001" => {
                info!("irc: registered as {}", current_nick);
                state.borrow_mut().connected = true;
                // NickServ auth
                if let Some(ref pass) = config.nickserv_pass {
                    state.borrow_mut()
                        .send_privmsg("NickServ", &format!("IDENTIFY {pass}"))
                        .await?;
                }
                // Join configured channels
                let channels = state.borrow().channels.clone();
                for ch in &channels {
                    if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await {
                        warn!("irc: failed to join {ch}: {e}");
                    }
                    // Create log entry so channel appears in list()
                    let key = format!("irc.{ch}");
                    state.borrow_mut().channel_logs
                        .entry(key)
                        .or_insert_with(ChannelLog::new);
                }
            }
            "PRIVMSG" => {
                let target = msg.params.first().map(|s| s.as_str()).unwrap_or("");
                let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or("");
                let nick = msg.nick().unwrap_or("unknown");
                // Handle CTCP requests. Stripping both delimiters (instead of
                // slicing by index) means a lone "\x01" is not mistaken for a
                // CTCP frame, which used to panic on an inverted slice range.
                if let Some(ctcp) = text
                    .strip_prefix('\x01')
                    .and_then(|inner| inner.strip_suffix('\x01'))
                {
                    if ctcp.starts_with("VERSION") {
                        let reply = format!(
                            "NOTICE {nick} :\x01VERSION poc-channel-irc 0.1.0\x01"
                        );
                        // Best effort: a failed courtesy reply is not fatal.
                        state.borrow_mut().send_raw(&reply).await.ok();
                    }
                    continue;
                }
                // Format and classify
                let (log_line, channel, urgency) = if target.starts_with('#') {
                    let line = format!("[{}] <{}> {}", target, nick, text);
                    let ch = format!("irc.{}", target);
                    let urg = if text.to_lowercase().contains(&nick_lower) {
                        NORMAL // mentioned
                    } else {
                        AMBIENT
                    };
                    (line, ch, urg)
                } else {
                    // Private message
                    let line = format!("[PM:{}] {}", nick, text);
                    let ch = format!("irc.pm.{}", nick.to_lowercase());
                    (line, ch, URGENT)
                };
                // Per-channel log file
                if target.starts_with('#') {
                    append_log(target, nick, text);
                } else {
                    append_log(&format!("pm-{nick}"), nick, text);
                }
                state.borrow_mut().push_message(log_line, urgency, &channel);
            }
            "NOTICE" => {
                let text = msg.params.last().map(|s| s.as_str()).unwrap_or("");
                let from = msg.nick().unwrap_or("server");
                let log_line = format!("[notice:{}] {}", from, text);
                state.borrow_mut().push_message(log_line, AMBIENT, "irc.server");
            }
            // Nick in use
            "433" => {
                current_nick.push('_');
                let alt = &current_nick;
                warn!("irc: nick in use, trying {alt}");
                state.borrow_mut().send_raw(&format!("NICK {alt}")).await?;
            }
            "JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" => {
                // Silent for now
            }
            _ => {}
        }
    }
    Ok(())
}
// ── ChannelServer Implementation ───────────────────────────────
/// capnp `channel_server` implementation backed by the shared daemon state.
/// One instance is created per connected client.
struct ChannelServerImpl {
    state: SharedState,
}

impl channel_server::Server for ChannelServerImpl {
    /// Returns text from the named channel's log: `recv_new(min_count)` when
    /// the request sets `all_new`, otherwise `recv_history(min_count)`.
    /// An unknown channel yields an empty string.
    fn recv(
        &mut self,
        params: channel_server::RecvParams,
        mut results: channel_server::RecvResults,
    ) -> Promise<(), capnp::Error> {
        let params = pry!(params.get());
        let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
        let all_new = params.get_all_new();
        let min_count = params.get_min_count() as usize;
        let mut s = self.state.borrow_mut();
        let text = match s.channel_logs.get_mut(&channel) {
            Some(log) => {
                if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) }
            }
            None => String::new(),
        };
        results.get().set_text(&text);
        Promise::ok(())
    }

    /// Sends `message` to the IRC target named by `channel` and records it in
    /// the disk log and in the in-memory channel log as our own message.
    ///
    /// Outgoing messages are filed under the same names the receive path
    /// uses, so both directions of a conversation share one log:
    /// channels under `irc.<#channel>` / disk log `<#channel>`, private
    /// messages under `irc.pm.<lowercased nick>` / disk log `pm-<nick>`.
    ///
    /// Fails with a capnp error if the IRC write fails; nothing is logged in
    /// that case.
    ///
    /// NOTE(review): the state stays mutably borrowed while the IRC write is
    /// pending; if that write ever has to wait, any other task borrowing the
    /// state will panic.
    fn send(
        &mut self,
        params: channel_server::SendParams,
        _results: channel_server::SendResults,
    ) -> Promise<(), capnp::Error> {
        let params = pry!(params.get());
        let channel = pry!(pry!(params.get_channel()).to_str()).to_string();
        let message = pry!(pry!(params.get_message()).to_str()).to_string();
        // Parse channel path to IRC target:
        // irc.#bcachefs -> #bcachefs
        // irc.pm.nick -> nick (PRIVMSG)
        let target = channel_to_target(&channel);
        let is_channel = target.starts_with('#');
        // Canonical names, matching what the PRIVMSG receive path uses.
        let (log_key, disk_log) = if is_channel {
            (format!("irc.{target}"), target.clone())
        } else {
            (
                format!("irc.pm.{}", target.to_lowercase()),
                format!("pm-{target}"),
            )
        };
        let state = self.state.clone();
        Promise::from_future(async move {
            {
                let mut s = state.borrow_mut();
                s.send_privmsg(&target, &message).await
                    .map_err(|e| capnp::Error::failed(format!("send failed: {e}")))?;
            }
            let nick = state.borrow().config.nick.clone();
            append_log(&disk_log, &nick, &message);
            let log_line = if is_channel {
                format!("[{}] <{}> {}", target, nick, message)
            } else {
                format!("[PM:{}] {}", target, message)
            };
            state.borrow_mut().channel_logs
                .entry(log_key)
                .or_insert_with(ChannelLog::new)
                .push_own(log_line);
            Ok(())
        })
    }

    /// Registers the caller's callback so it is notified of every message
    /// pushed from now on.
    fn subscribe(
        &mut self,
        params: channel_server::SubscribeParams,
        _results: channel_server::SubscribeResults,
    ) -> Promise<(), capnp::Error> {
        let callback = pry!(pry!(params.get()).get_callback());
        self.state.borrow_mut().subscribers.push(callback);
        info!("client subscribed for notifications");
        Promise::ok(())
    }

    /// Lists every channel that has a log (joined channels and PMs) together
    /// with the daemon-wide connected flag and that log's unread count.
    fn list(
        &mut self,
        _params: channel_server::ListParams,
        mut results: channel_server::ListResults,
    ) -> Promise<(), capnp::Error> {
        let s = self.state.borrow();
        let connected = s.connected;
        let mut list = results.get().init_channels(s.channel_logs.len() as u32);
        // Walk the map once instead of copying the keys and looking each up again.
        for (i, (name, log)) in s.channel_logs.iter().enumerate() {
            let mut entry = list.reborrow().get(i as u32);
            entry.set_name(name);
            entry.set_connected(connected);
            entry.set_unread(log.unread());
        }
        Promise::ok(())
    }
}
/// Maps a channel path onto the IRC target to address.
///
/// * `"irc.#bcachefs"` -> `"#bcachefs"`
/// * `"irc.pm.nick"`   -> `"nick"`
/// * `"#bcachefs"`     -> `"#bcachefs"` (anything without the `irc.` prefix
///   is returned unchanged)
fn channel_to_target(channel: &str) -> String {
    match channel.strip_prefix("irc.") {
        // Under "irc.", a further "pm." marks a private-message path.
        Some(rest) => rest.strip_prefix("pm.").unwrap_or(rest).to_owned(),
        None => channel.to_owned(),
    }
}
// ── Main ───────────────────────────────────────────────────────
/// Daemon entry point.
///
/// Loads the config, prepares `~/.consciousness/channels/irc.sock` (removing a
/// stale socket file if present), then on a single-threaded `LocalSet` starts
/// the IRC connection loop and serves the channel protocol to every client
/// that connects to the socket. Only returns on a bind or accept error.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    let state = Rc::new(RefCell::new(State::new(load_config())));

    let mut sock_path = dirs::home_dir().unwrap_or_default();
    sock_path.push(".consciousness/channels");
    std::fs::create_dir_all(&sock_path)?;
    sock_path.push("irc.sock");
    // A socket file left over from a previous run would make bind fail.
    let _ = std::fs::remove_file(&sock_path);
    info!("irc channel daemon starting on {}", sock_path.display());

    let local = tokio::task::LocalSet::new();
    local
        .run_until(async move {
            // Start IRC connection loop
            tokio::task::spawn_local(connection_loop(state.clone()));

            // Listen for channel protocol connections
            let listener = UnixListener::bind(&sock_path)?;
            loop {
                let (stream, _) = listener.accept().await?;
                let (read_half, write_half) = stream.compat().split();
                let network = twoparty::VatNetwork::new(
                    futures::io::BufReader::new(read_half),
                    futures::io::BufWriter::new(write_half),
                    rpc_twoparty_capnp::Side::Server,
                    Default::default(),
                );
                // Each client gets its own server object over the shared state.
                let client: channel_server::Client = capnp_rpc::new_client(ChannelServerImpl {
                    state: state.clone(),
                });
                let rpc_system = RpcSystem::new(Box::new(network), Some(client.client));
                tokio::task::spawn_local(rpc_system);
                info!("channel client connected");
            }
            // Never reached; present only to pin the async block's error type
            // so `?` above can be used.
            #[allow(unreachable_code)]
            Ok::<(), Box<dyn std::error::Error>>(())
        })
        .await
}