channel architecture: wire protocol, daemons, supervisor

Design and implement the channel system for external communications:

- schema/channel.capnp: wire protocol for channel daemons
  (recv with all_new/min_count, send, subscribe, list)
- channels/irc/: standalone IRC daemon crate (consciousness-channel-irc)
- channels/telegram/: standalone Telegram daemon crate
  (consciousness-channel-telegram)
- src/thalamus/channels.rs: client connecting to daemon sockets
- src/thalamus/supervisor.rs: daemon lifecycle with file locking
  for multi-instance safety

Channel daemons listen on ~/.consciousness/channels/*.sock,
configs in *.json5, supervisor discovers and starts them.
IRC/Telegram modules removed from thalamus core — they're
now independent daemons that survive consciousness restarts.

Also: delete standalone tui.rs (moved to consciousness F4/F5),
fix build warnings, add F5 thalamus screen with channel status.

Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-04-03 18:46:14 -04:00
parent db42bf6243
commit ad5f69abb8
23 changed files with 1716 additions and 1921 deletions

178
src/thalamus/channels.rs Normal file
View file

@ -0,0 +1,178 @@
// channels.rs — Channel client for the thalamus
//
// Discovers channel daemon sockets in ~/.consciousness/channels/,
// connects via capnp RPC, and provides send/recv operations.
//
// Each daemon socket speaks the channel.capnp protocol. The channel
// manager routes by prefix: "irc.#bcachefs" → connects to irc.sock.
use std::collections::HashMap;
use std::path::PathBuf;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt;
use tokio::net::UnixStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::{info, warn};
use crate::channel_capnp::channel_server;
/// A live connection to a channel daemon.
struct DaemonConnection {
#[allow(dead_code)]
prefix: String,
client: channel_server::Client,
// Hold the RPC system task so it doesn't get dropped
_rpc_task: tokio::task::JoinHandle<Result<(), capnp::Error>>,
}
/// Manages all channel daemon connections.
pub struct ChannelManager {
daemons: HashMap<String, DaemonConnection>,
channels_dir: PathBuf,
}
impl ChannelManager {
pub fn new() -> Self {
let channels_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels");
Self {
daemons: HashMap::new(),
channels_dir,
}
}
/// Connect to a daemon socket, returning the capnp client.
async fn connect(path: &std::path::Path) -> Result<
(channel_server::Client, tokio::task::JoinHandle<Result<(), capnp::Error>>),
Box<dyn std::error::Error>,
> {
let stream = UnixStream::connect(path).await?;
let (reader, writer) = stream.compat().split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let client: channel_server::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
let task = tokio::task::spawn_local(rpc_system);
Ok((client, task))
}
/// Scan the channels directory for daemon sockets and connect.
pub async fn discover(&mut self) {
let dir = match std::fs::read_dir(&self.channels_dir) {
Ok(d) => d,
Err(_) => return, // directory doesn't exist yet
};
for entry in dir.flatten() {
let path = entry.path();
if path.extension().map_or(false, |e| e == "sock") {
let prefix = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
if self.daemons.contains_key(&prefix) {
continue;
}
match Self::connect(&path).await {
Ok((client, task)) => {
info!("connected to channel daemon: {}", prefix);
self.daemons.insert(
prefix.clone(),
DaemonConnection {
prefix,
client,
_rpc_task: task,
},
);
}
Err(e) => {
warn!("failed to connect to {}: {}", path.display(), e);
}
}
}
}
}
/// Find the daemon client for a channel path.
fn client_for(&self, channel: &str) -> Option<&channel_server::Client> {
let prefix = channel.split('.').next()?;
self.daemons.get(prefix).map(|d| &d.client)
}
/// Send a message to a channel.
pub async fn send(&self, channel: &str, message: &str) -> Result<(), String> {
let client = self.client_for(channel)
.ok_or_else(|| format!("no daemon for channel: {}", channel))?;
let mut req = client.send_request();
req.get().set_channel(channel);
req.get().set_message(message);
req.send().promise.await
.map_err(|e| format!("send failed: {}", e))?;
Ok(())
}
/// Read from a channel.
pub async fn recv(
&self,
channel: &str,
all_new: bool,
min_count: u32,
) -> Result<String, String> {
let client = self.client_for(channel)
.ok_or_else(|| format!("no daemon for channel: {}", channel))?;
let mut req = client.recv_request();
req.get().set_channel(channel);
req.get().set_all_new(all_new);
req.get().set_min_count(min_count);
let reply = req.send().promise.await
.map_err(|e| format!("recv failed: {}", e))?;
let text = reply.get()
.map_err(|e| format!("recv reply error: {}", e))?
.get_text()
.map_err(|e| format!("recv text error: {}", e))?;
Ok(text.to_str().unwrap_or("").to_string())
}
/// List connected daemon prefixes.
pub fn prefixes(&self) -> Vec<&str> {
self.daemons.keys().map(|s| s.as_str()).collect()
}
/// List all channels from all connected daemons.
pub async fn list_all(&self) -> Vec<(String, bool, u32)> {
let mut result = Vec::new();
for daemon in self.daemons.values() {
let req = daemon.client.list_request();
if let Ok(reply) = req.send().promise.await {
if let Ok(r) = reply.get() {
if let Ok(channels) = r.get_channels() {
for ch in channels.iter() {
if let Ok(name) = ch.get_name() {
result.push((
name.to_str().unwrap_or("").to_string(),
ch.get_connected(),
ch.get_unread(),
));
}
}
}
}
}
}
result
}
}

View file

@ -8,7 +8,9 @@
// Moved from the standalone poc-daemon crate into the main
// consciousness crate.
pub mod channels;
pub mod config;
pub mod supervisor;
pub mod context;
pub mod idle;
pub mod modules;
@ -480,28 +482,12 @@ async fn server_main() -> Result<(), Box<dyn std::error::Error>> {
tokio::task::LocalSet::new()
.run_until(async move {
// Start modules
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
let (_notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::<notify::Notification>();
let irc_state = if daemon_config.borrow().irc.enabled {
let irc_config = daemon_config.borrow().irc.clone();
info!("starting irc module: {}:{}", irc_config.server, irc_config.port);
Some(modules::irc::start(irc_config, notify_tx.clone(), daemon_config.clone()))
} else {
info!("irc module disabled");
None
};
let telegram_state = if daemon_config.borrow().telegram.enabled {
info!("starting telegram module");
Some(modules::telegram::start(
daemon_config.borrow().telegram.clone(),
notify_tx.clone(),
daemon_config.clone(),
))
} else {
info!("telegram module disabled");
None
};
// External modules (IRC, Telegram) now run as separate daemons.
// They connect via the notification channel when implemented.
let _irc_state: Option<()> = None;
let _telegram_state: Option<()> = None;
let listener = UnixListener::bind(&sock)?;
#[cfg(unix)]
@ -571,8 +557,6 @@ async fn server_main() -> Result<(), Box<dyn std::error::Error>> {
let daemon_impl = rpc::DaemonImpl::new(
state.clone(),
irc_state.clone(),
telegram_state.clone(),
daemon_config.clone(),
);
let client: daemon_capnp::daemon::Client =

View file

@ -1,569 +0,0 @@
// IRC module.
//
// Maintains a persistent connection to an IRC server. Parses incoming
// messages into notifications, supports sending messages and runtime
// commands (join, leave, etc.). Config changes persist to daemon.toml.
//
// Runs as a spawned local task on the daemon's LocalSet. Notifications
// flow through an mpsc channel into the main state. Reconnects
// automatically with exponential backoff.
use crate::thalamus::config::{Config, IrcConfig};
use crate::thalamus::notify::Notification;
use crate::thalamus::{home, now};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use tracing::{error, info, warn};
const MAX_LOG_LINES: usize = 200;
const RECONNECT_BASE_SECS: u64 = 5;
const RECONNECT_MAX_SECS: u64 = 300;
const PING_INTERVAL_SECS: u64 = 120;
const PING_TIMEOUT_SECS: u64 = 30;
/// Parsed IRC message.
struct IrcMessage {
prefix: Option<String>, // nick!user@host
command: String,
params: Vec<String>,
}
impl IrcMessage {
fn parse(line: &str) -> Option<Self> {
let line = line.trim_end_matches(|c| c == '\r' || c == '\n');
if line.is_empty() {
return None;
}
let (prefix, rest) = if line.starts_with(':') {
let space = line.find(' ')?;
(Some(line[1..space].to_string()), &line[space + 1..])
} else {
(None, line)
};
let (command_params, trailing) = if let Some(pos) = rest.find(" :") {
(&rest[..pos], Some(rest[pos + 2..].to_string()))
} else {
(rest, None)
};
let mut parts: Vec<String> = command_params
.split_whitespace()
.map(String::from)
.collect();
if parts.is_empty() {
return None;
}
let command = parts.remove(0).to_uppercase();
let mut params = parts;
if let Some(t) = trailing {
params.push(t);
}
Some(IrcMessage {
prefix,
command,
params,
})
}
/// Extract nick from prefix (nick!user@host → nick).
fn nick(&self) -> Option<&str> {
self.prefix
.as_deref()
.and_then(|p| p.split('!').next())
}
}
/// Shared IRC state, accessible from both the read task and RPC handlers.
pub struct IrcState {
pub config: IrcConfig,
pub connected: bool,
pub channels: Vec<String>,
pub log: VecDeque<String>,
writer: Option<WriterHandle>,
}
/// Type-erased writer handle so we can store it without generic params.
type WriterHandle = Box<dyn AsyncWriter>;
trait AsyncWriter {
fn write_line(&mut self, line: &str) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>>;
}
/// Writer over a TLS stream.
struct TlsWriter {
inner: tokio::io::WriteHalf<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>,
}
impl AsyncWriter for TlsWriter {
fn write_line(&mut self, line: &str) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>> {
let data = format!("{line}\r\n");
Box::pin(async move {
self.inner.write_all(data.as_bytes()).await
})
}
}
/// Writer over a plain TCP stream.
struct PlainWriter {
inner: tokio::io::WriteHalf<tokio::net::TcpStream>,
}
impl AsyncWriter for PlainWriter {
fn write_line(&mut self, line: &str) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<()>> + '_>> {
let data = format!("{line}\r\n");
Box::pin(async move {
self.inner.write_all(data.as_bytes()).await
})
}
}
impl IrcState {
fn new(config: IrcConfig) -> Self {
Self {
channels: config.channels.clone(),
config,
connected: false,
log: VecDeque::with_capacity(MAX_LOG_LINES),
writer: None,
}
}
fn push_log(&mut self, line: &str) {
if self.log.len() >= MAX_LOG_LINES {
self.log.pop_front();
}
self.log.push_back(line.to_string());
}
async fn send_raw(&mut self, line: &str) -> io::Result<()> {
if let Some(ref mut w) = self.writer {
w.write_line(line).await
} else {
Err(io::Error::new(io::ErrorKind::NotConnected, "not connected"))
}
}
async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> {
self.send_raw(&format!("PRIVMSG {target} :{msg}")).await
}
async fn join(&mut self, channel: &str) -> io::Result<()> {
self.send_raw(&format!("JOIN {channel}")).await?;
if !self.channels.iter().any(|c| c == channel) {
self.channels.push(channel.to_string());
}
Ok(())
}
async fn part(&mut self, channel: &str) -> io::Result<()> {
self.send_raw(&format!("PART {channel}")).await?;
self.channels.retain(|c| c != channel);
Ok(())
}
}
pub type SharedIrc = Rc<RefCell<IrcState>>;
/// Start the IRC module. Returns the shared state handle.
pub fn start(
config: IrcConfig,
notify_tx: mpsc::UnboundedSender<Notification>,
daemon_config: Rc<RefCell<Config>>,
) -> SharedIrc {
let state = Rc::new(RefCell::new(IrcState::new(config)));
let state_clone = state.clone();
tokio::task::spawn_local(async move {
connection_loop(state_clone, notify_tx, daemon_config).await;
});
state
}
async fn connection_loop(
state: SharedIrc,
notify_tx: mpsc::UnboundedSender<Notification>,
daemon_config: Rc<RefCell<Config>>,
) {
let mut backoff = RECONNECT_BASE_SECS;
loop {
let config = state.borrow().config.clone();
info!("irc: connecting to {}:{}", config.server, config.port);
match connect_and_run(&state, &config, &notify_tx).await {
Ok(()) => {
info!("irc: connection closed cleanly");
}
Err(e) => {
error!("irc: connection error: {e}");
}
}
// Reset backoff if we had a working connection (registered
// successfully before disconnecting)
let was_connected = state.borrow().connected;
state.borrow_mut().connected = false;
state.borrow_mut().writer = None;
if was_connected {
backoff = RECONNECT_BASE_SECS;
}
// Persist current channel list to config
{
let channels = state.borrow().channels.clone();
let mut dc = daemon_config.borrow_mut();
dc.irc.channels = channels;
dc.save();
}
info!("irc: reconnecting in {backoff}s");
tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
backoff = (backoff * 2).min(RECONNECT_MAX_SECS);
}
}
async fn connect_and_run(
state: &SharedIrc,
config: &IrcConfig,
notify_tx: &mpsc::UnboundedSender<Notification>,
) -> io::Result<()> {
let addr = format!("{}:{}", config.server, config.port);
let tcp = tokio::net::TcpStream::connect(&addr).await?;
if config.tls {
let tls_config = rustls::ClientConfig::builder_with_provider(
rustls::crypto::ring::default_provider().into(),
)
.with_safe_default_protocol_versions()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.with_root_certificates(root_certs())
.with_no_client_auth();
let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
let server_name = rustls::pki_types::ServerName::try_from(config.server.clone())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let tls_stream = connector.connect(server_name, tcp).await?;
let (reader, writer) = tokio::io::split(tls_stream);
state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer }));
let buf_reader = BufReader::new(reader);
register_and_read(state, config, buf_reader, notify_tx).await
} else {
let (reader, writer) = tokio::io::split(tcp);
state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer }));
let buf_reader = BufReader::new(reader);
register_and_read(state, config, buf_reader, notify_tx).await
}
}
async fn register_and_read<R: tokio::io::AsyncRead + Unpin>(
state: &SharedIrc,
config: &IrcConfig,
mut reader: BufReader<R>,
notify_tx: &mpsc::UnboundedSender<Notification>,
) -> io::Result<()> {
// Register
{
let mut s = state.borrow_mut();
s.send_raw(&format!("NICK {}", config.nick)).await?;
s.send_raw(&format!("USER {} 0 * :{}", config.user, config.realname)).await?;
}
let mut buf = Vec::new();
let mut ping_sent = false;
let mut deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PING_INTERVAL_SECS);
loop {
buf.clear();
let read_result = tokio::select! {
result = reader.read_until(b'\n', &mut buf) => result,
_ = tokio::time::sleep_until(deadline) => {
if ping_sent {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"ping timeout — no response from server",
));
}
info!("irc: no data for {}s, sending PING", PING_INTERVAL_SECS);
state.borrow_mut().send_raw("PING :keepalive").await?;
ping_sent = true;
deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PING_TIMEOUT_SECS);
continue;
}
};
let n = read_result?;
if n == 0 { break; }
// Any data from server resets the ping timer
ping_sent = false;
deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PING_INTERVAL_SECS);
// IRC is not guaranteed UTF-8 — lossy conversion handles Latin-1 etc.
let line = String::from_utf8_lossy(&buf).trim_end().to_string();
if line.is_empty() { continue; }
let msg = match IrcMessage::parse(&line) {
Some(m) => m,
None => continue,
};
match msg.command.as_str() {
"PING" => {
let arg = msg.params.first().map(|s| s.as_str()).unwrap_or("");
state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?;
}
// RPL_WELCOME — registration complete
"001" => {
info!("irc: registered as {}", config.nick);
state.borrow_mut().connected = true;
// Join configured channels
let channels = state.borrow().channels.clone();
for ch in &channels {
if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await {
warn!("irc: failed to join {ch}: {e}");
}
}
}
"PRIVMSG" => {
let target = msg.params.first().map(|s| s.as_str()).unwrap_or("");
let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or("");
let nick = msg.nick().unwrap_or("unknown");
// Handle CTCP requests (wrapped in \x01)
if text.starts_with('\x01') && text.ends_with('\x01') {
let ctcp = &text[1..text.len()-1];
if ctcp.starts_with("VERSION") {
let reply = format!(
"NOTICE {nick} :\x01VERSION poc-daemon 0.4.0\x01"
);
state.borrow_mut().send_raw(&reply).await.ok();
}
// Don't generate notifications for CTCP
continue;
}
// Log the message
let log_line = if target.starts_with('#') {
format!("[{}] <{}> {}", target, nick, text)
} else {
format!("[PM:{nick}] {text}")
};
state.borrow_mut().push_log(&log_line);
// Write to per-channel/per-user log file
if target.starts_with('#') {
append_log(target, nick, text);
} else {
append_log(&format!("pm-{nick}"), nick, text);
}
// Generate notification
let (ntype, urgency) = classify_privmsg(
nick,
target,
text,
&config.nick,
);
let _ = notify_tx.send(Notification {
ntype,
urgency,
message: log_line,
timestamp: now(),
});
}
// Nick in use
"433" => {
let alt = format!("{}_", config.nick);
warn!("irc: nick in use, trying {alt}");
state.borrow_mut().send_raw(&format!("NICK {alt}")).await?;
}
"JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" | "NOTICE" => {
// Could log these, but skip for now
}
_ => {}
}
}
Ok(())
}
/// Classify a PRIVMSG into notification type and urgency.
fn classify_privmsg(nick: &str, target: &str, text: &str, my_nick: &str) -> (String, u8) {
let my_nick_lower = my_nick.to_lowercase();
let text_lower = text.to_lowercase();
if !target.starts_with('#') {
// Private message
(format!("irc.pm.{nick}"), crate::thalamus::notify::URGENT)
} else if text_lower.contains(&my_nick_lower) {
// Mentioned in channel
(format!("irc.mention.{nick}"), crate::thalamus::notify::NORMAL)
} else {
// Regular channel message
let channel = target.trim_start_matches('#');
(format!("irc.channel.{channel}"), crate::thalamus::notify::AMBIENT)
}
}
/// Append a message to the per-channel or per-user log file.
/// Logs go to ~/.consciousness/irc/logs/{target}.log (e.g. #bcachefs.log, pm-user.log)
fn append_log(target: &str, nick: &str, text: &str) {
use std::io::Write;
// Sanitize target for filename (strip leading #, lowercase)
let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase());
let dir = home().join(".consciousness/irc/logs");
let _ = std::fs::create_dir_all(&dir);
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(dir.join(&filename))
{
let secs = now() as u64;
let _ = writeln!(f, "{secs} <{nick}> {text}");
}
}
fn root_certs() -> rustls::RootCertStore {
let mut roots = rustls::RootCertStore::empty();
roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
roots
}
/// Handle a runtime command from RPC.
pub async fn handle_command(
state: &SharedIrc,
daemon_config: &Rc<RefCell<Config>>,
cmd: &str,
args: &[String],
) -> Result<String, String> {
match cmd {
"join" => {
let channel = args.first().ok_or("usage: irc join <channel>")?;
let channel = if channel.starts_with('#') {
channel.clone()
} else {
format!("#{channel}")
};
state
.borrow_mut()
.join(&channel)
.await
.map_err(|e| e.to_string())?;
// Persist
let mut dc = daemon_config.borrow_mut();
if !dc.irc.channels.contains(&channel) {
dc.irc.channels.push(channel.clone());
}
dc.save();
Ok(format!("joined {channel}"))
}
"leave" | "part" => {
let channel = args.first().ok_or("usage: irc leave <channel>")?;
let channel = if channel.starts_with('#') {
channel.clone()
} else {
format!("#{channel}")
};
state
.borrow_mut()
.part(&channel)
.await
.map_err(|e| e.to_string())?;
// Persist
let mut dc = daemon_config.borrow_mut();
dc.irc.channels.retain(|c| c != &channel);
dc.save();
Ok(format!("left {channel}"))
}
"send" | "msg" => {
if args.len() < 2 {
return Err("usage: irc send <target> <message>".into());
}
let target = &args[0];
if target.starts_with('#') {
let s = state.borrow();
if !s.channels.iter().any(|c| c == target) {
return Err(format!(
"not in channel {target} (joined: {})",
s.channels.join(", ")
));
}
}
let msg = args[1..].join(" ");
let nick = state.borrow().config.nick.clone();
state
.borrow_mut()
.send_privmsg(target, &msg)
.await
.map_err(|e| e.to_string())?;
append_log(target, &nick, &msg);
Ok(format!("sent to {target}"))
}
"status" => {
let s = state.borrow();
Ok(format!(
"connected={} channels={} log_lines={} nick={}",
s.connected,
s.channels.join(","),
s.log.len(),
s.config.nick,
))
}
"log" => {
let n: usize = args
.first()
.and_then(|s| s.parse().ok())
.unwrap_or(15);
let s = state.borrow();
let lines: Vec<&String> = s.log.iter().rev().take(n).collect();
let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect();
lines.reverse();
Ok(lines.join("\n"))
}
"nick" => {
let new_nick = args.first().ok_or("usage: irc nick <newnick>")?;
state
.borrow_mut()
.send_raw(&format!("NICK {new_nick}"))
.await
.map_err(|e| e.to_string())?;
let mut dc = daemon_config.borrow_mut();
dc.irc.nick = new_nick.clone();
dc.save();
Ok(format!("nick → {new_nick}"))
}
_ => Err(format!(
"unknown irc command: {cmd}\n\
commands: join, leave, send, status, log, nick"
)),
}
}

View file

@ -1,2 +1,2 @@
pub mod irc;
pub mod telegram;
// External communication modules (IRC, Telegram, etc.) live in
// separate daemons, not in the core consciousness binary.

View file

@ -1,374 +0,0 @@
// Telegram module.
//
// Long-polls the Telegram Bot API for messages from Kent's chat.
// Downloads media (photos, voice, documents) to local files.
// Sends text and files. Notifications flow through mpsc into the
// daemon's main state.
//
// Only accepts messages from the configured chat_id (prompt
// injection defense — other senders get a "private bot" reply).
use crate::thalamus::config::{Config, TelegramConfig};
use crate::thalamus::notify::Notification;
use crate::thalamus::{home, now};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::rc::Rc;
use tokio::sync::mpsc;
use tracing::{error, info};
const MAX_LOG_LINES: usize = 100;
const POLL_TIMEOUT: u64 = 30;
pub struct TelegramState {
pub config: TelegramConfig,
pub connected: bool,
pub log: VecDeque<String>,
pub last_offset: i64,
client: reqwest::Client,
}
pub type SharedTelegram = Rc<RefCell<TelegramState>>;
impl TelegramState {
fn new(config: TelegramConfig) -> Self {
let last_offset = load_offset();
Self {
config,
connected: false,
log: VecDeque::with_capacity(MAX_LOG_LINES),
last_offset,
client: reqwest::Client::new(),
}
}
fn push_log(&mut self, line: &str) {
if self.log.len() >= MAX_LOG_LINES {
self.log.pop_front();
}
self.log.push_back(line.to_string());
}
fn api_url(&self, method: &str) -> String {
format!(
"https://api.telegram.org/bot{}/{}",
self.config.token, method
)
}
}
fn offset_path() -> PathBuf {
home().join(".consciousness/telegram/last_offset")
}
fn load_offset() -> i64 {
std::fs::read_to_string(offset_path())
.ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0)
}
fn save_offset(offset: i64) {
let _ = std::fs::write(offset_path(), offset.to_string());
}
fn history_path() -> PathBuf {
home().join(".consciousness/telegram/history.log")
}
fn media_dir() -> PathBuf {
home().join(".consciousness/telegram/media")
}
fn append_history(line: &str) {
use std::io::Write;
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(history_path())
{
let _ = writeln!(f, "{}", line);
}
}
/// Start the Telegram module. Returns the shared state handle.
pub fn start(
config: TelegramConfig,
notify_tx: mpsc::UnboundedSender<Notification>,
_daemon_config: Rc<RefCell<Config>>,
) -> SharedTelegram {
let state = Rc::new(RefCell::new(TelegramState::new(config)));
let state_clone = state.clone();
tokio::task::spawn_local(async move {
poll_loop(state_clone, notify_tx).await;
});
state
}
async fn poll_loop(
state: SharedTelegram,
notify_tx: mpsc::UnboundedSender<Notification>,
) {
let _ = std::fs::create_dir_all(media_dir());
loop {
match poll_once(&state, &notify_tx).await {
Ok(()) => {}
Err(e) => {
error!("telegram: poll error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
async fn poll_once(
state: &SharedTelegram,
notify_tx: &mpsc::UnboundedSender<Notification>,
) -> Result<(), Box<dyn std::error::Error>> {
let (url, chat_id, token) = {
let s = state.borrow();
let url = format!(
"{}?offset={}&timeout={}",
s.api_url("getUpdates"),
s.last_offset,
POLL_TIMEOUT,
);
(url, s.config.chat_id, s.config.token.clone())
};
let client = state.borrow().client.clone();
let resp: serde_json::Value = client
.get(&url)
.timeout(std::time::Duration::from_secs(POLL_TIMEOUT + 5))
.send()
.await?
.json()
.await?;
if !state.borrow().connected {
state.borrow_mut().connected = true;
info!("telegram: connected");
}
let results = resp["result"].as_array();
let results = match results {
Some(r) => r,
None => return Ok(()),
};
for update in results {
let update_id = update["update_id"].as_i64().unwrap_or(0);
let msg = &update["message"];
// Update offset
{
let mut s = state.borrow_mut();
s.last_offset = update_id + 1;
save_offset(s.last_offset);
}
let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
if msg_chat_id != chat_id {
// Reject messages from unknown chats
let reject_url = format!(
"https://api.telegram.org/bot{}/sendMessage",
token
);
let _ = client
.post(&reject_url)
.form(&[
("chat_id", msg_chat_id.to_string()),
("text", "This is a private bot.".to_string()),
])
.send()
.await;
continue;
}
let sender = msg["from"]["first_name"]
.as_str()
.unwrap_or("unknown")
.to_string();
// Handle different message types
if let Some(text) = msg["text"].as_str() {
let log_line = format!("[{}] {}", sender, text);
state.borrow_mut().push_log(&log_line);
let ts = timestamp();
append_history(&format!("{ts} [{sender}] {text}"));
let _ = notify_tx.send(Notification {
ntype: format!("telegram.{}", sender.to_lowercase()),
urgency: crate::thalamus::notify::NORMAL,
message: log_line,
timestamp: now(),
});
} else if let Some(photos) = msg["photo"].as_array() {
// Pick largest photo
let best = photos.iter().max_by_key(|p| p["file_size"].as_i64().unwrap_or(0));
if let Some(photo) = best {
if let Some(file_id) = photo["file_id"].as_str() {
let caption = msg["caption"].as_str().unwrap_or("");
let local = download_file(&client, &token, file_id, ".jpg").await;
let display = match &local {
Some(p) => format!("[photo: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }),
None => format!("[photo]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }),
};
let log_line = format!("[{}] {}", sender, display);
state.borrow_mut().push_log(&log_line);
let ts = timestamp();
append_history(&format!("{ts} [{sender}] {display}"));
let _ = notify_tx.send(Notification {
ntype: format!("telegram.{}", sender.to_lowercase()),
urgency: crate::thalamus::notify::NORMAL,
message: log_line,
timestamp: now(),
});
}
}
} else if msg["voice"].is_object() {
if let Some(file_id) = msg["voice"]["file_id"].as_str() {
let caption = msg["caption"].as_str().unwrap_or("");
let local = download_file(&client, &token, file_id, ".ogg").await;
let display = match &local {
Some(p) => format!("[voice: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }),
None => format!("[voice]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }),
};
let log_line = format!("[{}] {}", sender, display);
state.borrow_mut().push_log(&log_line);
let ts = timestamp();
append_history(&format!("{ts} [{sender}] {display}"));
let _ = notify_tx.send(Notification {
ntype: format!("telegram.{}", sender.to_lowercase()),
urgency: crate::thalamus::notify::NORMAL,
message: log_line,
timestamp: now(),
});
}
} else if msg["document"].is_object() {
if let Some(file_id) = msg["document"]["file_id"].as_str() {
let fname = msg["document"]["file_name"].as_str().unwrap_or("file");
let caption = msg["caption"].as_str().unwrap_or("");
let local = download_file(&client, &token, file_id, "").await;
let display = match &local {
Some(p) => format!("[doc: {} -> {}]{}", fname, p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }),
None => format!("[doc: {}]{}", fname, if caption.is_empty() { String::new() } else { format!(" {caption}") }),
};
let log_line = format!("[{}] {}", sender, display);
state.borrow_mut().push_log(&log_line);
let ts = timestamp();
append_history(&format!("{ts} [{sender}] {display}"));
let _ = notify_tx.send(Notification {
ntype: format!("telegram.{}", sender.to_lowercase()),
urgency: crate::thalamus::notify::NORMAL,
message: log_line,
timestamp: now(),
});
}
}
}
Ok(())
}
async fn download_file(
client: &reqwest::Client,
token: &str,
file_id: &str,
ext: &str,
) -> Option<PathBuf> {
let url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}");
let resp: serde_json::Value = client.get(&url).send().await.ok()?.json().await.ok()?;
let file_path = resp["result"]["file_path"].as_str()?;
let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}");
let bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?;
let basename = std::path::Path::new(file_path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("file");
let local_name = if ext.is_empty() {
basename.to_string()
} else {
let stem = std::path::Path::new(basename)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("file");
format!("{}{}", stem, ext)
};
let secs = now() as u64;
let local_path = media_dir().join(format!("{secs}_{local_name}"));
std::fs::write(&local_path, &bytes).ok()?;
Some(local_path)
}
fn timestamp() -> String {
// Use the same unix seconds approach as IRC module
format!("{}", now() as u64)
}
/// Handle a runtime command from RPC.
pub async fn handle_command(
state: &SharedTelegram,
_daemon_config: &Rc<RefCell<Config>>,
cmd: &str,
args: &[String],
) -> Result<String, String> {
match cmd {
"send" => {
let msg = args.join(" ");
if msg.is_empty() {
return Err("usage: telegram send <message>".into());
}
let (url, client) = {
let s = state.borrow();
(s.api_url("sendMessage"), s.client.clone())
};
let chat_id = state.borrow().config.chat_id.to_string();
client
.post(&url)
.form(&[("chat_id", chat_id.as_str()), ("text", msg.as_str())])
.send()
.await
.map_err(|e| e.to_string())?;
let ts = timestamp();
append_history(&format!("{ts} [agent] {msg}"));
Ok("sent".to_string())
}
"status" => {
let s = state.borrow();
Ok(format!(
"connected={} log_lines={} offset={}",
s.connected,
s.log.len(),
s.last_offset,
))
}
"log" => {
let n: usize = args
.first()
.and_then(|s| s.parse().ok())
.unwrap_or(15);
let s = state.borrow();
let lines: Vec<&String> = s.log.iter().rev().take(n).collect();
let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect();
lines.reverse();
Ok(lines.join("\n"))
}
_ => Err(format!(
"unknown telegram command: {cmd}\n\
commands: send, status, log"
)),
}
}

View file

@ -7,7 +7,6 @@
use super::config::Config;
use super::daemon_capnp::daemon;
use super::idle;
use super::modules::{irc, telegram};
use super::notify;
use capnp::capability::Promise;
use std::cell::RefCell;
@ -16,19 +15,16 @@ use tracing::info;
pub struct DaemonImpl {
state: Rc<RefCell<idle::State>>,
irc: Option<irc::SharedIrc>,
telegram: Option<telegram::SharedTelegram>,
config: Rc<RefCell<Config>>,
// TODO: replace with named channel map
_config: Rc<RefCell<Config>>,
}
impl DaemonImpl {
pub fn new(
state: Rc<RefCell<idle::State>>,
irc: Option<irc::SharedIrc>,
telegram: Option<telegram::SharedTelegram>,
config: Rc<RefCell<Config>>,
_config: Rc<RefCell<Config>>,
) -> Self {
Self { state, irc, telegram, config }
Self { state, _config }
}
}
@ -361,7 +357,7 @@ impl daemon::Server for DaemonImpl {
) -> Promise<(), capnp::Error> {
let params = pry!(params.get());
let module = pry!(pry!(params.get_module()).to_str()).to_string();
let command = pry!(pry!(params.get_command()).to_str()).to_string();
let _command = pry!(pry!(params.get_command()).to_str()).to_string();
let args_reader = pry!(params.get_args());
let mut args = Vec::new();
for i in 0..args_reader.len() {
@ -369,44 +365,7 @@ impl daemon::Server for DaemonImpl {
}
match module.as_str() {
"irc" => {
let irc = match &self.irc {
Some(irc) => irc.clone(),
None => {
results.get().set_result("irc module not enabled");
return Promise::ok(());
}
};
let config = self.config.clone();
Promise::from_future(async move {
let result = irc::handle_command(&irc, &config, &command, &args).await;
match result {
Ok(msg) => results.get().set_result(&msg),
Err(msg) => results.get().set_result(&format!("error: {msg}")),
}
Ok(())
})
}
"telegram" => {
let tg = match &self.telegram {
Some(tg) => tg.clone(),
None => {
results.get().set_result("telegram module not enabled");
return Promise::ok(());
}
};
let config = self.config.clone();
Promise::from_future(async move {
let result = telegram::handle_command(&tg, &config, &command, &args).await;
match result {
Ok(msg) => results.get().set_result(&msg),
Err(msg) => results.get().set_result(&format!("error: {msg}")),
}
Ok(())
})
}
// TODO: route module commands through named channel system
_ => {
results
.get()

204
src/thalamus/supervisor.rs Normal file
View file

@ -0,0 +1,204 @@
// supervisor.rs — Channel daemon lifecycle management
//
// Reads ~/.consciousness/channels/channels.json5, starts/stops
// channel daemons as needed. The socket file is the liveness
// indicator — if it exists and responds, the daemon is running.
//
// File locking prevents multiple consciousness/claude-code instances
// from racing to start the same daemon.
use std::collections::BTreeMap;
use std::fs::File;
use std::path::PathBuf;
use std::process::Child;
use tracing::{info, warn, error};
fn channels_dir() -> PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/channels")
}
fn config_path() -> PathBuf {
channels_dir().join("channels.json5")
}
fn lock_path() -> PathBuf {
channels_dir().join(".supervisor.lock")
}
#[derive(serde::Deserialize, serde::Serialize, Clone)]
pub struct ChannelEntry {
/// Binary name (looked up in PATH)
pub binary: String,
#[serde(default = "default_true")]
pub enabled: bool,
#[serde(default = "default_true")]
pub autostart: bool,
}
fn default_true() -> bool { true }
/// RAII file lock — prevents multiple instances from racing.
struct SupervisorLock {
_file: File,
}
impl SupervisorLock {
fn acquire() -> Option<Self> {
let _ = std::fs::create_dir_all(channels_dir());
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(lock_path())
.ok()?;
use std::os::unix::io::AsRawFd;
let ret = unsafe {
libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB)
};
if ret != 0 {
return None;
}
Some(Self { _file: file })
}
}
/// Manages channel daemon processes.
pub struct Supervisor {
config: BTreeMap<String, ChannelEntry>,
children: BTreeMap<String, Child>,
}
impl Supervisor {
pub fn new() -> Self {
Self {
config: BTreeMap::new(),
children: BTreeMap::new(),
}
}
/// Load config from channels.json5.
pub fn load_config(&mut self) {
let path = config_path();
match std::fs::read_to_string(&path) {
Ok(text) => {
match serde_json::from_str::<BTreeMap<String, ChannelEntry>>(&text) {
Ok(cfg) => {
info!("loaded {} channel configs", cfg.len());
self.config = cfg;
}
Err(e) => warn!("failed to parse {}: {}", path.display(), e),
}
}
Err(_) => info!("no channels.json5, no channels configured"),
}
}
/// Check if a daemon is alive by testing its socket.
fn is_alive(name: &str) -> bool {
let sock = channels_dir().join(format!("{}.sock", name));
if !sock.exists() {
return false;
}
match std::os::unix::net::UnixStream::connect(&sock) {
Ok(_) => true,
Err(_) => {
let _ = std::fs::remove_file(&sock);
false
}
}
}
/// Ensure all configured autostart daemons are running.
/// Acquires file lock to prevent races with other instances.
pub fn ensure_running(&mut self) {
let _lock = match SupervisorLock::acquire() {
Some(l) => l,
None => {
info!("another instance is managing channels");
return;
}
};
let entries: Vec<(String, ChannelEntry)> = self.config.iter()
.filter(|(_, e)| e.enabled && e.autostart)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (name, entry) in entries {
if Self::is_alive(&name) {
continue;
}
// Check if we spawned it and it died unexpectedly
if let Some(child) = self.children.get_mut(&name) {
match child.try_wait() {
Ok(Some(status)) => {
warn!("channel {} exited unexpectedly ({}), restarting", name, status);
self.children.remove(&name);
}
Ok(None) => continue, // still starting up
Err(_) => { self.children.remove(&name); }
}
}
self.start_one(&name, &entry);
}
}
fn start_one(&mut self, name: &str, entry: &ChannelEntry) {
info!("starting channel daemon: {} ({})", name, entry.binary);
match std::process::Command::new(&entry.binary)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(child) => {
info!("channel {} started (pid {})", name, child.id());
self.children.insert(name.to_string(), child);
}
Err(e) => error!("failed to start channel {}: {}", name, e),
}
}
/// Stop a specific daemon.
pub fn stop_one(&mut self, name: &str) {
let sock = channels_dir().join(format!("{}.sock", name));
let _ = std::fs::remove_file(&sock);
if let Some(mut child) = self.children.remove(name) {
info!("stopping channel {} (pid {})", name, child.id());
let _ = child.kill();
let _ = child.wait();
}
}
/// Stop all managed daemons.
pub fn stop_all(&mut self) {
let names: Vec<String> = self.children.keys().cloned().collect();
for name in names {
self.stop_one(&name);
}
}
/// List configured channels and their status.
pub fn status(&self) -> Vec<(String, bool, bool)> {
self.config.iter()
.map(|(name, entry)| (name.clone(), entry.enabled, Self::is_alive(name)))
.collect()
}
}
impl Drop for Supervisor {
fn drop(&mut self) {
// Don't kill daemons on drop — they should outlive us
for (name, child) in &mut self.children {
match child.try_wait() {
Ok(Some(_)) => {}
_ => info!("leaving channel {} running (pid {})", name, child.id()),
}
}
}
}