// PoC daemon. // // Central hub for notification routing, idle management, and // communication modules (IRC, Telegram) for Claude Code sessions. // Listens on a Unix domain socket with a Cap'n Proto RPC interface. // Same binary serves as both daemon and CLI client. // // Usage: // poc-daemon Start the daemon // poc-daemon status Query daemon status // poc-daemon user [pane] Signal user activity // poc-daemon response [pane] Signal Claude response // poc-daemon notify // poc-daemon notifications Get pending notifications // poc-daemon notify-types List all notification types // poc-daemon notify-threshold // poc-daemon sleep [timestamp] Sleep (0 or omit = indefinite) // poc-daemon wake Cancel sleep // poc-daemon quiet [seconds] Suppress prompts // poc-daemon irc IRC module commands // poc-daemon stop Shut down daemon mod config; mod context; mod idle; mod modules; pub mod notify; mod rpc; mod tmux; pub mod daemon_capnp { include!(concat!(env!("OUT_DIR"), "/schema/daemon_capnp.rs")); } use std::cell::RefCell; use std::path::PathBuf; use std::rc::Rc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixListener; use tracing::{error, info}; pub fn now() -> f64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs_f64() } pub fn home() -> PathBuf { PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/root".into())) } fn sock_path() -> PathBuf { home().join(".claude/hooks/idle-timer.sock") } fn pid_path() -> PathBuf { home().join(".claude/hooks/idle-daemon.pid") } // ── Client mode ────────────────────────────────────────────────── async fn client_main(args: Vec) -> Result<(), Box> { let sock = sock_path(); if !sock.exists() { eprintln!("daemon not running (no socket at {})", sock.display()); std::process::exit(1); } tokio::task::LocalSet::new() .run_until(async move { let stream = tokio::net::UnixStream::connect(&sock).await?; let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); let rpc_network = Box::new(twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Client, Default::default(), )); let mut rpc_system = RpcSystem::new(rpc_network, None); let daemon: daemon_capnp::daemon::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); tokio::task::spawn_local(rpc_system); let cmd = args.get(1).map(|s| s.as_str()).unwrap_or("status"); match cmd { "status" => { let reply = daemon.status_request().send().promise.await?; let status = reply.get()?.get_status()?; println!( "uptime={:.0}s pane={} kent={} activity={:?} pending={} fired={} sleep={} quiet={} dreaming={} consolidating={}", status.get_uptime(), status.get_claude_pane()?.to_str().unwrap_or("none"), status.get_kent_present(), status.get_activity()?, status.get_pending_count(), status.get_fired(), status.get_sleep_until(), status.get_quiet_until(), status.get_dreaming(), status.get_consolidating(), ); } "user" => { let pane = args.get(2).map(|s| s.as_str()).unwrap_or(""); let mut req = daemon.user_request(); req.get().set_pane(pane); req.send().promise.await?; } "response" => { let pane = args.get(2).map(|s| s.as_str()).unwrap_or(""); let mut req = daemon.response_request(); req.get().set_pane(pane); req.send().promise.await?; } "sleep" => { let until: f64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0.0); let mut req = daemon.sleep_request(); req.get().set_until(until); req.send().promise.await?; } "wake" => { daemon.wake_request().send().promise.await?; } "quiet" => { let secs: u32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(300); let mut req = daemon.quiet_request(); req.get().set_seconds(secs); req.send().promise.await?; } "consolidating" => { daemon.consolidating_request().send().promise.await?; } "consolidated" => { daemon.consolidated_request().send().promise.await?; } "dream-start" => { daemon.dream_start_request().send().promise.await?; } "dream-end" => { daemon.dream_end_request().send().promise.await?; } "stop" => { daemon.stop_request().send().promise.await?; println!("stopping"); } "notify" => { let ntype = args.get(2).ok_or("missing type")?; let urgency_str = args.get(3).ok_or("missing urgency")?; let urgency = notify::parse_urgency(urgency_str) .ok_or_else(|| format!("invalid urgency: {urgency_str}"))?; let message = args[4..].join(" "); if message.is_empty() { return Err("missing message".into()); } let mut req = daemon.notify_request(); let mut n = req.get().init_notification(); n.set_type(ntype); n.set_urgency(urgency); n.set_message(&message); n.set_timestamp(crate::now()); let reply = req.send().promise.await?; if reply.get()?.get_interrupt() { println!("interrupt"); } else { println!("queued"); } } "notifications" => { let min: u8 = args .get(2) .and_then(|s| notify::parse_urgency(s)) .unwrap_or(255); let mut req = daemon.get_notifications_request(); req.get().set_min_urgency(min); let reply = req.send().promise.await?; let list = reply.get()?.get_notifications()?; if !list.is_empty() { for n in list.iter() { println!( "[{}:{}] {}", n.get_type()?.to_str()?, notify::urgency_name(n.get_urgency()), n.get_message()?.to_str()?, ); } } } "notify-types" => { let reply = daemon.get_types_request().send().promise.await?; let list = reply.get()?.get_types()?; if list.is_empty() { println!("no notification types registered"); } else { for t in list.iter() { let threshold = if t.get_threshold() < 0 { "inherit".to_string() } else { notify::urgency_name(t.get_threshold() as u8).to_string() }; println!( "{}: count={} threshold={}", t.get_name()?.to_str()?, t.get_count(), threshold, ); } } } "notify-threshold" => { let ntype = args.get(2).ok_or("missing type")?; let level_str = args.get(3).ok_or("missing level")?; let level = notify::parse_urgency(level_str) .ok_or_else(|| format!("invalid level: {level_str}"))?; let mut req = daemon.set_threshold_request(); req.get().set_type(ntype); req.get().set_level(level); req.send().promise.await?; println!("{ntype} threshold={}", notify::urgency_name(level)); } // Module commands: "irc join #foo", "telegram send hello" "irc" | "telegram" => { let module = cmd; let module_cmd = args.get(2).ok_or( format!("usage: poc-daemon {module} [args...]"), )?; let module_args: Vec<&str> = args[3..].iter().map(|s| s.as_str()).collect(); let mut req = daemon.module_command_request(); req.get().set_module(module); req.get().set_command(module_cmd); let mut args_builder = req.get().init_args(module_args.len() as u32); for (i, a) in module_args.iter().enumerate() { args_builder.set(i as u32, a); } let reply = req.send().promise.await?; let result = reply.get()?.get_result()?.to_str()?; if !result.is_empty() { println!("{result}"); } } _ => { eprintln!("unknown command: {cmd}"); std::process::exit(1); } } Ok(()) }) .await } // ── Server mode ────────────────────────────────────────────────── async fn server_main() -> Result<(), Box> { let log_path = home().join(".claude/hooks/idle-daemon.log"); let file_appender = tracing_appender::rolling::daily( log_path.parent().unwrap(), "idle-daemon.log", ); tracing_subscriber::fmt() .with_writer(file_appender) .with_ansi(false) .with_target(false) .with_level(false) .with_timer(tracing_subscriber::fmt::time::time()) .init(); let sock = sock_path(); let _ = std::fs::remove_file(&sock); let pid = std::process::id(); std::fs::write(pid_path(), pid.to_string()).ok(); let daemon_config = Rc::new(RefCell::new(config::Config::load())); let state = Rc::new(RefCell::new(idle::State::new())); state.borrow_mut().load(); info!("daemon started (pid={pid})"); tokio::task::LocalSet::new() .run_until(async move { // Start modules let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); let irc_state = if daemon_config.borrow().irc.enabled { let irc_config = daemon_config.borrow().irc.clone(); info!("starting irc module: {}:{}", irc_config.server, irc_config.port); Some(modules::irc::start(irc_config, notify_tx.clone(), daemon_config.clone())) } else { info!("irc module disabled"); None }; let telegram_state = if daemon_config.borrow().telegram.enabled { info!("starting telegram module"); Some(modules::telegram::start( daemon_config.borrow().telegram.clone(), notify_tx.clone(), daemon_config.clone(), )) } else { info!("telegram module disabled"); None }; let listener = UnixListener::bind(&sock)?; #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; std::fs::set_permissions( &sock, std::fs::Permissions::from_mode(0o600), ) .ok(); } let shutdown = async { let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) .expect("sigterm"); let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) .expect("sigint"); tokio::select! { _ = sigterm.recv() => info!("SIGTERM"), _ = sigint.recv() => info!("SIGINT"), } }; tokio::pin!(shutdown); let mut tick_timer = tokio::time::interval(Duration::from_secs(30)); tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { _ = &mut shutdown => break, // Drain module notifications into state Some(notif) = notify_rx.recv() => { state.borrow_mut().notifications.submit( notif.ntype, notif.urgency, notif.message, ); } _ = tick_timer.tick() => { if let Err(e) = state.borrow_mut().tick().await { error!("tick: {e}"); } if !state.borrow().running { break; } } result = listener.accept() => { match result { Ok((stream, _)) => { let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream) .split(); let network = twoparty::VatNetwork::new( futures::io::BufReader::new(reader), futures::io::BufWriter::new(writer), rpc_twoparty_capnp::Side::Server, Default::default(), ); let daemon_impl = rpc::DaemonImpl::new( state.clone(), irc_state.clone(), telegram_state.clone(), daemon_config.clone(), ); let client: daemon_capnp::daemon::Client = capnp_rpc::new_client(daemon_impl); let rpc_system = RpcSystem::new( Box::new(network), Some(client.client), ); tokio::task::spawn_local(rpc_system); } Err(e) => error!("accept: {e}"), } } } } let _ = std::fs::remove_file(sock_path()); let _ = std::fs::remove_file(pid_path()); info!("daemon stopped"); Ok(()) }) .await } // ── Entry point ────────────────────────────────────────────────── #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { let args: Vec = std::env::args().collect(); if args.len() < 2 { return server_main().await; } match args[1].as_str() { "status" | "user" | "response" | "sleep" | "wake" | "quiet" | "consolidating" | "consolidated" | "dream-start" | "dream-end" | "stop" | "notify" | "notifications" | "notify-types" | "notify-threshold" | "irc" | "telegram" => client_main(args).await, _ => { eprintln!("usage: poc-daemon [command]"); eprintln!(" (no args) Start daemon"); eprintln!(" status Query daemon status"); eprintln!(" user [pane] Signal user activity"); eprintln!(" response [pane] Signal Claude response"); eprintln!(" notify "); eprintln!(" notifications [min_urgency]"); eprintln!(" notify-types List notification types"); eprintln!(" notify-threshold "); eprintln!(" sleep [timestamp]"); eprintln!(" wake / quiet / stop / dream-start / dream-end"); eprintln!(" irc [args]"); std::process::exit(1); } } }