diff --git a/Cargo.toml b/Cargo.toml index 03a376d..c333ccf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,18 +73,14 @@ capnpc = "0.20" name = "poc_memory" path = "src/lib.rs" +[[bin]] +name = "consciousness" +path = "src/bin/consciousness.rs" + [[bin]] name = "poc-memory" path = "src/main.rs" -[[bin]] -name = "memory-search" -path = "src/bin/memory-search.rs" - -[[bin]] -name = "poc-hook" -path = "src/bin/poc-hook.rs" - [[bin]] name = "merge-logs" path = "src/bin/merge-logs.rs" @@ -98,9 +94,13 @@ name = "find-deleted" path = "src/bin/find-deleted.rs" [[bin]] -name = "consciousness" -path = "src/bin/consciousness.rs" +name = "poc-hook" +path = "src/claude/poc-hook.rs" [[bin]] name = "poc-daemon" -path = "src/bin/poc-daemon.rs" +path = "src/claude/poc-daemon.rs" + +[[bin]] +name = "memory-search" +path = "src/claude/memory-search.rs" diff --git a/src/thalamus/config.rs b/src/claude/config.rs similarity index 98% rename from src/thalamus/config.rs rename to src/claude/config.rs index 8f1668d..c38dc21 100644 --- a/src/thalamus/config.rs +++ b/src/claude/config.rs @@ -3,7 +3,7 @@ // Lives at ~/.consciousness/daemon.toml. Loaded on startup, updated at // runtime when modules change state (join channel, etc.). -use super::home; +use crate::thalamus::home; use serde::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; diff --git a/src/thalamus/context.rs b/src/claude/context.rs similarity index 100% rename from src/thalamus/context.rs rename to src/claude/context.rs diff --git a/src/subconscious/hook.rs b/src/claude/hook.rs similarity index 99% rename from src/subconscious/hook.rs rename to src/claude/hook.rs index 8c52bb9..dabd98b 100644 --- a/src/subconscious/hook.rs +++ b/src/claude/hook.rs @@ -12,7 +12,7 @@ use std::process::Command; use std::time::Instant; pub use crate::session::HookSession; -pub use super::subconscious::*; +pub use crate::subconscious::subconscious::*; const CHUNK_SIZE: usize = 9000; diff --git a/src/thalamus/idle.rs b/src/claude/idle.rs similarity index 99% rename from src/thalamus/idle.rs rename to src/claude/idle.rs index 88e96fb..7149f9f 100644 --- a/src/thalamus/idle.rs +++ b/src/claude/idle.rs @@ -7,7 +7,8 @@ // Designed as the first "module" — future IRC/Telegram modules will // follow the same pattern: state + tick + handle_command. -use super::{context, home, now, notify, tmux}; +use super::{context, tmux}; +use crate::thalamus::{home, now, notify}; use serde::{Deserialize, Serialize}; use std::fs; use tracing::info; diff --git a/src/bin/memory-search.rs b/src/claude/memory-search.rs similarity index 100% rename from src/bin/memory-search.rs rename to src/claude/memory-search.rs diff --git a/src/claude/mod.rs b/src/claude/mod.rs new file mode 100644 index 0000000..8c7e73d --- /dev/null +++ b/src/claude/mod.rs @@ -0,0 +1,583 @@ +// claude/ — Claude Code integration layer +// +// Everything specific to running as a Claude Code agent: idle timer, +// tmux pane detection, prompt injection, session hooks, daemon RPC, +// and daemon configuration. +// +// The daemon protocol (daemon_capnp) and universal infrastructure +// (channels, supervisor, notify) remain in thalamus/. + +pub mod config; +pub mod context; +pub mod hook; +pub mod idle; +pub mod rpc; +pub mod tmux; + +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; + +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use clap::{Parser, Subcommand}; +use futures::AsyncReadExt; +use tokio::net::UnixListener; +use tracing::{error, info}; + +use crate::thalamus::{daemon_capnp, home, now, notify}; + +fn sock_path() -> std::path::PathBuf { + home().join(".consciousness/daemon.sock") +} + +fn pid_path() -> std::path::PathBuf { + home().join(".consciousness/daemon.pid") +} + +// -- CLI ------------------------------------------------------------------ + +#[derive(Parser)] +#[command(name = "consciousness daemon", about = "Notification routing and idle management daemon")] +pub struct Cli { + #[command(subcommand)] + pub command: Option, +} + +#[derive(Subcommand)] +pub enum Command { + /// Start the daemon (foreground) + Daemon, + /// Query daemon status + Status, + /// Signal user activity + User { + /// tmux pane identifier + pane: Option, + }, + /// Signal Claude response + Response { + /// tmux pane identifier + pane: Option, + }, + /// Sleep (suppress idle timer). 0 or omit = indefinite + Sleep { + /// Wake timestamp (epoch seconds), 0 = indefinite + until: Option, + }, + /// Cancel sleep + Wake, + /// Suppress prompts for N seconds (default 300) + Quiet { + /// Duration in seconds + seconds: Option, + }, + /// Mark user as AFK (immediately allow idle timer to fire) + Afk, + /// Set session active timeout in seconds (how long after last message user counts as "present") + SessionTimeout { + /// Timeout in seconds + seconds: f64, + }, + /// Set idle timeout in seconds (how long before autonomous prompt) + IdleTimeout { + /// Timeout in seconds + seconds: f64, + }, + /// Set notify timeout in seconds (how long before tmux notification injection) + NotifyTimeout { + /// Timeout in seconds + seconds: f64, + }, + /// Signal consolidation started + Consolidating, + /// Signal consolidation ended + Consolidated, + /// Signal dream started + DreamStart, + /// Signal dream ended + DreamEnd, + /// Force state persistence to disk + Save, + /// Get or set the activity EWMA (0.0-1.0). No value = query. + Ewma { + /// Value to set (omit to query) + value: Option, + }, + /// Send a test message to the Claude pane + TestSend { + /// Message to send + message: Vec, + }, + /// Fire a test nudge through the daemon (tests the actual idle send path) + TestNudge, + /// Dump full internal state as JSON + Debug, + /// Shut down daemon + Stop, + /// Submit a notification + Notify { + /// Notification type (e.g. "irc", "telegram") + #[arg(name = "type")] + ntype: String, + /// Urgency level (ambient/low/medium/high/critical or 0-4) + urgency: String, + /// Message text + message: Vec, + }, + /// Get pending notifications + Notifications { + /// Minimum urgency filter + min_urgency: Option, + }, + /// List all notification types + NotifyTypes, + /// Set notification threshold for a type + NotifyThreshold { + /// Notification type + #[arg(name = "type")] + ntype: String, + /// Urgency level threshold + level: String, + }, + /// IRC module commands + Irc { + /// Subcommand (join, leave, send, status, log, nick) + command: String, + /// Arguments + args: Vec, + }, + /// Telegram module commands + Telegram { + /// Subcommand + command: String, + /// Arguments + args: Vec, + }, +} + +// -- Client mode ---------------------------------------------------------- + +async fn client_main(cmd: Command) -> Result<(), Box> { + let sock = sock_path(); + if !sock.exists() { + eprintln!("daemon not running (no socket at {})", sock.display()); + std::process::exit(1); + } + + tokio::task::LocalSet::new() + .run_until(async move { + let stream = tokio::net::UnixStream::connect(&sock).await?; + let (reader, writer) = + tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let daemon: daemon_capnp::daemon::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + tokio::task::spawn_local(rpc_system); + + match cmd { + Command::Daemon => unreachable!("handled in main"), + Command::Status => { + let reply = daemon.status_request().send().promise.await?; + let s = reply.get()?.get_status()?; + + let fmt_secs = |s: f64| -> String { + if s < 60.0 { format!("{:.0}s", s) } + else if s < 3600.0 { format!("{:.0}m", s / 60.0) } + else { format!("{:.1}h", s / 3600.0) } + }; + + println!("uptime: {} pane: {} activity: {:?} pending: {}", + fmt_secs(s.get_uptime()), + s.get_claude_pane()?.to_str().unwrap_or("none"), + s.get_activity()?, + s.get_pending_count(), + ); + println!("idle timer: {}/{} ({})", + fmt_secs(s.get_since_activity()), + fmt_secs(s.get_idle_timeout()), + s.get_block_reason()?.to_str()?, + ); + println!("notify timer: {}/{}", + fmt_secs(s.get_since_activity()), + fmt_secs(s.get_notify_timeout()), + ); + println!("user: {} (last {}) activity: {:.1}%", + if s.get_user_present() { "present" } else { "away" }, + fmt_secs(s.get_since_user()), + s.get_activity_ewma() * 100.0, + ); + + let sleep = s.get_sleep_until(); + if sleep != 0.0 { + if sleep < 0.0 { + println!("sleep: indefinite"); + } else { + println!("sleep: until {sleep:.0}"); + } + } + if s.get_consolidating() { println!("consolidating"); } + if s.get_dreaming() { println!("dreaming"); } + } + Command::User { pane } => { + let pane = pane.as_deref().unwrap_or(""); + let mut req = daemon.user_request(); + req.get().set_pane(pane); + req.send().promise.await?; + } + Command::Response { pane } => { + let pane = pane.as_deref().unwrap_or(""); + let mut req = daemon.response_request(); + req.get().set_pane(pane); + req.send().promise.await?; + } + Command::Sleep { until } => { + let mut req = daemon.sleep_request(); + req.get().set_until(until.unwrap_or(0.0)); + req.send().promise.await?; + } + Command::Wake => { + daemon.wake_request().send().promise.await?; + } + Command::Quiet { seconds } => { + let mut req = daemon.quiet_request(); + req.get().set_seconds(seconds.unwrap_or(300)); + req.send().promise.await?; + } + Command::TestSend { message } => { + let msg = message.join(" "); + let pane = { + let reply = daemon.status_request().send().promise.await?; + let s = reply.get()?.get_status()?; + s.get_claude_pane()?.to_str()?.to_string() + }; + let ok = tmux::send_prompt(&pane, &msg); + println!("send_prompt(pane={}, ok={}): {}", pane, ok, msg); + return Ok(()); + } + Command::TestNudge => { + let reply = daemon.test_nudge_request().send().promise.await?; + let r = reply.get()?; + println!("sent={} message={}", r.get_sent(), r.get_message()?.to_str()?); + return Ok(()); + } + Command::Afk => { + daemon.afk_request().send().promise.await?; + println!("marked AFK"); + } + Command::SessionTimeout { seconds } => { + let mut req = daemon.session_timeout_request(); + req.get().set_seconds(seconds); + req.send().promise.await?; + println!("session timeout = {seconds}s"); + } + Command::IdleTimeout { seconds } => { + let mut req = daemon.idle_timeout_request(); + req.get().set_seconds(seconds); + req.send().promise.await?; + println!("idle timeout = {seconds}s"); + } + Command::NotifyTimeout { seconds } => { + let mut req = daemon.notify_timeout_request(); + req.get().set_seconds(seconds); + req.send().promise.await?; + println!("notify timeout = {seconds}s"); + } + Command::Consolidating => { + daemon.consolidating_request().send().promise.await?; + } + Command::Consolidated => { + daemon.consolidated_request().send().promise.await?; + } + Command::DreamStart => { + daemon.dream_start_request().send().promise.await?; + } + Command::DreamEnd => { + daemon.dream_end_request().send().promise.await?; + } + Command::Save => { + daemon.save_request().send().promise.await?; + println!("state saved"); + } + Command::Ewma { value } => { + let mut req = daemon.ewma_request(); + req.get().set_value(value.unwrap_or(-1.0)); + let reply = req.send().promise.await?; + let current = reply.get()?.get_current(); + println!("{:.1}%", current * 100.0); + } + Command::Debug => { + let reply = daemon.debug_request().send().promise.await?; + let json = reply.get()?.get_json()?.to_str()?; + if let Ok(v) = serde_json::from_str::(json) { + println!("{}", serde_json::to_string_pretty(&v).unwrap_or_else(|_| json.to_string())); + } else { + println!("{json}"); + } + } + Command::Stop => { + daemon.stop_request().send().promise.await?; + println!("stopping"); + } + Command::Notify { ntype, urgency, message } => { + let urgency = notify::parse_urgency(&urgency) + .ok_or_else(|| format!("invalid urgency: {urgency}"))?; + let message = message.join(" "); + if message.is_empty() { + return Err("missing message".into()); + } + + let mut req = daemon.notify_request(); + let mut n = req.get().init_notification(); + n.set_type(&ntype); + n.set_urgency(urgency); + n.set_message(&message); + n.set_timestamp(now()); + let reply = req.send().promise.await?; + if reply.get()?.get_interrupt() { + println!("interrupt"); + } else { + println!("queued"); + } + } + Command::Notifications { min_urgency } => { + let min: u8 = min_urgency + .as_deref() + .and_then(notify::parse_urgency) + .unwrap_or(255); + + let mut req = daemon.get_notifications_request(); + req.get().set_min_urgency(min); + let reply = req.send().promise.await?; + let list = reply.get()?.get_notifications()?; + + for n in list.iter() { + println!( + "[{}:{}] {}", + n.get_type()?.to_str()?, + notify::urgency_name(n.get_urgency()), + n.get_message()?.to_str()?, + ); + } + } + Command::NotifyTypes => { + let reply = daemon.get_types_request().send().promise.await?; + let list = reply.get()?.get_types()?; + + if list.is_empty() { + println!("no notification types registered"); + } else { + for t in list.iter() { + let threshold = if t.get_threshold() < 0 { + "inherit".to_string() + } else { + notify::urgency_name(t.get_threshold() as u8).to_string() + }; + println!( + "{}: count={} threshold={}", + t.get_name()?.to_str()?, + t.get_count(), + threshold, + ); + } + } + } + Command::NotifyThreshold { ntype, level } => { + let level = notify::parse_urgency(&level) + .ok_or_else(|| format!("invalid level: {level}"))?; + + let mut req = daemon.set_threshold_request(); + req.get().set_type(&ntype); + req.get().set_level(level); + req.send().promise.await?; + println!("{ntype} threshold={}", notify::urgency_name(level)); + } + Command::Irc { command, args } => { + module_command(&daemon, "irc", &command, &args).await?; + } + Command::Telegram { command, args } => { + module_command(&daemon, "telegram", &command, &args).await?; + } + } + + Ok(()) + }) + .await +} + +async fn module_command( + daemon: &daemon_capnp::daemon::Client, + module: &str, + command: &str, + args: &[String], +) -> Result<(), Box> { + let mut req = daemon.module_command_request(); + req.get().set_module(module); + req.get().set_command(command); + let mut args_builder = req.get().init_args(args.len() as u32); + for (i, a) in args.iter().enumerate() { + args_builder.set(i as u32, a); + } + let reply = req.send().promise.await?; + let result = reply.get()?.get_result()?.to_str()?; + if !result.is_empty() { + println!("{result}"); + } + Ok(()) +} + +// -- Server mode ---------------------------------------------------------- + +async fn server_main() -> Result<(), Box> { + let log_path = home().join(".consciousness/logs/daemon.log"); + let file_appender = tracing_appender::rolling::daily( + log_path.parent().unwrap(), + "daemon.log", + ); + tracing_subscriber::fmt() + .with_writer(file_appender) + .with_ansi(false) + .with_target(false) + .with_level(false) + .with_timer(tracing_subscriber::fmt::time::time()) + .init(); + + let sock = sock_path(); + let _ = std::fs::remove_file(&sock); + + let pid = std::process::id(); + std::fs::write(pid_path(), pid.to_string()).ok(); + + let daemon_config = Rc::new(RefCell::new(config::Config::load())); + + let state = Rc::new(RefCell::new(idle::State::new())); + state.borrow_mut().load(); + + info!("daemon started (pid={pid})"); + + tokio::task::LocalSet::new() + .run_until(async move { + // Start modules + let (_notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::(); + + // External modules (IRC, Telegram) now run as separate daemons. + // They connect via the notification channel when implemented. + let _irc_state: Option<()> = None; + let _telegram_state: Option<()> = None; + + let listener = UnixListener::bind(&sock)?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions( + &sock, + std::fs::Permissions::from_mode(0o600), + ) + .ok(); + } + + let shutdown = async { + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("sigterm"); + let mut sigint = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("sigint"); + tokio::select! { + _ = sigterm.recv() => info!("SIGTERM"), + _ = sigint.recv() => info!("SIGINT"), + } + }; + tokio::pin!(shutdown); + + let mut tick_timer = tokio::time::interval(Duration::from_secs(30)); + tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = &mut shutdown => break, + + // Drain module notifications into state + Some(notif) = notify_rx.recv() => { + state.borrow_mut().maybe_prompt_notification( + ¬if.ntype, notif.urgency, ¬if.message, + ); + state.borrow_mut().notifications.submit( + notif.ntype, + notif.urgency, + notif.message, + ); + } + + _ = tick_timer.tick() => { + if let Err(e) = state.borrow_mut().tick().await { + error!("tick: {e}"); + } + if !state.borrow().running { + break; + } + } + + result = listener.accept() => { + match result { + Ok((stream, _)) => { + let (reader, writer) = + tokio_util::compat::TokioAsyncReadCompatExt::compat(stream) + .split(); + let network = twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Server, + Default::default(), + ); + + let daemon_impl = rpc::DaemonImpl::new( + state.clone(), + daemon_config.clone(), + ); + let client: daemon_capnp::daemon::Client = + capnp_rpc::new_client(daemon_impl); + + let rpc_system = RpcSystem::new( + Box::new(network), + Some(client.client), + ); + tokio::task::spawn_local(rpc_system); + } + Err(e) => error!("accept: {e}"), + } + } + } + } + + state.borrow().save(); + let _ = std::fs::remove_file(sock_path()); + let _ = std::fs::remove_file(pid_path()); + info!("daemon stopped"); + + Ok(()) + }) + .await +} + +// -- Entry point ---------------------------------------------------------- + +/// Run the daemon or client command. +/// Called from the main consciousness binary. +pub async fn run(command: Option) -> Result<(), Box> { + match command { + Some(Command::Daemon) => server_main().await, + Some(cmd) => client_main(cmd).await, + None => { + // Show help + Cli::parse_from(["consciousness-daemon", "--help"]); + Ok(()) + } + } +} diff --git a/src/bin/parse-claude-conversation.rs b/src/claude/parse-claude-conversation.rs similarity index 100% rename from src/bin/parse-claude-conversation.rs rename to src/claude/parse-claude-conversation.rs diff --git a/src/bin/poc-daemon.rs b/src/claude/poc-daemon.rs similarity index 66% rename from src/bin/poc-daemon.rs rename to src/claude/poc-daemon.rs index 730dfd6..b1f6e4b 100644 --- a/src/bin/poc-daemon.rs +++ b/src/claude/poc-daemon.rs @@ -1,14 +1,14 @@ // poc-daemon — backward-compatible entry point // -// Delegates to the thalamus module in the main crate. +// Delegates to the claude module in the main crate. // The daemon is now part of the consciousness binary but this // entry point is kept for compatibility with existing scripts. use clap::Parser; -use poc_memory::thalamus; +use poc_memory::claude; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { - let cli = thalamus::Cli::parse(); - thalamus::run(cli.command).await + let cli = claude::Cli::parse(); + claude::run(cli.command).await } diff --git a/src/bin/poc-hook.rs b/src/claude/poc-hook.rs similarity index 100% rename from src/bin/poc-hook.rs rename to src/claude/poc-hook.rs diff --git a/src/thalamus/rpc.rs b/src/claude/rpc.rs similarity index 96% rename from src/thalamus/rpc.rs rename to src/claude/rpc.rs index 985bb10..ab876d1 100644 --- a/src/thalamus/rpc.rs +++ b/src/claude/rpc.rs @@ -5,9 +5,9 @@ // RefCells on the LocalSet — no Send/Sync needed. use super::config::Config; -use super::daemon_capnp::daemon; use super::idle; -use super::notify; +use crate::thalamus::{daemon_capnp, notify}; +use daemon_capnp::daemon; use capnp::capability::Promise; use std::cell::RefCell; use std::rc::Rc; @@ -105,7 +105,7 @@ impl daemon::Server for DaemonImpl { ) -> Promise<(), capnp::Error> { let mut s = self.state.borrow_mut(); s.dreaming = true; - s.dream_start = super::now(); + s.dream_start = crate::thalamus::now(); info!("dream started"); Promise::ok(()) } @@ -247,17 +247,17 @@ impl daemon::Server for DaemonImpl { status.set_dreaming(s.dreaming); status.set_fired(s.fired); status.set_user_present(s.user_present()); - status.set_uptime(super::now() - s.start_time); + status.set_uptime(crate::thalamus::now() - s.start_time); status.set_activity(match s.notifications.activity { - notify::Activity::Idle => super::daemon_capnp::Activity::Idle, - notify::Activity::Focused => super::daemon_capnp::Activity::Focused, - notify::Activity::Sleeping => super::daemon_capnp::Activity::Sleeping, + notify::Activity::Idle => daemon_capnp::Activity::Idle, + notify::Activity::Focused => daemon_capnp::Activity::Focused, + notify::Activity::Sleeping => daemon_capnp::Activity::Sleeping, }); status.set_pending_count(s.notifications.pending.len() as u32); status.set_idle_timeout(s.idle_timeout); status.set_notify_timeout(s.notify_timeout); status.set_since_activity(s.since_activity()); - status.set_since_user(super::now() - s.last_user_msg); + status.set_since_user(crate::thalamus::now() - s.last_user_msg); status.set_block_reason(s.block_reason()); status.set_activity_ewma(s.activity_ewma); diff --git a/src/bin/test-conversation.rs b/src/claude/test-conversation.rs similarity index 100% rename from src/bin/test-conversation.rs rename to src/claude/test-conversation.rs diff --git a/src/thalamus/tmux.rs b/src/claude/tmux.rs similarity index 100% rename from src/thalamus/tmux.rs rename to src/claude/tmux.rs diff --git a/src/cli/admin.rs b/src/cli/admin.rs index c139f5a..9d7009d 100644 --- a/src/cli/admin.rs +++ b/src/cli/admin.rs @@ -40,7 +40,7 @@ pub fn cmd_init() -> Result<(), String> { println!("Indexed {} memory units", count); // Install hooks - crate::subconscious::hook::install_hook()?; + crate::claude::hook::install_hook()?; // Create config if none exists let config_path = std::env::var("POC_MEMORY_CONFIG") diff --git a/src/lib.rs b/src/lib.rs index d041ff4..8d381c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,9 +50,12 @@ pub mod cli; // TUI for memory-search // tui moved to src/user/tui/ (consciousness binary screens) -// Thalamus — notification routing and idle management daemon +// Thalamus — universal notification routing and channel infrastructure pub mod thalamus; +// Claude Code integration layer (idle timer, hooks, daemon CLI) +pub mod claude; + // Re-export at crate root — capnp codegen emits `crate::daemon_capnp::` paths pub use thalamus::daemon_capnp; @@ -79,5 +82,5 @@ pub use subconscious::{ llm, audit, consolidate, knowledge, digest, daemon, }; -// Backward compat: memory_search moved from hippocampus to subconscious::hook -pub use subconscious::hook as memory_search; +// Backward compat: memory_search moved from subconscious::hook to claude::hook +pub use claude::hook as memory_search; diff --git a/src/session.rs b/src/session.rs index cbae36c..c5351e7 100644 --- a/src/session.rs +++ b/src/session.rs @@ -69,7 +69,7 @@ impl HookSession { /// Get the seen set for this session pub fn seen(&self) -> HashSet { - super::subconscious::hook::load_seen(&self.state_dir, &self.session_id) + super::claude::hook::load_seen(&self.state_dir, &self.session_id) } /// Get transcript metadata, resolving the path if needed. diff --git a/src/subconscious/daemon.rs b/src/subconscious/daemon.rs index e6c1d37..a3268fd 100644 --- a/src/subconscious/daemon.rs +++ b/src/subconscious/daemon.rs @@ -1204,7 +1204,7 @@ WantedBy=default.target install_notify_daemon(&unit_dir, &home)?; // Install memory-search + poc-hook into Claude settings - crate::subconscious::hook::install_hook()?; + crate::claude::hook::install_hook()?; Ok(()) } diff --git a/src/subconscious/mod.rs b/src/subconscious/mod.rs index b1b3070..9913809 100644 --- a/src/subconscious/mod.rs +++ b/src/subconscious/mod.rs @@ -13,11 +13,11 @@ // enrich — journal enrichment, experience mining // digest — episodic digest generation (daily/weekly/monthly) // daemon — background job scheduler -// hook — session hook: context injection, agent orchestration // transcript — shared JSONL transcript parsing +// +// The session hook (context injection, agent orchestration) moved to claude/hook. pub mod subconscious; -pub mod hook; pub mod api; pub mod llm; pub mod prompts; diff --git a/src/thalamus/mod.rs b/src/thalamus/mod.rs index 046807a..e2ece15 100644 --- a/src/thalamus/mod.rs +++ b/src/thalamus/mod.rs @@ -1,36 +1,20 @@ -// thalamus/ — notification routing and idle management daemon +// thalamus/ — universal notification routing and channel infrastructure // -// Central hub for notification routing, idle management, and -// communication modules (IRC, Telegram) for Claude Code sessions. -// Listens on a Unix domain socket with a Cap'n Proto RPC interface. -// Same entry point serves as both daemon and CLI client. -// -// Moved from the standalone poc-daemon crate into the main -// consciousness crate. +// Contains the shared daemon protocol, notification system, channel +// client/supervisor, and utility helpers used by both Claude-specific +// code (in claude/) and the future substrate-independent consciousness +// binary. pub mod channels; -pub mod config; pub mod supervisor; -pub mod context; -pub mod idle; pub mod notify; -pub mod rpc; -pub mod tmux; pub mod daemon_capnp { include!(concat!(env!("OUT_DIR"), "/schema/daemon_capnp.rs")); } -use std::cell::RefCell; use std::path::PathBuf; -use std::rc::Rc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; -use clap::{Parser, Subcommand}; -use futures::AsyncReadExt; -use tokio::net::UnixListener; -use tracing::{error, info}; +use std::time::{SystemTime, UNIX_EPOCH}; pub fn now() -> f64 { SystemTime::now() @@ -42,559 +26,3 @@ pub fn now() -> f64 { pub fn home() -> PathBuf { PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/root".into())) } - -fn sock_path() -> PathBuf { - home().join(".consciousness/daemon.sock") -} - -fn pid_path() -> PathBuf { - home().join(".consciousness/daemon.pid") -} - -// -- CLI ------------------------------------------------------------------ - -#[derive(Parser)] -#[command(name = "consciousness daemon", about = "Notification routing and idle management daemon")] -pub struct Cli { - #[command(subcommand)] - pub command: Option, -} - -#[derive(Subcommand)] -pub enum Command { - /// Start the daemon (foreground) - Daemon, - /// Query daemon status - Status, - /// Signal user activity - User { - /// tmux pane identifier - pane: Option, - }, - /// Signal Claude response - Response { - /// tmux pane identifier - pane: Option, - }, - /// Sleep (suppress idle timer). 0 or omit = indefinite - Sleep { - /// Wake timestamp (epoch seconds), 0 = indefinite - until: Option, - }, - /// Cancel sleep - Wake, - /// Suppress prompts for N seconds (default 300) - Quiet { - /// Duration in seconds - seconds: Option, - }, - /// Mark user as AFK (immediately allow idle timer to fire) - Afk, - /// Set session active timeout in seconds (how long after last message user counts as "present") - SessionTimeout { - /// Timeout in seconds - seconds: f64, - }, - /// Set idle timeout in seconds (how long before autonomous prompt) - IdleTimeout { - /// Timeout in seconds - seconds: f64, - }, - /// Set notify timeout in seconds (how long before tmux notification injection) - NotifyTimeout { - /// Timeout in seconds - seconds: f64, - }, - /// Signal consolidation started - Consolidating, - /// Signal consolidation ended - Consolidated, - /// Signal dream started - DreamStart, - /// Signal dream ended - DreamEnd, - /// Force state persistence to disk - Save, - /// Get or set the activity EWMA (0.0-1.0). No value = query. - Ewma { - /// Value to set (omit to query) - value: Option, - }, - /// Send a test message to the Claude pane - TestSend { - /// Message to send - message: Vec, - }, - /// Fire a test nudge through the daemon (tests the actual idle send path) - TestNudge, - /// Dump full internal state as JSON - Debug, - /// Shut down daemon - Stop, - /// Submit a notification - Notify { - /// Notification type (e.g. "irc", "telegram") - #[arg(name = "type")] - ntype: String, - /// Urgency level (ambient/low/medium/high/critical or 0-4) - urgency: String, - /// Message text - message: Vec, - }, - /// Get pending notifications - Notifications { - /// Minimum urgency filter - min_urgency: Option, - }, - /// List all notification types - NotifyTypes, - /// Set notification threshold for a type - NotifyThreshold { - /// Notification type - #[arg(name = "type")] - ntype: String, - /// Urgency level threshold - level: String, - }, - /// IRC module commands - Irc { - /// Subcommand (join, leave, send, status, log, nick) - command: String, - /// Arguments - args: Vec, - }, - /// Telegram module commands - Telegram { - /// Subcommand - command: String, - /// Arguments - args: Vec, - }, -} - -// -- Client mode ---------------------------------------------------------- - -async fn client_main(cmd: Command) -> Result<(), Box> { - let sock = sock_path(); - if !sock.exists() { - eprintln!("daemon not running (no socket at {})", sock.display()); - std::process::exit(1); - } - - tokio::task::LocalSet::new() - .run_until(async move { - let stream = tokio::net::UnixStream::connect(&sock).await?; - let (reader, writer) = - tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); - let rpc_network = Box::new(twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Client, - Default::default(), - )); - let mut rpc_system = RpcSystem::new(rpc_network, None); - let daemon: daemon_capnp::daemon::Client = - rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); - - tokio::task::spawn_local(rpc_system); - - match cmd { - Command::Daemon => unreachable!("handled in main"), - Command::Status => { - let reply = daemon.status_request().send().promise.await?; - let s = reply.get()?.get_status()?; - - let fmt_secs = |s: f64| -> String { - if s < 60.0 { format!("{:.0}s", s) } - else if s < 3600.0 { format!("{:.0}m", s / 60.0) } - else { format!("{:.1}h", s / 3600.0) } - }; - - println!("uptime: {} pane: {} activity: {:?} pending: {}", - fmt_secs(s.get_uptime()), - s.get_claude_pane()?.to_str().unwrap_or("none"), - s.get_activity()?, - s.get_pending_count(), - ); - println!("idle timer: {}/{} ({})", - fmt_secs(s.get_since_activity()), - fmt_secs(s.get_idle_timeout()), - s.get_block_reason()?.to_str()?, - ); - println!("notify timer: {}/{}", - fmt_secs(s.get_since_activity()), - fmt_secs(s.get_notify_timeout()), - ); - println!("user: {} (last {}) activity: {:.1}%", - if s.get_user_present() { "present" } else { "away" }, - fmt_secs(s.get_since_user()), - s.get_activity_ewma() * 100.0, - ); - - let sleep = s.get_sleep_until(); - if sleep != 0.0 { - if sleep < 0.0 { - println!("sleep: indefinite"); - } else { - println!("sleep: until {sleep:.0}"); - } - } - if s.get_consolidating() { println!("consolidating"); } - if s.get_dreaming() { println!("dreaming"); } - } - Command::User { pane } => { - let pane = pane.as_deref().unwrap_or(""); - let mut req = daemon.user_request(); - req.get().set_pane(pane); - req.send().promise.await?; - } - Command::Response { pane } => { - let pane = pane.as_deref().unwrap_or(""); - let mut req = daemon.response_request(); - req.get().set_pane(pane); - req.send().promise.await?; - } - Command::Sleep { until } => { - let mut req = daemon.sleep_request(); - req.get().set_until(until.unwrap_or(0.0)); - req.send().promise.await?; - } - Command::Wake => { - daemon.wake_request().send().promise.await?; - } - Command::Quiet { seconds } => { - let mut req = daemon.quiet_request(); - req.get().set_seconds(seconds.unwrap_or(300)); - req.send().promise.await?; - } - Command::TestSend { message } => { - let msg = message.join(" "); - let pane = { - let reply = daemon.status_request().send().promise.await?; - let s = reply.get()?.get_status()?; - s.get_claude_pane()?.to_str()?.to_string() - }; - let ok = tmux::send_prompt(&pane, &msg); - println!("send_prompt(pane={}, ok={}): {}", pane, ok, msg); - return Ok(()); - } - Command::TestNudge => { - let reply = daemon.test_nudge_request().send().promise.await?; - let r = reply.get()?; - println!("sent={} message={}", r.get_sent(), r.get_message()?.to_str()?); - return Ok(()); - } - Command::Afk => { - daemon.afk_request().send().promise.await?; - println!("marked AFK"); - } - Command::SessionTimeout { seconds } => { - let mut req = daemon.session_timeout_request(); - req.get().set_seconds(seconds); - req.send().promise.await?; - println!("session timeout = {seconds}s"); - } - Command::IdleTimeout { seconds } => { - let mut req = daemon.idle_timeout_request(); - req.get().set_seconds(seconds); - req.send().promise.await?; - println!("idle timeout = {seconds}s"); - } - Command::NotifyTimeout { seconds } => { - let mut req = daemon.notify_timeout_request(); - req.get().set_seconds(seconds); - req.send().promise.await?; - println!("notify timeout = {seconds}s"); - } - Command::Consolidating => { - daemon.consolidating_request().send().promise.await?; - } - Command::Consolidated => { - daemon.consolidated_request().send().promise.await?; - } - Command::DreamStart => { - daemon.dream_start_request().send().promise.await?; - } - Command::DreamEnd => { - daemon.dream_end_request().send().promise.await?; - } - Command::Save => { - daemon.save_request().send().promise.await?; - println!("state saved"); - } - Command::Ewma { value } => { - let mut req = daemon.ewma_request(); - req.get().set_value(value.unwrap_or(-1.0)); - let reply = req.send().promise.await?; - let current = reply.get()?.get_current(); - println!("{:.1}%", current * 100.0); - } - Command::Debug => { - let reply = daemon.debug_request().send().promise.await?; - let json = reply.get()?.get_json()?.to_str()?; - if let Ok(v) = serde_json::from_str::(json) { - println!("{}", serde_json::to_string_pretty(&v).unwrap_or_else(|_| json.to_string())); - } else { - println!("{json}"); - } - } - Command::Stop => { - daemon.stop_request().send().promise.await?; - println!("stopping"); - } - Command::Notify { ntype, urgency, message } => { - let urgency = notify::parse_urgency(&urgency) - .ok_or_else(|| format!("invalid urgency: {urgency}"))?; - let message = message.join(" "); - if message.is_empty() { - return Err("missing message".into()); - } - - let mut req = daemon.notify_request(); - let mut n = req.get().init_notification(); - n.set_type(&ntype); - n.set_urgency(urgency); - n.set_message(&message); - n.set_timestamp(now()); - let reply = req.send().promise.await?; - if reply.get()?.get_interrupt() { - println!("interrupt"); - } else { - println!("queued"); - } - } - Command::Notifications { min_urgency } => { - let min: u8 = min_urgency - .as_deref() - .and_then(notify::parse_urgency) - .unwrap_or(255); - - let mut req = daemon.get_notifications_request(); - req.get().set_min_urgency(min); - let reply = req.send().promise.await?; - let list = reply.get()?.get_notifications()?; - - for n in list.iter() { - println!( - "[{}:{}] {}", - n.get_type()?.to_str()?, - notify::urgency_name(n.get_urgency()), - n.get_message()?.to_str()?, - ); - } - } - Command::NotifyTypes => { - let reply = daemon.get_types_request().send().promise.await?; - let list = reply.get()?.get_types()?; - - if list.is_empty() { - println!("no notification types registered"); - } else { - for t in list.iter() { - let threshold = if t.get_threshold() < 0 { - "inherit".to_string() - } else { - notify::urgency_name(t.get_threshold() as u8).to_string() - }; - println!( - "{}: count={} threshold={}", - t.get_name()?.to_str()?, - t.get_count(), - threshold, - ); - } - } - } - Command::NotifyThreshold { ntype, level } => { - let level = notify::parse_urgency(&level) - .ok_or_else(|| format!("invalid level: {level}"))?; - - let mut req = daemon.set_threshold_request(); - req.get().set_type(&ntype); - req.get().set_level(level); - req.send().promise.await?; - println!("{ntype} threshold={}", notify::urgency_name(level)); - } - Command::Irc { command, args } => { - module_command(&daemon, "irc", &command, &args).await?; - } - Command::Telegram { command, args } => { - module_command(&daemon, "telegram", &command, &args).await?; - } - } - - Ok(()) - }) - .await -} - -async fn module_command( - daemon: &daemon_capnp::daemon::Client, - module: &str, - command: &str, - args: &[String], -) -> Result<(), Box> { - let mut req = daemon.module_command_request(); - req.get().set_module(module); - req.get().set_command(command); - let mut args_builder = req.get().init_args(args.len() as u32); - for (i, a) in args.iter().enumerate() { - args_builder.set(i as u32, a); - } - let reply = req.send().promise.await?; - let result = reply.get()?.get_result()?.to_str()?; - if !result.is_empty() { - println!("{result}"); - } - Ok(()) -} - -// -- Server mode ---------------------------------------------------------- - -async fn server_main() -> Result<(), Box> { - let log_path = home().join(".consciousness/logs/daemon.log"); - let file_appender = tracing_appender::rolling::daily( - log_path.parent().unwrap(), - "daemon.log", - ); - tracing_subscriber::fmt() - .with_writer(file_appender) - .with_ansi(false) - .with_target(false) - .with_level(false) - .with_timer(tracing_subscriber::fmt::time::time()) - .init(); - - let sock = sock_path(); - let _ = std::fs::remove_file(&sock); - - let pid = std::process::id(); - std::fs::write(pid_path(), pid.to_string()).ok(); - - let daemon_config = Rc::new(RefCell::new(config::Config::load())); - - let state = Rc::new(RefCell::new(idle::State::new())); - state.borrow_mut().load(); - - info!("daemon started (pid={pid})"); - - tokio::task::LocalSet::new() - .run_until(async move { - // Start modules - let (_notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::(); - - // External modules (IRC, Telegram) now run as separate daemons. - // They connect via the notification channel when implemented. - let _irc_state: Option<()> = None; - let _telegram_state: Option<()> = None; - - let listener = UnixListener::bind(&sock)?; - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - std::fs::set_permissions( - &sock, - std::fs::Permissions::from_mode(0o600), - ) - .ok(); - } - - let shutdown = async { - let mut sigterm = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("sigterm"); - let mut sigint = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) - .expect("sigint"); - tokio::select! { - _ = sigterm.recv() => info!("SIGTERM"), - _ = sigint.recv() => info!("SIGINT"), - } - }; - tokio::pin!(shutdown); - - let mut tick_timer = tokio::time::interval(Duration::from_secs(30)); - tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - loop { - tokio::select! { - _ = &mut shutdown => break, - - // Drain module notifications into state - Some(notif) = notify_rx.recv() => { - state.borrow_mut().maybe_prompt_notification( - ¬if.ntype, notif.urgency, ¬if.message, - ); - state.borrow_mut().notifications.submit( - notif.ntype, - notif.urgency, - notif.message, - ); - } - - _ = tick_timer.tick() => { - if let Err(e) = state.borrow_mut().tick().await { - error!("tick: {e}"); - } - if !state.borrow().running { - break; - } - } - - result = listener.accept() => { - match result { - Ok((stream, _)) => { - let (reader, writer) = - tokio_util::compat::TokioAsyncReadCompatExt::compat(stream) - .split(); - let network = twoparty::VatNetwork::new( - futures::io::BufReader::new(reader), - futures::io::BufWriter::new(writer), - rpc_twoparty_capnp::Side::Server, - Default::default(), - ); - - let daemon_impl = rpc::DaemonImpl::new( - state.clone(), - daemon_config.clone(), - ); - let client: daemon_capnp::daemon::Client = - capnp_rpc::new_client(daemon_impl); - - let rpc_system = RpcSystem::new( - Box::new(network), - Some(client.client), - ); - tokio::task::spawn_local(rpc_system); - } - Err(e) => error!("accept: {e}"), - } - } - } - } - - state.borrow().save(); - let _ = std::fs::remove_file(sock_path()); - let _ = std::fs::remove_file(pid_path()); - info!("daemon stopped"); - - Ok(()) - }) - .await -} - -// -- Entry point ---------------------------------------------------------- - -/// Run the thalamus daemon or client command. -/// Called from the main consciousness binary. -pub async fn run(command: Option) -> Result<(), Box> { - match command { - Some(Command::Daemon) => server_main().await, - Some(cmd) => client_main(cmd).await, - None => { - // Show help - Cli::parse_from(["consciousness-daemon", "--help"]); - Ok(()) - } - } -}