// channel-irc — Standalone IRC channel daemon // // Maintains a persistent TLS connection to an IRC server, parses // incoming messages, and serves them over the channel.capnp protocol // on a Unix socket at ~/.consciousness/channels/irc.sock. // // Runs independently of the consciousness binary so restarts don't // kill the IRC connection. Reconnects automatically with exponential // backoff. Supports multiple simultaneous capnp clients. // // Config: ~/.consciousness/channels/irc.json5 // Socket: ~/.consciousness/channels/irc.sock use std::cell::RefCell; use std::collections::VecDeque; use std::io; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; use capnp::capability::Promise; use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixListener; use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{error, info, warn}; use poc_memory::channel_capnp::{channel_client, channel_server}; // ── Constants ────────────────────────────────────────────────── const MAX_HISTORY: usize = 1000; const RECONNECT_BASE_SECS: u64 = 5; const RECONNECT_MAX_SECS: u64 = 300; const PING_INTERVAL_SECS: u64 = 120; const PING_TIMEOUT_SECS: u64 = 30; // Urgency levels (matching thalamus/notify.rs) const AMBIENT: u8 = 0; const NORMAL: u8 = 2; const URGENT: u8 = 3; // ── Config ───────────────────────────────────────────────────── #[derive(Clone, serde::Deserialize)] struct Config { server: String, port: u16, #[serde(default = "default_true")] tls: bool, nick: String, channels: Vec, #[serde(default)] password: Option, #[serde(default)] nickserv_pass: Option, } fn default_true() -> bool { true } fn load_config() -> Config { let path = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels/irc.json5"); let text = std::fs::read_to_string(&path) .unwrap_or_else(|e| panic!("failed to read {}: {e}", path.display())); json5::from_str(&text) .unwrap_or_else(|e| panic!("failed to parse {}: {e}", path.display())) } // ── IRC Message Parsing ──────────────────────────────────────── struct IrcMessage { prefix: Option, command: String, params: Vec, } impl IrcMessage { fn parse(line: &str) -> Option { let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); if line.is_empty() { return None; } let (prefix, rest) = if line.starts_with(':') { let space = line.find(' ')?; (Some(line[1..space].to_string()), &line[space + 1..]) } else { (None, line) }; let (command_params, trailing) = if let Some(pos) = rest.find(" :") { (&rest[..pos], Some(rest[pos + 2..].to_string())) } else { (rest, None) }; let mut parts: Vec = command_params .split_whitespace() .map(String::from) .collect(); if parts.is_empty() { return None; } let command = parts.remove(0).to_uppercase(); let mut params = parts; if let Some(t) = trailing { params.push(t); } Some(IrcMessage { prefix, command, params }) } fn nick(&self) -> Option<&str> { self.prefix.as_deref().and_then(|p| p.split('!').next()) } } // ── Writer Abstraction ───────────────────────────────────────── type WriterHandle = Box; trait AsyncWriter { fn write_line( &mut self, line: &str, ) -> std::pin::Pin> + '_>>; } struct TlsWriter { inner: tokio::io::WriteHalf>, } impl AsyncWriter for TlsWriter { fn write_line( &mut self, line: &str, ) -> std::pin::Pin> + '_>> { let data = format!("{line}\r\n"); Box::pin(async move { self.inner.write_all(data.as_bytes()).await }) } } struct PlainWriter { inner: tokio::io::WriteHalf, } impl AsyncWriter for PlainWriter { fn write_line( &mut self, line: &str, ) -> std::pin::Pin> + '_>> { let data = format!("{line}\r\n"); Box::pin(async move { self.inner.write_all(data.as_bytes()).await }) } } // ── State ────────────────────────────────────────────────────── use poc_memory::thalamus::channel_log::ChannelLog; struct State { config: Config, /// Per-channel message logs (keyed by channel path, e.g. "irc.#bcachefs") channel_logs: std::collections::BTreeMap, /// Currently joined channels channels: Vec, connected: bool, /// IRC writer handle (None when disconnected) writer: Option, /// Registered notification callbacks subscribers: Vec, } type SharedState = Rc>; impl State { fn new(config: Config) -> Self { let channels = config.channels.clone(); Self { config, channel_logs: std::collections::BTreeMap::new(), channels, connected: false, writer: None, subscribers: Vec::new(), } } fn push_message(&mut self, line: String, urgency: u8, channel: &str) { // Store in per-channel log self.channel_logs .entry(channel.to_string()) .or_insert_with(ChannelLog::new) .push(line.clone()); // Notify all subscribers let preview = line.chars().take(80).collect::(); for sub in &self.subscribers { let mut req = sub.notify_request(); let mut list = req.get().init_notifications(1); let mut n = list.reborrow().get(0); n.set_channel(channel); n.set_urgency(urgency); n.set_preview(&preview); n.set_count(1); tokio::task::spawn_local(async move { let _ = req.send().promise.await; }); } } async fn send_raw(&mut self, line: &str) -> io::Result<()> { if let Some(ref mut w) = self.writer { w.write_line(line).await } else { Err(io::Error::new(io::ErrorKind::NotConnected, "irc: not connected")) } } async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { self.send_raw(&format!("PRIVMSG {target} :{msg}")).await } } // ── Persistence ──────────────────────────────────────────────── fn data_dir() -> PathBuf { dirs::home_dir().unwrap_or_default().join(".consciousness/irc") } fn append_log(target: &str, nick: &str, text: &str) { use std::io::Write; let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase()); let dir = data_dir().join("logs"); let _ = std::fs::create_dir_all(&dir); if let Ok(mut f) = std::fs::OpenOptions::new() .create(true) .append(true) .open(dir.join(&filename)) { let secs = now() as u64; let _ = writeln!(f, "{secs} <{nick}> {text}"); } } fn now() -> f64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() } // ── TLS ──────────────────────────────────────────────────────── fn root_certs() -> rustls::RootCertStore { let mut roots = rustls::RootCertStore::empty(); roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); roots } // ── IRC Connection Loop ──────────────────────────────────────── async fn connection_loop(state: SharedState) { let _ = std::fs::create_dir_all(data_dir().join("logs")); let mut backoff = RECONNECT_BASE_SECS; loop { let config = state.borrow().config.clone(); info!("irc: connecting to {}:{}", config.server, config.port); match connect_and_run(&state, &config).await { Ok(()) => info!("irc: connection closed cleanly"), Err(e) => error!("irc: connection error: {e}"), } let was_connected = state.borrow().connected; { let mut s = state.borrow_mut(); s.connected = false; s.writer = None; } if was_connected { backoff = RECONNECT_BASE_SECS; } info!("irc: reconnecting in {backoff}s"); tokio::time::sleep(std::time::Duration::from_secs(backoff)).await; backoff = (backoff * 2).min(RECONNECT_MAX_SECS); } } async fn connect_and_run(state: &SharedState, config: &Config) -> io::Result<()> { let addr = format!("{}:{}", config.server, config.port); let tcp = tokio::net::TcpStream::connect(&addr).await?; if config.tls { let tls_config = rustls::ClientConfig::builder_with_provider( rustls::crypto::ring::default_provider().into(), ) .with_safe_default_protocol_versions() .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .with_root_certificates(root_certs()) .with_no_client_auth(); let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); let server_name = rustls::pki_types::ServerName::try_from(config.server.clone()) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; let tls_stream = connector.connect(server_name, tcp).await?; let (reader, writer) = tokio::io::split(tls_stream); state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer })); register_and_read(state, config, BufReader::new(reader)).await } else { let (reader, writer) = tokio::io::split(tcp); state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer })); register_and_read(state, config, BufReader::new(reader)).await } } async fn register_and_read( state: &SharedState, config: &Config, mut reader: BufReader, ) -> io::Result<()> { // Send PASS if configured if let Some(ref pass) = config.password { state.borrow_mut().send_raw(&format!("PASS {pass}")).await?; } // Register with nick and user { let mut s = state.borrow_mut(); s.send_raw(&format!("NICK {}", config.nick)).await?; s.send_raw(&format!("USER {} 0 * :{}", config.nick, config.nick)).await?; } let mut buf = Vec::new(); let mut ping_sent = false; let mut deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS); loop { buf.clear(); let read_result = tokio::select! { result = reader.read_until(b'\n', &mut buf) => result, _ = tokio::time::sleep_until(deadline) => { if ping_sent { return Err(io::Error::new( io::ErrorKind::TimedOut, "ping timeout -- no response from server", )); } info!("irc: no data for {PING_INTERVAL_SECS}s, sending PING"); state.borrow_mut().send_raw("PING :keepalive").await?; ping_sent = true; deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PING_TIMEOUT_SECS); continue; } }; let n = read_result?; if n == 0 { break; } // Any data resets the ping timer ping_sent = false; deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS); // IRC is not guaranteed UTF-8 let line = String::from_utf8_lossy(&buf).trim_end().to_string(); if line.is_empty() { continue; } let msg = match IrcMessage::parse(&line) { Some(m) => m, None => continue, }; match msg.command.as_str() { "PING" => { let arg = msg.params.first().map(|s| s.as_str()).unwrap_or(""); state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?; } // RPL_WELCOME -- registration complete "001" => { info!("irc: registered as {}", config.nick); state.borrow_mut().connected = true; // NickServ auth if let Some(ref pass) = config.nickserv_pass { state.borrow_mut() .send_privmsg("NickServ", &format!("IDENTIFY {pass}")) .await?; } // Join configured channels let channels = state.borrow().channels.clone(); for ch in &channels { if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await { warn!("irc: failed to join {ch}: {e}"); } // Create log entry so channel appears in list() let key = format!("irc.{ch}"); state.borrow_mut().channel_logs .entry(key) .or_insert_with(ChannelLog::new); } } "PRIVMSG" => { let target = msg.params.first().map(|s| s.as_str()).unwrap_or(""); let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or(""); let nick = msg.nick().unwrap_or("unknown"); // Handle CTCP requests if text.starts_with('\x01') && text.ends_with('\x01') { let ctcp = &text[1..text.len() - 1]; if ctcp.starts_with("VERSION") { let reply = format!( "NOTICE {nick} :\x01VERSION poc-channel-irc 0.1.0\x01" ); state.borrow_mut().send_raw(&reply).await.ok(); } continue; } // Format and classify let (log_line, channel, urgency) = if target.starts_with('#') { let line = format!("[{}] <{}> {}", target, nick, text); let ch = format!("irc.{}", target); let urg = if text.to_lowercase().contains(&config.nick.to_lowercase()) { NORMAL // mentioned } else { AMBIENT }; (line, ch, urg) } else { // Private message let line = format!("[PM:{}] {}", nick, text); let ch = format!("irc.pm.{}", nick.to_lowercase()); (line, ch, URGENT) }; // Per-channel log file if target.starts_with('#') { append_log(target, nick, text); } else { append_log(&format!("pm-{nick}"), nick, text); } state.borrow_mut().push_message(log_line, urgency, &channel); } "NOTICE" => { let text = msg.params.last().map(|s| s.as_str()).unwrap_or(""); let from = msg.nick().unwrap_or("server"); let log_line = format!("[notice:{}] {}", from, text); state.borrow_mut().push_message(log_line, AMBIENT, "irc.server"); } // Nick in use "433" => { let alt = format!("{}_", config.nick); warn!("irc: nick in use, trying {alt}"); state.borrow_mut().send_raw(&format!("NICK {alt}")).await?; } "JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" => { // Silent for now } _ => {} } } Ok(()) } // ── ChannelServer Implementation ─────────────────────────────── struct ChannelServerImpl { state: SharedState, } impl channel_server::Server for ChannelServerImpl { fn recv( &mut self, params: channel_server::RecvParams, mut results: channel_server::RecvResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let all_new = params.get_all_new(); let min_count = params.get_min_count() as usize; let mut s = self.state.borrow_mut(); let text = match s.channel_logs.get_mut(&channel) { Some(log) => { if all_new { log.recv_new(min_count) } else { log.recv_history(min_count) } } None => String::new(), }; results.get().set_text(&text); Promise::ok(()) } fn send( &mut self, params: channel_server::SendParams, _results: channel_server::SendResults, ) -> Promise<(), capnp::Error> { let params = pry!(params.get()); let channel = pry!(pry!(params.get_channel()).to_str()).to_string(); let message = pry!(pry!(params.get_message()).to_str()).to_string(); // Parse channel path to IRC target: // irc.#bcachefs -> #bcachefs // irc.pm.nick -> nick (PRIVMSG) let target = channel_to_target(&channel); let state = self.state.clone(); Promise::from_future(async move { { let mut s = state.borrow_mut(); s.send_privmsg(&target, &message).await .map_err(|e| capnp::Error::failed(format!("send failed: {e}")))?; } let nick = state.borrow().config.nick.clone(); append_log(&target, &nick, &message); let log_line = if target.starts_with('#') { format!("[{}] <{}> {}", target, nick, message) } else { format!("[PM:{}] {}", target, message) }; state.borrow_mut().push_message(log_line, 0, &channel); Ok(()) }) } fn subscribe( &mut self, params: channel_server::SubscribeParams, _results: channel_server::SubscribeResults, ) -> Promise<(), capnp::Error> { let callback = pry!(pry!(params.get()).get_callback()); self.state.borrow_mut().subscribers.push(callback); info!("client subscribed for notifications"); Promise::ok(()) } fn list( &mut self, _params: channel_server::ListParams, mut results: channel_server::ListResults, ) -> Promise<(), capnp::Error> { let s = self.state.borrow(); let connected = s.connected; // All channels with logs (joined + PMs) let names: Vec = s.channel_logs.keys().cloned().collect(); let mut list = results.get().init_channels(names.len() as u32); for (i, name) in names.iter().enumerate() { let mut entry = list.reborrow().get(i as u32); entry.set_name(name); entry.set_connected(connected); entry.set_unread( s.channel_logs.get(name).map_or(0, |l| l.unread()) ); } Promise::ok(()) } } /// Convert a channel path to an IRC target. /// "irc.#bcachefs" -> "#bcachefs" /// "irc.pm.nick" -> "nick" /// "#bcachefs" -> "#bcachefs" (passthrough) fn channel_to_target(channel: &str) -> String { if let Some(rest) = channel.strip_prefix("irc.") { if let Some(nick) = rest.strip_prefix("pm.") { nick.to_string() } else { // rest is "#bcachefs" or similar rest.to_string() } } else { channel.to_string() } } // ── Main ─────────────────────────────────────────────────────── #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let config = load_config(); let state = Rc::new(RefCell::new(State::new(config))); let sock_dir = dirs::home_dir() .unwrap_or_default() .join(".consciousness/channels"); std::fs::create_dir_all(&sock_dir)?; let sock_path = sock_dir.join("irc.sock"); let _ = std::fs::remove_file(&sock_path); info!("irc channel daemon starting on {}", sock_path.display()); tokio::task::LocalSet::new() .run_until(async move { // Start IRC connection loop let irc_state = state.clone(); tokio::task::spawn_local(async move { connection_loop(irc_state).await; }); // Listen for channel protocol connections let listener = UnixListener::bind(&sock_path)?; loop { let (stream, _) = listener.accept().await?; let (reader, writer) = stream.compat().split(); let network = twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Server, Default::default(), ); let server = ChannelServerImpl { state: state.clone(), }; let client: channel_server::Client = capnp_rpc::new_client(server); let rpc_system = RpcSystem::new( Box::new(network), Some(client.client), ); tokio::task::spawn_local(rpc_system); info!("channel client connected"); } #[allow(unreachable_code)] Ok::<(), Box>(()) }) .await }