// PoC daemon. // // Central hub for notification routing, idle management, and // communication modules (IRC, Telegram) for Claude Code sessions. // Listens on a Unix domain socket with a Cap'n Proto RPC interface. // Same binary serves as both daemon and CLI client. mod config; mod context; mod idle; mod modules; pub mod notify; mod rpc; mod tmux; pub mod daemon_capnp { include!(concat!(env!("OUT_DIR"), "/schema/daemon_capnp.rs")); } use std::cell::RefCell; use std::path::PathBuf; use std::rc::Rc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use clap::{Parser, Subcommand}; use futures::AsyncReadExt; use tokio::net::UnixListener; use tracing::{error, info}; pub fn now() -> f64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs_f64() } pub fn home() -> PathBuf { PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/root".into())) } fn sock_path() -> PathBuf { home().join(".consciousness/daemon.sock") } fn pid_path() -> PathBuf { home().join(".consciousness/daemon.pid") } // ── CLI ────────────────────────────────────────────────────────── #[derive(Parser)] #[command(name = "poc-daemon", about = "Notification routing and idle management daemon")] struct Cli { #[command(subcommand)] command: Option, } #[derive(Subcommand)] enum Command { /// Start the daemon (foreground) Daemon, /// Query daemon status Status, /// Signal user activity User { /// tmux pane identifier pane: Option, }, /// Signal Claude response Response { /// tmux pane identifier pane: Option, }, /// Sleep (suppress idle timer). 0 or omit = indefinite Sleep { /// Wake timestamp (epoch seconds), 0 = indefinite until: Option, }, /// Cancel sleep Wake, /// Suppress prompts for N seconds (default 300) Quiet { /// Duration in seconds seconds: Option, }, /// Mark Kent as AFK (immediately allow idle timer to fire) Afk, /// Set session active timeout in seconds (how long after last message Kent counts as "present") SessionTimeout { /// Timeout in seconds seconds: f64, }, /// Set idle timeout in seconds (how long before autonomous prompt) IdleTimeout { /// Timeout in seconds seconds: f64, }, /// Set notify timeout in seconds (how long before tmux notification injection) NotifyTimeout { /// Timeout in seconds seconds: f64, }, /// Signal consolidation started Consolidating, /// Signal consolidation ended Consolidated, /// Signal dream started DreamStart, /// Signal dream ended DreamEnd, /// Force state persistence to disk Save, /// Get or set the activity EWMA (0.0-1.0). No value = query. Ewma { /// Value to set (omit to query) value: Option, }, /// Send a test message to the Claude pane TestSend { /// Message to send message: Vec, }, /// Fire a test nudge through the daemon (tests the actual idle send path) TestNudge, /// Dump full internal state as JSON Debug, /// Shut down daemon Stop, /// Submit a notification Notify { /// Notification type (e.g. "irc", "telegram") #[arg(name = "type")] ntype: String, /// Urgency level (ambient/low/medium/high/critical or 0-4) urgency: String, /// Message text message: Vec, }, /// Get pending notifications Notifications { /// Minimum urgency filter min_urgency: Option, }, /// List all notification types NotifyTypes, /// Set notification threshold for a type NotifyThreshold { /// Notification type #[arg(name = "type")] ntype: String, /// Urgency level threshold level: String, }, /// IRC module commands Irc { /// Subcommand (join, leave, send, status, log, nick) command: String, /// Arguments args: Vec, }, /// Telegram module commands Telegram { /// Subcommand command: String, /// Arguments args: Vec, }, } // ── Client mode ────────────────────────────────────────────────── async fn client_main(cmd: Command) -> Result<(), Box> { let sock = sock_path(); if !sock.exists() { eprintln!("daemon not running (no socket at {})", sock.display()); std::process::exit(1); } tokio::task::LocalSet::new() .run_until(async move { let stream = tokio::net::UnixStream::connect(&sock).await?; let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let daemon: daemon_capnp::daemon::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); tokio::task::spawn_local(rpc_system); match cmd { Command::Daemon => unreachable!("handled in main"), Command::Status => { let reply = daemon.status_request().send().promise.await?; let s = reply.get()?.get_status()?; let fmt_secs = |s: f64| -> String { if s < 60.0 { format!("{:.0}s", s) } else if s < 3600.0 { format!("{:.0}m", s / 60.0) } else { format!("{:.1}h", s / 3600.0) } }; println!("uptime: {} pane: {} activity: {:?} pending: {}", fmt_secs(s.get_uptime()), s.get_claude_pane()?.to_str().unwrap_or("none"), s.get_activity()?, s.get_pending_count(), ); println!("idle timer: {}/{} ({})", fmt_secs(s.get_since_activity()), fmt_secs(s.get_idle_timeout()), s.get_block_reason()?.to_str()?, ); println!("notify timer: {}/{}", fmt_secs(s.get_since_activity()), fmt_secs(s.get_notify_timeout()), ); println!("kent: {} (last {}) activity: {:.1}%", if s.get_kent_present() { "present" } else { "away" }, fmt_secs(s.get_since_user()), s.get_activity_ewma() * 100.0, ); let sleep = s.get_sleep_until(); if sleep != 0.0 { if sleep < 0.0 { println!("sleep: indefinite"); } else { println!("sleep: until {sleep:.0}"); } } if s.get_consolidating() { println!("consolidating"); } if s.get_dreaming() { println!("dreaming"); } } Command::User { pane } => { let pane = pane.as_deref().unwrap_or(""); let mut req = daemon.user_request(); req.get().set_pane(pane); req.send().promise.await?; } Command::Response { pane } => { let pane = pane.as_deref().unwrap_or(""); let mut req = daemon.response_request(); req.get().set_pane(pane); req.send().promise.await?; } Command::Sleep { until } => { let mut req = daemon.sleep_request(); req.get().set_until(until.unwrap_or(0.0)); req.send().promise.await?; } Command::Wake => { daemon.wake_request().send().promise.await?; } Command::Quiet { seconds } => { let mut req = daemon.quiet_request(); req.get().set_seconds(seconds.unwrap_or(300)); req.send().promise.await?; } Command::TestSend { message } => { let msg = message.join(" "); let pane = { let reply = daemon.status_request().send().promise.await?; let s = reply.get()?.get_status()?; s.get_claude_pane()?.to_str()?.to_string() }; let ok = crate::tmux::send_prompt(&pane, &msg); println!("send_prompt(pane={}, ok={}): {}", pane, ok, msg); return Ok(()); } Command::TestNudge => { let reply = daemon.test_nudge_request().send().promise.await?; let r = reply.get()?; println!("sent={} message={}", r.get_sent(), r.get_message()?.to_str()?); return Ok(()); } Command::Afk => { daemon.afk_request().send().promise.await?; println!("marked AFK"); } Command::SessionTimeout { seconds } => { let mut req = daemon.session_timeout_request(); req.get().set_seconds(seconds); req.send().promise.await?; println!("session timeout = {seconds}s"); } Command::IdleTimeout { seconds } => { let mut req = daemon.idle_timeout_request(); req.get().set_seconds(seconds); req.send().promise.await?; println!("idle timeout = {seconds}s"); } Command::NotifyTimeout { seconds } => { let mut req = daemon.notify_timeout_request(); req.get().set_seconds(seconds); req.send().promise.await?; println!("notify timeout = {seconds}s"); } Command::Consolidating => { daemon.consolidating_request().send().promise.await?; } Command::Consolidated => { daemon.consolidated_request().send().promise.await?; } Command::DreamStart => { daemon.dream_start_request().send().promise.await?; } Command::DreamEnd => { daemon.dream_end_request().send().promise.await?; } Command::Save => { daemon.save_request().send().promise.await?; println!("state saved"); } Command::Ewma { value } => { let mut req = daemon.ewma_request(); req.get().set_value(value.unwrap_or(-1.0)); let reply = req.send().promise.await?; let current = reply.get()?.get_current(); println!("{:.1}%", current * 100.0); } Command::Debug => { let reply = daemon.debug_request().send().promise.await?; let json = reply.get()?.get_json()?.to_str()?; if let Ok(v) = serde_json::from_str::(json) { println!("{}", serde_json::to_string_pretty(&v).unwrap_or_else(|_| json.to_string())); } else { println!("{json}"); } } Command::Stop => { daemon.stop_request().send().promise.await?; println!("stopping"); } Command::Notify { ntype, urgency, message } => { let urgency = notify::parse_urgency(&urgency) .ok_or_else(|| format!("invalid urgency: {urgency}"))?; let message = message.join(" "); if message.is_empty() { return Err("missing message".into()); } let mut req = daemon.notify_request(); let mut n = req.get().init_notification(); n.set_type(&ntype); n.set_urgency(urgency); n.set_message(&message); n.set_timestamp(crate::now()); let reply = req.send().promise.await?; if reply.get()?.get_interrupt() { println!("interrupt"); } else { println!("queued"); } } Command::Notifications { min_urgency } => { let min: u8 = min_urgency .as_deref() .and_then(notify::parse_urgency) .unwrap_or(255); let mut req = daemon.get_notifications_request(); req.get().set_min_urgency(min); let reply = req.send().promise.await?; let list = reply.get()?.get_notifications()?; for n in list.iter() { println!( "[{}:{}] {}", n.get_type()?.to_str()?, notify::urgency_name(n.get_urgency()), n.get_message()?.to_str()?, ); } } Command::NotifyTypes => { let reply = daemon.get_types_request().send().promise.await?; let list = reply.get()?.get_types()?; if list.is_empty() { println!("no notification types registered"); } else { for t in list.iter() { let threshold = if t.get_threshold() < 0 { "inherit".to_string() } else { notify::urgency_name(t.get_threshold() as u8).to_string() }; println!( "{}: count={} threshold={}", t.get_name()?.to_str()?, t.get_count(), threshold, ); } } } Command::NotifyThreshold { ntype, level } => { let level = notify::parse_urgency(&level) .ok_or_else(|| format!("invalid level: {level}"))?; let mut req = daemon.set_threshold_request(); req.get().set_type(&ntype); req.get().set_level(level); req.send().promise.await?; println!("{ntype} threshold={}", notify::urgency_name(level)); } Command::Irc { command, args } => { module_command(&daemon, "irc", &command, &args).await?; } Command::Telegram { command, args } => { module_command(&daemon, "telegram", &command, &args).await?; } } Ok(()) }) .await } async fn module_command( daemon: &daemon_capnp::daemon::Client, module: &str, command: &str, args: &[String], ) -> Result<(), Box> { let mut req = daemon.module_command_request(); req.get().set_module(module); req.get().set_command(command); let mut args_builder = req.get().init_args(args.len() as u32); for (i, a) in args.iter().enumerate() { args_builder.set(i as u32, a); } let reply = req.send().promise.await?; let result = reply.get()?.get_result()?.to_str()?; if !result.is_empty() { println!("{result}"); } Ok(()) } // ── Server mode ────────────────────────────────────────────────── async fn server_main() -> Result<(), Box> { let log_path = home().join(".consciousness/logs/daemon.log"); let file_appender = tracing_appender::rolling::daily( log_path.parent().unwrap(), "daemon.log", ); tracing_subscriber::fmt() .with_writer(file_appender) .with_ansi(false) .with_target(false) .with_level(false) .with_timer(tracing_subscriber::fmt::time::time()) .init(); let sock = sock_path(); let _ = std::fs::remove_file(&sock); let pid = std::process::id(); std::fs::write(pid_path(), pid.to_string()).ok(); let daemon_config = Rc::new(RefCell::new(config::Config::load())); let state = Rc::new(RefCell::new(idle::State::new())); state.borrow_mut().load(); info!("daemon started (pid={pid})"); tokio::task::LocalSet::new() .run_until(async move { // Start modules let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); let irc_state = if daemon_config.borrow().irc.enabled { let irc_config = daemon_config.borrow().irc.clone(); info!("starting irc module: {}:{}", irc_config.server, irc_config.port); Some(modules::irc::start(irc_config, notify_tx.clone(), daemon_config.clone())) } else { info!("irc module disabled"); None }; let telegram_state = if daemon_config.borrow().telegram.enabled { info!("starting telegram module"); Some(modules::telegram::start( daemon_config.borrow().telegram.clone(), notify_tx.clone(), daemon_config.clone(), )) } else { info!("telegram module disabled"); None }; let listener = UnixListener::bind(&sock)?; #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; std::fs::set_permissions( &sock, std::fs::Permissions::from_mode(0o600), ) .ok(); } let shutdown = async { let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) .expect("sigterm"); let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) .expect("sigint"); tokio::select! { _ = sigterm.recv() => info!("SIGTERM"), _ = sigint.recv() => info!("SIGINT"), } }; tokio::pin!(shutdown); let mut tick_timer = tokio::time::interval(Duration::from_secs(30)); tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { _ = &mut shutdown => break, // Drain module notifications into state Some(notif) = notify_rx.recv() => { state.borrow_mut().maybe_prompt_notification( ¬if.ntype, notif.urgency, ¬if.message, ); state.borrow_mut().notifications.submit( notif.ntype, notif.urgency, notif.message, ); } _ = tick_timer.tick() => { if let Err(e) = state.borrow_mut().tick().await { error!("tick: {e}"); } if !state.borrow().running { break; } } result = listener.accept() => { match result { Ok((stream, _)) => { let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream) .split(); let network = twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Server, Default::default(), ); let daemon_impl = rpc::DaemonImpl::new( state.clone(), irc_state.clone(), telegram_state.clone(), daemon_config.clone(), ); let client: daemon_capnp::daemon::Client = capnp_rpc::new_client(daemon_impl); let rpc_system = RpcSystem::new( Box::new(network), Some(client.client), ); tokio::task::spawn_local(rpc_system); } Err(e) => error!("accept: {e}"), } } } } state.borrow().save(); let _ = std::fs::remove_file(sock_path()); let _ = std::fs::remove_file(pid_path()); info!("daemon stopped"); Ok(()) }) .await } // ── Entry point ────────────────────────────────────────────────── #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { let cli = Cli::parse(); match cli.command { Some(Command::Daemon) => server_main().await, Some(cmd) => client_main(cmd).await, None => { Cli::parse_from(["poc-daemon", "--help"]); Ok(()) } } }