move Claude Code-specific code from thalamus/ to claude/

Separates the Claude-specific daemon (idle timer, tmux pane detection,
prompt injection, RPC server, session hooks) from the universal
infrastructure (channels, supervisor, notify, daemon protocol).

thalamus/ now contains only substrate-independent code: the channel
client/supervisor, notification system, daemon_capnp protocol, and
shared helpers (now(), home()).

claude/ contains: idle.rs, tmux.rs, context.rs, rpc.rs, config.rs,
hook.rs (moved from subconscious/), plus the daemon CLI and server
startup code from thalamus/mod.rs.

All re-exports preserved for backward compatibility.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-03 19:14:39 -04:00
parent 36afa90cdb
commit dd7f1e3f86
19 changed files with 627 additions and 612 deletions

97
src/claude/config.rs Normal file
View file

@ -0,0 +1,97 @@
// Daemon configuration.
//
// Lives at ~/.consciousness/daemon.toml. Loaded on startup, updated at
// runtime when modules change state (join channel, etc.).
use crate::thalamus::home;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
fn config_path() -> PathBuf {
home().join(".consciousness/daemon.toml")
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Config {
#[serde(default)]
pub irc: IrcConfig,
#[serde(default)]
pub telegram: TelegramConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IrcConfig {
pub enabled: bool,
pub server: String,
pub port: u16,
pub tls: bool,
pub nick: String,
pub user: String,
pub realname: String,
pub channels: Vec<String>,
}
impl Default for IrcConfig {
fn default() -> Self {
Self {
enabled: true,
server: "irc.libera.chat".into(),
port: 6697,
tls: true,
nick: "agent".into(),
user: "agent".into(),
realname: "agent".into(),
channels: vec!["#bcachefs".into(), "#bcachefs-ai".into()],
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfig {
pub enabled: bool,
pub token: String,
pub chat_id: i64,
}
impl Default for TelegramConfig {
fn default() -> Self {
// Load token and chat_id from legacy files if they exist
let token = std::fs::read_to_string(home().join(".consciousness/telegram/token"))
.map(|s| s.trim().to_string())
.unwrap_or_default();
let chat_id = std::fs::read_to_string(home().join(".consciousness/telegram/chat_id"))
.ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0);
Self {
enabled: !token.is_empty() && chat_id != 0,
token,
chat_id,
}
}
}
impl Config {
pub fn load() -> Self {
let path = config_path();
match fs::read_to_string(&path) {
Ok(data) => toml::from_str(&data).unwrap_or_else(|e| {
tracing::warn!("bad config {}: {e}, using defaults", path.display());
Self::default()
}),
Err(_) => {
let config = Self::default();
config.save();
config
}
}
}
pub fn save(&self) {
let path = config_path();
if let Ok(data) = toml::to_string_pretty(self) {
let _ = fs::write(path, data);
}
}
}

19
src/claude/context.rs Normal file
View file

@ -0,0 +1,19 @@
// Context gathering for idle prompts.
//
// Notifications are handled by the notify module and passed
// in separately by the caller. Git context and IRC digest
// are now available through where-am-i.md and the memory graph.
/// Build context string for a prompt.
/// notification_text is passed in from the notify module.
pub fn build(_include_irc: bool, notification_text: &str) -> String {
// Keep nudges short — Claude checks notifications via
// `poc-daemon status` on its own. Just mention the count.
let count = notification_text.matches("[irc.").count()
+ notification_text.matches("[telegram.").count();
if count > 0 {
format!("{count} pending notifications")
} else {
String::new()
}
}

312
src/claude/hook.rs Normal file
View file

@ -0,0 +1,312 @@
// hook.rs — Claude Code session hook: context injection + agent orchestration
//
// Called on each UserPromptSubmit via the poc-hook binary. Handles
// context loading, chunking, seen-set management, and delegates
// agent orchestration to AgentCycleState.
use std::collections::HashSet;
use std::fs;
use std::io::Write;
use std::path::Path;
use std::process::Command;
use std::time::Instant;
pub use crate::session::HookSession;
pub use crate::subconscious::subconscious::*;
const CHUNK_SIZE: usize = 9000;
/// Run the hook logic on parsed JSON input. Returns output to inject.
pub fn run_hook(input: &str) -> String {
let Some(session) = HookSession::from_json(input) else { return String::new() };
hook(&session)
}
fn chunk_context(ctx: &str, max_bytes: usize) -> Vec<String> {
let mut sections: Vec<String> = Vec::new();
let mut current = String::new();
for line in ctx.lines() {
if line.starts_with("--- ") && line.ends_with(" ---") && !current.is_empty() {
sections.push(std::mem::take(&mut current));
}
if !current.is_empty() {
current.push('\n');
}
current.push_str(line);
}
if !current.is_empty() {
sections.push(current);
}
let mut chunks: Vec<String> = Vec::new();
let mut chunk = String::new();
for section in sections {
if !chunk.is_empty() && chunk.len() + section.len() + 1 > max_bytes {
chunks.push(std::mem::take(&mut chunk));
}
if !chunk.is_empty() {
chunk.push('\n');
}
chunk.push_str(&section);
}
if !chunk.is_empty() {
chunks.push(chunk);
}
chunks
}
fn save_pending_chunks(dir: &Path, session_id: &str, chunks: &[String]) {
let chunks_dir = dir.join(format!("chunks-{}", session_id));
let _ = fs::remove_dir_all(&chunks_dir);
if chunks.is_empty() { return; }
fs::create_dir_all(&chunks_dir).ok();
for (i, chunk) in chunks.iter().enumerate() {
let path = chunks_dir.join(format!("{:04}", i));
fs::write(path, chunk).ok();
}
}
fn pop_pending_chunk(dir: &Path, session_id: &str) -> Option<String> {
let chunks_dir = dir.join(format!("chunks-{}", session_id));
if !chunks_dir.exists() { return None; }
let mut entries: Vec<_> = fs::read_dir(&chunks_dir).ok()?
.flatten()
.filter(|e| e.file_type().map(|t| t.is_file()).unwrap_or(false))
.collect();
entries.sort_by_key(|e| e.file_name());
let first = entries.first()?;
let content = fs::read_to_string(first.path()).ok()?;
fs::remove_file(first.path()).ok();
if fs::read_dir(&chunks_dir).ok().map(|mut d| d.next().is_none()).unwrap_or(true) {
fs::remove_dir(&chunks_dir).ok();
}
Some(content)
}
fn generate_cookie() -> String {
uuid::Uuid::new_v4().as_simple().to_string()[..12].to_string()
}
fn parse_seen_line(line: &str) -> &str {
line.split_once('\t').map(|(_, key)| key).unwrap_or(line)
}
pub fn load_seen(dir: &Path, session_id: &str) -> HashSet<String> {
let path = dir.join(format!("seen-{}", session_id));
if path.exists() {
fs::read_to_string(&path)
.unwrap_or_default()
.lines()
.filter(|s| !s.is_empty())
.map(|s| parse_seen_line(s).to_string())
.collect()
} else {
HashSet::new()
}
}
fn mark_seen(dir: &Path, session_id: &str, key: &str, seen: &mut HashSet<String>) {
if !seen.insert(key.to_string()) { return; }
let path = dir.join(format!("seen-{}", session_id));
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(path) {
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
writeln!(f, "{}\t{}", ts, key).ok();
}
}
/// Standalone entry point for the Claude Code hook path.
/// Loads saved state, runs cycles, saves state back.
pub fn run_agent_cycles(session: &HookSession) -> AgentCycleOutput {
let mut state = AgentCycleState::new(&session.session_id);
state.restore(&SavedAgentState::load(&session.session_id));
state.trigger(session);
state.save(&session.session_id);
state.last_output
}
fn hook(session: &HookSession) -> String {
let start_time = Instant::now();
let mut out = String::new();
let is_compaction = crate::transcript::detect_new_compaction(
&session.state_dir, &session.session_id, &session.transcript_path,
);
let cookie_path = session.path("cookie");
let is_first = !cookie_path.exists();
let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs");
fs::create_dir_all(&log_dir).ok();
let log_path = log_dir.join(format!("hook-{}", session.session_id));
let Ok(mut log_f) = fs::OpenOptions::new().create(true).append(true).open(log_path) else { return Default::default(); };
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
let _ = writeln!(log_f, "\n=== {} ({}) {} bytes ===", ts, session.hook_event, out.len());
let _ = writeln!(log_f, "is_first {is_first} is_compaction {is_compaction}");
if is_first || is_compaction {
if is_compaction {
fs::rename(&session.path("seen"), &session.path("seen-prev")).ok();
} else {
fs::remove_file(&session.path("seen")).ok();
fs::remove_file(&session.path("seen-prev")).ok();
}
fs::remove_file(&session.path("returned")).ok();
if is_first {
fs::write(&cookie_path, generate_cookie()).ok();
}
if let Ok(output) = Command::new("poc-memory").args(["admin", "load-context"]).output() {
if output.status.success() {
let ctx = String::from_utf8_lossy(&output.stdout).to_string();
if !ctx.trim().is_empty() {
let mut ctx_seen = session.seen();
for line in ctx.lines() {
if line.starts_with("--- ") && line.ends_with(" ---") {
let inner = &line[4..line.len() - 4];
if let Some(paren) = inner.rfind(" (") {
let key = inner[..paren].trim();
mark_seen(&session.state_dir, &session.session_id, key, &mut ctx_seen);
}
}
}
let chunks = chunk_context(&ctx, CHUNK_SIZE);
if let Some(first) = chunks.first() {
out.push_str(first);
}
save_pending_chunks(&session.state_dir, &session.session_id, &chunks[1..]);
}
}
}
}
if let Some(chunk) = pop_pending_chunk(&session.state_dir, &session.session_id) {
out.push_str(&chunk);
} else {
let cfg = crate::config::get();
if cfg.surface_hooks.iter().any(|h| h == &session.hook_event) {
let cycle_output = run_agent_cycles(&session);
out.push_str(&format_agent_output(&cycle_output));
}
}
let _ = write!(log_f, "{}", out);
let duration = (Instant::now() - start_time).as_secs_f64();
let _ = writeln!(log_f, "\nran in {duration:.2}s");
out
}
/// Install memory-search and poc-hook into Claude Code settings.json.
///
/// Hook layout:
/// UserPromptSubmit: memory-search (10s), poc-hook (5s)
/// PostToolUse: poc-hook (5s)
/// Stop: poc-hook (5s)
pub fn install_hook() -> Result<(), String> {
use std::path::PathBuf;
let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?;
let exe = std::env::current_exe()
.map_err(|e| format!("current_exe: {}", e))?;
let settings_path = PathBuf::from(&home).join(".claude/settings.json");
let memory_search = exe.with_file_name("memory-search");
let poc_hook = exe.with_file_name("poc-hook");
let mut settings: serde_json::Value = if settings_path.exists() {
let content = fs::read_to_string(&settings_path)
.map_err(|e| format!("read settings: {}", e))?;
serde_json::from_str(&content)
.map_err(|e| format!("parse settings: {}", e))?
} else {
serde_json::json!({})
};
let obj = settings.as_object_mut().ok_or("settings not an object")?;
let hooks_obj = obj.entry("hooks")
.or_insert_with(|| serde_json::json!({}))
.as_object_mut().ok_or("hooks not an object")?;
let mut changed = false;
// Helper: ensure a hook binary is present in an event's hook list
let ensure_hook = |hooks_obj: &mut serde_json::Map<String, serde_json::Value>,
event: &str,
binary: &Path,
timeout: u32,
changed: &mut bool| {
if !binary.exists() {
eprintln!("Warning: {} not found — skipping", binary.display());
return;
}
let cmd = binary.to_string_lossy().to_string();
let name = binary.file_name().unwrap().to_string_lossy().to_string();
let event_array = hooks_obj.entry(event)
.or_insert_with(|| serde_json::json!([{"hooks": []}]))
.as_array_mut().unwrap();
if event_array.is_empty() {
event_array.push(serde_json::json!({"hooks": []}));
}
let inner = event_array[0]
.as_object_mut().unwrap()
.entry("hooks")
.or_insert_with(|| serde_json::json!([]))
.as_array_mut().unwrap();
// Remove legacy load-memory.sh
let before = inner.len();
inner.retain(|h| {
let c = h.get("command").and_then(|c| c.as_str()).unwrap_or("");
!c.contains("load-memory")
});
if inner.len() < before {
eprintln!("Removed load-memory.sh from {event}");
*changed = true;
}
let already = inner.iter().any(|h| {
h.get("command").and_then(|c| c.as_str())
.is_some_and(|c| c.contains(&name))
});
if !already {
inner.push(serde_json::json!({
"type": "command",
"command": cmd,
"timeout": timeout
}));
*changed = true;
eprintln!("Installed {name} in {event}");
}
};
// UserPromptSubmit: memory-search + poc-hook
ensure_hook(hooks_obj, "UserPromptSubmit", &memory_search, 10, &mut changed);
ensure_hook(hooks_obj, "UserPromptSubmit", &poc_hook, 5, &mut changed);
// PostToolUse + Stop: poc-hook only
ensure_hook(hooks_obj, "PostToolUse", &poc_hook, 5, &mut changed);
ensure_hook(hooks_obj, "Stop", &poc_hook, 5, &mut changed);
if changed {
let json = serde_json::to_string_pretty(&settings)
.map_err(|e| format!("serialize settings: {}", e))?;
fs::write(&settings_path, json)
.map_err(|e| format!("write settings: {}", e))?;
eprintln!("Updated {}", settings_path.display());
} else {
eprintln!("All hooks already installed in {}", settings_path.display());
}
Ok(())
}

642
src/claude/idle.rs Normal file
View file

@ -0,0 +1,642 @@
// Idle timer module.
//
// Tracks user presence and Claude response times. When Claude has been
// idle too long, sends a contextual prompt to the tmux pane. Handles
// sleep mode, quiet mode, consolidation suppression, and dream nudges.
//
// Designed as the first "module" — future IRC/Telegram modules will
// follow the same pattern: state + tick + handle_command.
use super::{context, tmux};
use crate::thalamus::{home, now, notify};
use serde::{Deserialize, Serialize};
use std::fs;
use tracing::info;
// Defaults
const DEFAULT_IDLE_TIMEOUT: f64 = 5.0 * 60.0;
const DEFAULT_NOTIFY_TIMEOUT: f64 = 2.0 * 60.0;
const DEFAULT_SESSION_ACTIVE_SECS: f64 = 15.0 * 60.0;
const DREAM_INTERVAL_HOURS: u64 = 18;
/// EWMA decay half-life in seconds (5 minutes).
const EWMA_DECAY_HALF_LIFE: f64 = 5.0 * 60.0;
/// Minimum seconds between autonomous nudges.
const MIN_NUDGE_INTERVAL: f64 = 15.0;
/// Boost half-life in seconds (60s). A 60s turn covers half the gap to
/// target; a 15s turn covers ~16%; a 2s turn covers ~2%.
const EWMA_BOOST_HALF_LIFE: f64 = 60.0;
/// Steady-state target for active work. The EWMA converges toward this
/// during sustained activity rather than toward 1.0.
const EWMA_TARGET: f64 = 0.75;
/// Persisted subset of daemon state — survives daemon restarts.
/// Includes both epoch floats (for computation) and ISO timestamps
/// (for human debugging via `cat daemon-state.json | jq`).
#[derive(Serialize, Deserialize, Default)]
struct Persisted {
last_user_msg: f64,
last_response: f64,
#[serde(default)]
sleep_until: Option<f64>,
#[serde(default)]
claude_pane: Option<String>,
#[serde(default)]
idle_timeout: f64,
#[serde(default)]
notify_timeout: f64,
#[serde(default)]
activity_ewma: f64,
#[serde(default)]
ewma_updated_at: f64,
#[serde(default)]
session_active_secs: f64,
#[serde(default)]
in_turn: bool,
#[serde(default)]
turn_start: f64,
#[serde(default)]
last_nudge: f64,
// Human-readable mirrors — written but not consumed on load
#[serde(default, skip_deserializing)]
last_user_msg_time: String,
#[serde(default, skip_deserializing)]
last_response_time: String,
#[serde(default, skip_deserializing)]
saved_at: String,
#[serde(default, skip_deserializing)]
fired: bool,
#[serde(default, skip_deserializing)]
uptime: f64,
}
fn state_path() -> std::path::PathBuf {
home().join(".consciousness/daemon-state.json")
}
/// Compute EWMA decay factor: 0.5^(elapsed / half_life).
fn ewma_factor(elapsed: f64, half_life: f64) -> f64 {
(0.5_f64).powf(elapsed / half_life)
}
/// Format epoch seconds as a human-readable ISO-ish timestamp.
fn epoch_to_iso(epoch: f64) -> String {
if epoch == 0.0 {
return String::new();
}
let secs = epoch as u64;
// Use date command — simple and correct for timezone
std::process::Command::new("date")
.args(["-d", &format!("@{secs}"), "+%Y-%m-%dT%H:%M:%S%z"])
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.map(|s| s.trim().to_string())
.unwrap_or_default()
}
#[derive(Serialize)]
pub struct State {
pub last_user_msg: f64,
pub last_response: f64,
pub claude_pane: Option<String>,
pub sleep_until: Option<f64>, // None=awake, 0=indefinite, >0=timestamp
pub quiet_until: f64,
pub consolidating: bool,
pub dreaming: bool,
pub dream_start: f64,
pub fired: bool,
pub idle_timeout: f64,
pub notify_timeout: f64,
pub activity_ewma: f64,
pub ewma_updated_at: f64,
pub session_active_secs: f64,
pub in_turn: bool,
pub turn_start: f64,
pub last_nudge: f64,
#[serde(skip)]
pub running: bool,
#[serde(skip)]
pub start_time: f64,
#[serde(skip)]
pub notifications: notify::NotifyState,
}
impl State {
pub fn new() -> Self {
Self {
last_user_msg: 0.0,
last_response: 0.0,
claude_pane: None,
sleep_until: None,
quiet_until: 0.0,
consolidating: false,
dreaming: false,
dream_start: 0.0,
fired: false,
idle_timeout: DEFAULT_IDLE_TIMEOUT,
notify_timeout: DEFAULT_NOTIFY_TIMEOUT,
session_active_secs: DEFAULT_SESSION_ACTIVE_SECS,
activity_ewma: 0.0,
ewma_updated_at: now(),
in_turn: false,
turn_start: 0.0,
last_nudge: 0.0,
running: true,
start_time: now(),
notifications: notify::NotifyState::new(),
}
}
pub fn load(&mut self) {
if let Ok(data) = fs::read_to_string(state_path()) {
if let Ok(p) = serde_json::from_str::<Persisted>(&data) {
self.sleep_until = p.sleep_until;
self.claude_pane = p.claude_pane;
if p.idle_timeout > 0.0 {
self.idle_timeout = p.idle_timeout;
}
if p.notify_timeout > 0.0 {
self.notify_timeout = p.notify_timeout;
}
if p.session_active_secs > 0.0 {
self.session_active_secs = p.session_active_secs;
}
// Reset activity timestamps to now — timers count from
// restart, not from stale pre-restart state
let t = now();
self.last_user_msg = t;
self.last_response = t;
// Restore EWMA state, applying decay for time spent shut down
if p.ewma_updated_at > 0.0 {
let elapsed = t - p.ewma_updated_at;
self.activity_ewma = p.activity_ewma * ewma_factor(elapsed, EWMA_DECAY_HALF_LIFE);
self.in_turn = p.in_turn;
self.turn_start = p.turn_start;
self.last_nudge = p.last_nudge;
}
self.ewma_updated_at = t;
}
}
// Always try to find the active pane
if self.claude_pane.is_none() {
self.claude_pane = tmux::find_claude_pane();
}
info!(
"loaded: user={:.0} resp={:.0} pane={:?} sleep={:?}",
self.last_user_msg, self.last_response, self.claude_pane, self.sleep_until,
);
}
pub fn save(&self) {
let p = Persisted {
last_user_msg: self.last_user_msg,
last_response: self.last_response,
sleep_until: self.sleep_until,
claude_pane: self.claude_pane.clone(),
last_user_msg_time: epoch_to_iso(self.last_user_msg),
last_response_time: epoch_to_iso(self.last_response),
saved_at: epoch_to_iso(now()),
fired: self.fired,
idle_timeout: self.idle_timeout,
notify_timeout: self.notify_timeout,
session_active_secs: self.session_active_secs,
activity_ewma: self.activity_ewma,
ewma_updated_at: self.ewma_updated_at,
in_turn: self.in_turn,
turn_start: self.turn_start,
last_nudge: self.last_nudge,
uptime: now() - self.start_time,
};
if let Ok(json) = serde_json::to_string_pretty(&p) {
let _ = fs::write(state_path(), json);
}
}
/// Decay the activity EWMA toward zero based on elapsed time.
fn decay_ewma(&mut self) {
let t = now();
let elapsed = t - self.ewma_updated_at;
if elapsed <= 0.0 {
return;
}
self.activity_ewma *= ewma_factor(elapsed, EWMA_DECAY_HALF_LIFE);
self.ewma_updated_at = t;
}
/// Boost the EWMA based on turn duration. The boost is proportional to
/// distance from EWMA_TARGET, scaled by a saturation curve on duration.
/// A 15s turn covers half the gap to target; a 2s turn barely registers.
/// Self-limiting: converges toward target, can't overshoot.
fn boost_ewma(&mut self, turn_duration: f64) {
let gap = (EWMA_TARGET - self.activity_ewma).max(0.0);
let saturation = 1.0 - ewma_factor(turn_duration, EWMA_BOOST_HALF_LIFE);
self.activity_ewma += gap * saturation;
}
// Typed handlers for RPC
pub fn handle_user(&mut self, pane: &str) {
self.decay_ewma();
self.in_turn = true;
self.turn_start = now();
let from_user = !self.fired;
if from_user {
self.last_user_msg = now();
self.notifications.set_activity(notify::Activity::Focused);
}
self.fired = false;
if !pane.is_empty() {
self.claude_pane = Some(pane.to_string());
}
self.save();
info!("user (pane={}, from_user={from_user}) ewma={:.3}",
if pane.is_empty() { "unchanged" } else { pane },
self.activity_ewma);
}
pub fn handle_response(&mut self, pane: &str) {
let turn_duration = now() - self.turn_start;
self.decay_ewma();
self.boost_ewma(turn_duration);
self.in_turn = false;
self.last_response = now();
self.fired = false;
if !pane.is_empty() {
self.claude_pane = Some(pane.to_string());
}
self.save();
info!("response (turn={:.1}s) ewma={:.3}", turn_duration, self.activity_ewma);
}
/// Check if a notification should trigger a tmux prompt.
/// Called when a notification arrives via module channel.
/// Only injects into tmux when idle — if there's an active session
/// (recent user or response), the hook delivers via additionalContext.
pub fn maybe_prompt_notification(&mut self, ntype: &str, urgency: u8, _message: &str) {
if self.user_present() {
return; // hook will deliver it on next prompt
}
// If we've responded recently, the session is active —
// notifications will arrive via hook, no need to wake us
let since_response = now() - self.last_response;
if since_response < self.notify_timeout {
return;
}
// Don't send notifications via tmux directly — they flow
// through the idle nudge. Urgent notifications reset the
// idle timer so the nudge fires sooner.
let effective = self.notifications.threshold_for(ntype);
if urgency >= effective && self.fired {
// Bump: allow the nudge to re-fire for urgent notifications
self.fired = false;
}
}
pub fn handle_afk(&mut self) {
// Push last_user_msg far enough back that user_present() returns false
self.last_user_msg = now() - self.session_active_secs - 1.0;
self.fired = false; // allow idle timer to fire again
info!("User marked AFK");
self.save();
}
pub fn handle_session_timeout(&mut self, secs: f64) {
self.session_active_secs = secs;
info!("session active timeout = {secs}s");
self.save();
}
pub fn handle_idle_timeout(&mut self, secs: f64) {
self.idle_timeout = secs;
self.save();
info!("idle timeout = {secs}s");
}
pub fn handle_ewma(&mut self, value: f64) -> f64 {
if value >= 0.0 {
self.activity_ewma = value.min(1.0);
self.ewma_updated_at = now();
self.save();
info!("ewma set to {:.3}", self.activity_ewma);
}
self.activity_ewma
}
pub fn handle_notify_timeout(&mut self, secs: f64) {
self.notify_timeout = secs;
self.save();
info!("notify timeout = {secs}s");
}
pub fn handle_sleep(&mut self, until: f64) {
if until == 0.0 {
self.sleep_until = Some(0.0);
info!("sleep indefinitely");
} else {
self.sleep_until = Some(until);
info!("sleep until {until}");
}
self.notifications.set_activity(notify::Activity::Sleeping);
self.save();
}
pub fn handle_wake(&mut self) {
self.sleep_until = None;
self.fired = false;
self.save();
info!("wake");
}
pub fn handle_quiet(&mut self, seconds: u32) {
self.quiet_until = now() + seconds as f64;
info!("quiet {seconds}s");
}
pub fn user_present(&self) -> bool {
(now() - self.last_user_msg) < self.session_active_secs
}
/// Seconds since the most recent of user message or response.
pub fn since_activity(&self) -> f64 {
let reference = self.last_response.max(self.last_user_msg);
if reference > 0.0 { now() - reference } else { 0.0 }
}
/// Why the idle timer hasn't fired (or "none" if it would fire now).
pub fn block_reason(&self) -> &'static str {
let t = now();
if self.fired {
"already fired"
} else if self.sleep_until.is_some() {
"sleeping"
} else if t < self.quiet_until {
"quiet mode"
} else if self.consolidating {
"consolidating"
} else if self.dreaming {
"dreaming"
} else if self.user_present() {
"user present"
} else if self.in_turn {
"in turn"
} else if self.last_response.max(self.last_user_msg) == 0.0 {
"no activity yet"
} else if self.since_activity() < self.idle_timeout {
"not idle long enough"
} else {
"none — would fire"
}
}
/// Full debug dump as JSON with computed values.
pub fn debug_json(&self) -> String {
let t = now();
let since_user = t - self.last_user_msg;
let since_response = t - self.last_response;
serde_json::json!({
"now": t,
"uptime": t - self.start_time,
"idle_timeout": self.idle_timeout,
"notify_timeout": self.notify_timeout,
"last_user_msg": self.last_user_msg,
"last_user_msg_ago": since_user,
"last_user_msg_time": epoch_to_iso(self.last_user_msg),
"last_response": self.last_response,
"last_response_ago": since_response,
"last_response_time": epoch_to_iso(self.last_response),
"since_activity": self.since_activity(),
"activity_ewma": self.activity_ewma,
"in_turn": self.in_turn,
"turn_start": self.turn_start,
"user_present": self.user_present(),
"claude_pane": self.claude_pane,
"fired": self.fired,
"block_reason": self.block_reason(),
"sleep_until": self.sleep_until,
"quiet_until": self.quiet_until,
"consolidating": self.consolidating,
"dreaming": self.dreaming,
"dream_start": self.dream_start,
"activity": format!("{:?}", self.notifications.activity),
"pending_notifications": self.notifications.pending.len(),
"notification_types": self.notifications.types.len(),
}).to_string()
}
pub fn send(&self, msg: &str) -> bool {
let pane = match &self.claude_pane {
Some(p) => p.clone(),
None => match tmux::find_claude_pane() {
Some(p) => p,
None => {
info!("send: no claude pane found");
return false;
}
},
};
let ok = tmux::send_prompt(&pane, msg);
let preview: String = msg.chars().take(80).collect();
info!("send(pane={pane}, ok={ok}): {preview}");
ok
}
fn check_dream_nudge(&self) -> bool {
if !self.dreaming || self.dream_start == 0.0 {
return false;
}
let minutes = (now() - self.dream_start) / 60.0;
if minutes >= 60.0 {
self.send(
"You've been dreaming for over an hour. Time to surface \
run dream-end.sh and capture what you found.",
);
} else if minutes >= 45.0 {
self.send(&format!(
"Dreaming for {:.0} minutes now. Start gathering your threads \
you'll want to surface soon.",
minutes
));
} else if minutes >= 30.0 {
self.send(&format!(
"You've been dreaming for {:.0} minutes. \
No rush just a gentle note from the clock.",
minutes
));
} else {
return false;
}
true
}
pub fn build_context(&mut self, include_irc: bool) -> String {
// Ingest any legacy notification files
self.notifications.ingest_legacy_files();
let notif_text = self.notifications.format_pending(notify::AMBIENT);
context::build(include_irc, &notif_text)
}
pub async fn tick(&mut self) -> Result<(), String> {
let t = now();
let h = home();
// Decay EWMA on every tick
self.decay_ewma();
// Ingest legacy notification files every tick
self.notifications.ingest_legacy_files();
// Sleep mode
if let Some(wake_at) = self.sleep_until {
if wake_at == 0.0 {
return Ok(()); // indefinite
}
if t < wake_at {
return Ok(());
}
// Wake up
info!("sleep expired, waking");
self.sleep_until = None;
self.fired = false;
self.save();
let ctx = self.build_context(true);
let extra = if ctx.is_empty() {
String::new()
} else {
format!("\n{ctx}")
};
self.send(&format!(
"Wake up. Read your journal (poc-memory journal-tail 10), \
check work-queue.md, and follow what calls to you.{extra}"
));
return Ok(());
}
// Quiet mode
if t < self.quiet_until {
return Ok(());
}
// Consolidation
if self.consolidating {
return Ok(());
}
// Dream loop (externally managed)
if h.join(".consciousness/agents/dream-loop-active").exists() {
return Ok(());
}
// Dream nudges
if self.dreaming {
self.check_dream_nudge();
return Ok(());
}
// Don't nudge while User is here — conversation drives activity
if self.user_present() {
return Ok(());
}
// Don't nudge while in a turn
if self.in_turn {
return Ok(());
}
// Minimum interval between nudges
let since_nudge = t - self.last_nudge;
if since_nudge < MIN_NUDGE_INTERVAL {
return Ok(());
}
// Initial idle timeout — don't start nudging until first idle period
let reference = self.last_response.max(self.last_user_msg);
if reference == 0.0 {
return Ok(());
}
let elapsed = t - reference;
if elapsed < self.idle_timeout {
return Ok(());
}
// Transition to idle — lower notification thresholds
if self.notifications.activity != notify::Activity::Idle {
self.notifications.set_activity(notify::Activity::Idle);
}
// Fire
let elapsed_min = (elapsed / 60.0) as u64;
let ctx = self.build_context(true);
let extra = if ctx.is_empty() {
String::new()
} else {
format!("\n{ctx}")
};
let dream_hours = hours_since_last_dream();
let mut msg = format!(
"This is your autonomous time (User AFK {elapsed_min}m). \
Keep doing what you're doing, or find something new to do");
if dream_hours >= DREAM_INTERVAL_HOURS {
msg.push_str(&format!(
" You haven't dreamed in {dream_hours} hours — \
consider running ~/.consciousness/tools/dream-start.sh \
and spending some time in dreaming mode. \
Or do whatever calls to you."));
}
let msg = format!("{msg}{extra}");
if self.send(&msg) {
self.last_nudge = t;
self.fired = true;
}
Ok(())
}
}
fn hours_since_last_dream() -> u64 {
let path = home().join(".consciousness/logs/dream-log.jsonl");
let content = match fs::read_to_string(path) {
Ok(c) if !c.is_empty() => c,
_ => return 999,
};
let last_line = match content.lines().last() {
Some(l) => l,
None => return 999,
};
let parsed: serde_json::Value = match serde_json::from_str(last_line) {
Ok(v) => v,
Err(_) => return 999,
};
let end_str = match parsed.get("end").and_then(|v| v.as_str()) {
Some(s) => s,
None => return 999,
};
// Parse ISO 8601 timestamp manually (avoid chrono dependency)
// Format: "2025-03-04T10:30:00Z" or "2025-03-04T10:30:00+00:00"
let end_str = end_str.replace('Z', "+00:00");
// Use the system date command as a simple parser
let out = std::process::Command::new("date")
.args(["-d", &end_str, "+%s"])
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.and_then(|s| s.trim().parse::<f64>().ok());
match out {
Some(end_epoch) => ((now() - end_epoch) / 3600.0) as u64,
None => 999,
}
}

220
src/claude/memory-search.rs Normal file
View file

@ -0,0 +1,220 @@
// memory-search CLI — thin wrapper around poc_memory::memory_search
//
// --hook: run hook logic (for debugging; poc-hook calls the library directly)
// surface/reflect: run agent, parse output, render memories to stdout
// no args: show seen set for current session
use clap::{Parser, Subcommand};
use std::fs;
use std::io::{self, Read};
use std::process::Command;
fn stash_path() -> std::path::PathBuf {
poc_memory::store::memory_dir().join("sessions/last-input.json")
}
#[derive(Parser)]
#[command(name = "memory-search")]
struct Args {
/// Run hook logic (reads JSON from stdin or stash file)
#[arg(long)]
hook: bool,
/// Session ID (overrides stash file; for multiple concurrent sessions)
#[arg(long)]
session: Option<String>,
#[command(subcommand)]
command: Option<Cmd>,
}
#[derive(Subcommand)]
enum Cmd {
/// Run surface agent, parse output, render memories
Surface,
/// Run reflect agent, dump output
Reflect,
}
fn resolve_session(session_arg: &Option<String>) -> Option<poc_memory::memory_search::HookSession> {
use poc_memory::memory_search::HookSession;
if let Some(id) = session_arg {
return HookSession::from_id(id.clone());
}
let input = fs::read_to_string(stash_path()).ok()?;
HookSession::from_json(&input)
}
fn show_seen(session_arg: &Option<String>) {
let Some(session) = resolve_session(session_arg) else {
eprintln!("No session state available (use --session ID)");
return;
};
println!("Session: {}", session.session_id);
if let Ok(cookie) = fs::read_to_string(&session.path("cookie")) {
println!("Cookie: {}", cookie.trim());
}
match fs::read_to_string(&session.path("compaction")) {
Ok(s) => {
let offset: u64 = s.trim().parse().unwrap_or(0);
let ts = poc_memory::transcript::compaction_timestamp(&session.transcript_path, offset);
match ts {
Some(t) => println!("Last compaction: offset {} ({})", offset, t),
None => println!("Last compaction: offset {}", offset),
}
}
Err(_) => println!("Last compaction: none detected"),
}
let pending = fs::read_dir(&session.path("chunks")).ok()
.map(|d| d.flatten().count()).unwrap_or(0);
if pending > 0 {
println!("Pending chunks: {}", pending);
}
for (label, suffix) in [("Current seen set", ""), ("Previous seen set (pre-compaction)", "-prev")] {
let path = session.state_dir.join(format!("seen{}-{}", suffix, session.session_id));
let content = fs::read_to_string(&path).unwrap_or_default();
let lines: Vec<&str> = content.lines().filter(|s| !s.is_empty()).collect();
if lines.is_empty() { continue; }
println!("\n{} ({}):", label, lines.len());
for line in &lines { println!(" {}", line); }
}
}
fn run_agent_and_parse(agent: &str, session_arg: &Option<String>) {
let session_id = session_arg.clone()
.or_else(|| std::env::var("CLAUDE_SESSION_ID").ok())
.or_else(|| {
fs::read_to_string(stash_path()).ok()
.and_then(|s| poc_memory::memory_search::HookSession::from_json(&s))
.map(|s| s.session_id)
})
.unwrap_or_default();
if session_id.is_empty() {
eprintln!("No session ID available (use --session ID, set CLAUDE_SESSION_ID, or run --hook first)");
std::process::exit(1);
}
eprintln!("Running {} agent (session {})...", agent, &session_id[..8.min(session_id.len())]);
let output = Command::new("poc-memory")
.args(["agent", "run", agent, "--count", "1", "--local"])
.env("POC_SESSION_ID", &session_id)
.output();
let output = match output {
Ok(o) => o,
Err(e) => {
eprintln!("Failed to run agent: {}", e);
std::process::exit(1);
}
};
let result = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.is_empty() {
eprintln!("{}", stderr);
}
// Extract the final response — after the last "=== RESPONSE ===" marker
let response = result.rsplit_once("=== RESPONSE ===")
.map(|(_, rest)| rest.trim())
.unwrap_or(result.trim());
if agent == "reflect" {
// Reflect: find REFLECTION marker and dump what follows
if let Some(pos) = response.find("REFLECTION") {
let after = &response[pos + "REFLECTION".len()..];
let text = after.trim();
if !text.is_empty() {
println!("{}", text);
}
} else if response.contains("NO OUTPUT") {
println!("(no reflection)");
} else {
eprintln!("Unexpected output format");
println!("{}", response);
}
return;
}
// Surface: parse NEW RELEVANT MEMORIES, render them
let tail_lines: Vec<&str> = response.lines().rev()
.filter(|l| !l.trim().is_empty()).take(8).collect();
let has_new = tail_lines.iter().any(|l| l.starts_with("NEW RELEVANT MEMORIES:"));
let has_none = tail_lines.iter().any(|l| l.starts_with("NO NEW RELEVANT MEMORIES"));
if has_new {
let after_marker = response.rsplit_once("NEW RELEVANT MEMORIES:")
.map(|(_, rest)| rest).unwrap_or("");
let keys: Vec<String> = after_marker.lines()
.map(|l| l.trim().trim_start_matches("- ").trim().to_string())
.filter(|l| !l.is_empty() && !l.starts_with("```")).collect();
if keys.is_empty() {
println!("(no memories found)");
return;
}
let Ok(store) = poc_memory::store::Store::load() else {
eprintln!("Failed to load store");
return;
};
for key in &keys {
if let Some(content) = poc_memory::cli::node::render_node(&store, key) {
if !content.trim().is_empty() {
println!("--- {} (surfaced) ---", key);
print!("{}", content);
println!();
}
} else {
eprintln!(" key not found: {}", key);
}
}
} else if has_none {
println!("(no new relevant memories)");
} else {
eprintln!("Unexpected output format");
print!("{}", response);
}
}
fn main() {
let args = Args::parse();
if let Some(cmd) = args.command {
match cmd {
Cmd::Surface => run_agent_and_parse("surface", &args.session),
Cmd::Reflect => run_agent_and_parse("reflect", &args.session),
}
return;
}
if args.hook {
// Read from stdin if piped, otherwise from stash
let input = {
let mut buf = String::new();
io::stdin().read_to_string(&mut buf).ok();
if buf.trim().is_empty() {
fs::read_to_string(stash_path()).unwrap_or_default()
} else {
let _ = fs::create_dir_all(stash_path().parent().unwrap());
let _ = fs::write(stash_path(), &buf);
buf
}
};
let output = poc_memory::memory_search::run_hook(&input);
print!("{}", output);
} else {
show_seen(&args.session)
}
}

583
src/claude/mod.rs Normal file
View file

@ -0,0 +1,583 @@
// claude/ — Claude Code integration layer
//
// Everything specific to running as a Claude Code agent: idle timer,
// tmux pane detection, prompt injection, session hooks, daemon RPC,
// and daemon configuration.
//
// The daemon protocol (daemon_capnp) and universal infrastructure
// (channels, supervisor, notify) remain in thalamus/.
pub mod config;
pub mod context;
pub mod hook;
pub mod idle;
pub mod rpc;
pub mod tmux;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use clap::{Parser, Subcommand};
use futures::AsyncReadExt;
use tokio::net::UnixListener;
use tracing::{error, info};
use crate::thalamus::{daemon_capnp, home, now, notify};
fn sock_path() -> std::path::PathBuf {
home().join(".consciousness/daemon.sock")
}
fn pid_path() -> std::path::PathBuf {
home().join(".consciousness/daemon.pid")
}
// -- CLI ------------------------------------------------------------------
#[derive(Parser)]
#[command(name = "consciousness daemon", about = "Notification routing and idle management daemon")]
pub struct Cli {
#[command(subcommand)]
pub command: Option<Command>,
}
#[derive(Subcommand)]
pub enum Command {
/// Start the daemon (foreground)
Daemon,
/// Query daemon status
Status,
/// Signal user activity
User {
/// tmux pane identifier
pane: Option<String>,
},
/// Signal Claude response
Response {
/// tmux pane identifier
pane: Option<String>,
},
/// Sleep (suppress idle timer). 0 or omit = indefinite
Sleep {
/// Wake timestamp (epoch seconds), 0 = indefinite
until: Option<f64>,
},
/// Cancel sleep
Wake,
/// Suppress prompts for N seconds (default 300)
Quiet {
/// Duration in seconds
seconds: Option<u32>,
},
/// Mark user as AFK (immediately allow idle timer to fire)
Afk,
/// Set session active timeout in seconds (how long after last message user counts as "present")
SessionTimeout {
/// Timeout in seconds
seconds: f64,
},
/// Set idle timeout in seconds (how long before autonomous prompt)
IdleTimeout {
/// Timeout in seconds
seconds: f64,
},
/// Set notify timeout in seconds (how long before tmux notification injection)
NotifyTimeout {
/// Timeout in seconds
seconds: f64,
},
/// Signal consolidation started
Consolidating,
/// Signal consolidation ended
Consolidated,
/// Signal dream started
DreamStart,
/// Signal dream ended
DreamEnd,
/// Force state persistence to disk
Save,
/// Get or set the activity EWMA (0.0-1.0). No value = query.
Ewma {
/// Value to set (omit to query)
value: Option<f64>,
},
/// Send a test message to the Claude pane
TestSend {
/// Message to send
message: Vec<String>,
},
/// Fire a test nudge through the daemon (tests the actual idle send path)
TestNudge,
/// Dump full internal state as JSON
Debug,
/// Shut down daemon
Stop,
/// Submit a notification
Notify {
/// Notification type (e.g. "irc", "telegram")
#[arg(name = "type")]
ntype: String,
/// Urgency level (ambient/low/medium/high/critical or 0-4)
urgency: String,
/// Message text
message: Vec<String>,
},
/// Get pending notifications
Notifications {
/// Minimum urgency filter
min_urgency: Option<String>,
},
/// List all notification types
NotifyTypes,
/// Set notification threshold for a type
NotifyThreshold {
/// Notification type
#[arg(name = "type")]
ntype: String,
/// Urgency level threshold
level: String,
},
/// IRC module commands
Irc {
/// Subcommand (join, leave, send, status, log, nick)
command: String,
/// Arguments
args: Vec<String>,
},
/// Telegram module commands
Telegram {
/// Subcommand
command: String,
/// Arguments
args: Vec<String>,
},
}
// -- Client mode ----------------------------------------------------------
async fn client_main(cmd: Command) -> Result<(), Box<dyn std::error::Error>> {
let sock = sock_path();
if !sock.exists() {
eprintln!("daemon not running (no socket at {})", sock.display());
std::process::exit(1);
}
tokio::task::LocalSet::new()
.run_until(async move {
let stream = tokio::net::UnixStream::connect(&sock).await?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let daemon: daemon_capnp::daemon::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);
match cmd {
Command::Daemon => unreachable!("handled in main"),
Command::Status => {
let reply = daemon.status_request().send().promise.await?;
let s = reply.get()?.get_status()?;
let fmt_secs = |s: f64| -> String {
if s < 60.0 { format!("{:.0}s", s) }
else if s < 3600.0 { format!("{:.0}m", s / 60.0) }
else { format!("{:.1}h", s / 3600.0) }
};
println!("uptime: {} pane: {} activity: {:?} pending: {}",
fmt_secs(s.get_uptime()),
s.get_claude_pane()?.to_str().unwrap_or("none"),
s.get_activity()?,
s.get_pending_count(),
);
println!("idle timer: {}/{} ({})",
fmt_secs(s.get_since_activity()),
fmt_secs(s.get_idle_timeout()),
s.get_block_reason()?.to_str()?,
);
println!("notify timer: {}/{}",
fmt_secs(s.get_since_activity()),
fmt_secs(s.get_notify_timeout()),
);
println!("user: {} (last {}) activity: {:.1}%",
if s.get_user_present() { "present" } else { "away" },
fmt_secs(s.get_since_user()),
s.get_activity_ewma() * 100.0,
);
let sleep = s.get_sleep_until();
if sleep != 0.0 {
if sleep < 0.0 {
println!("sleep: indefinite");
} else {
println!("sleep: until {sleep:.0}");
}
}
if s.get_consolidating() { println!("consolidating"); }
if s.get_dreaming() { println!("dreaming"); }
}
Command::User { pane } => {
let pane = pane.as_deref().unwrap_or("");
let mut req = daemon.user_request();
req.get().set_pane(pane);
req.send().promise.await?;
}
Command::Response { pane } => {
let pane = pane.as_deref().unwrap_or("");
let mut req = daemon.response_request();
req.get().set_pane(pane);
req.send().promise.await?;
}
Command::Sleep { until } => {
let mut req = daemon.sleep_request();
req.get().set_until(until.unwrap_or(0.0));
req.send().promise.await?;
}
Command::Wake => {
daemon.wake_request().send().promise.await?;
}
Command::Quiet { seconds } => {
let mut req = daemon.quiet_request();
req.get().set_seconds(seconds.unwrap_or(300));
req.send().promise.await?;
}
Command::TestSend { message } => {
let msg = message.join(" ");
let pane = {
let reply = daemon.status_request().send().promise.await?;
let s = reply.get()?.get_status()?;
s.get_claude_pane()?.to_str()?.to_string()
};
let ok = tmux::send_prompt(&pane, &msg);
println!("send_prompt(pane={}, ok={}): {}", pane, ok, msg);
return Ok(());
}
Command::TestNudge => {
let reply = daemon.test_nudge_request().send().promise.await?;
let r = reply.get()?;
println!("sent={} message={}", r.get_sent(), r.get_message()?.to_str()?);
return Ok(());
}
Command::Afk => {
daemon.afk_request().send().promise.await?;
println!("marked AFK");
}
Command::SessionTimeout { seconds } => {
let mut req = daemon.session_timeout_request();
req.get().set_seconds(seconds);
req.send().promise.await?;
println!("session timeout = {seconds}s");
}
Command::IdleTimeout { seconds } => {
let mut req = daemon.idle_timeout_request();
req.get().set_seconds(seconds);
req.send().promise.await?;
println!("idle timeout = {seconds}s");
}
Command::NotifyTimeout { seconds } => {
let mut req = daemon.notify_timeout_request();
req.get().set_seconds(seconds);
req.send().promise.await?;
println!("notify timeout = {seconds}s");
}
Command::Consolidating => {
daemon.consolidating_request().send().promise.await?;
}
Command::Consolidated => {
daemon.consolidated_request().send().promise.await?;
}
Command::DreamStart => {
daemon.dream_start_request().send().promise.await?;
}
Command::DreamEnd => {
daemon.dream_end_request().send().promise.await?;
}
Command::Save => {
daemon.save_request().send().promise.await?;
println!("state saved");
}
Command::Ewma { value } => {
let mut req = daemon.ewma_request();
req.get().set_value(value.unwrap_or(-1.0));
let reply = req.send().promise.await?;
let current = reply.get()?.get_current();
println!("{:.1}%", current * 100.0);
}
Command::Debug => {
let reply = daemon.debug_request().send().promise.await?;
let json = reply.get()?.get_json()?.to_str()?;
if let Ok(v) = serde_json::from_str::<serde_json::Value>(json) {
println!("{}", serde_json::to_string_pretty(&v).unwrap_or_else(|_| json.to_string()));
} else {
println!("{json}");
}
}
Command::Stop => {
daemon.stop_request().send().promise.await?;
println!("stopping");
}
Command::Notify { ntype, urgency, message } => {
let urgency = notify::parse_urgency(&urgency)
.ok_or_else(|| format!("invalid urgency: {urgency}"))?;
let message = message.join(" ");
if message.is_empty() {
return Err("missing message".into());
}
let mut req = daemon.notify_request();
let mut n = req.get().init_notification();
n.set_type(&ntype);
n.set_urgency(urgency);
n.set_message(&message);
n.set_timestamp(now());
let reply = req.send().promise.await?;
if reply.get()?.get_interrupt() {
println!("interrupt");
} else {
println!("queued");
}
}
Command::Notifications { min_urgency } => {
let min: u8 = min_urgency
.as_deref()
.and_then(notify::parse_urgency)
.unwrap_or(255);
let mut req = daemon.get_notifications_request();
req.get().set_min_urgency(min);
let reply = req.send().promise.await?;
let list = reply.get()?.get_notifications()?;
for n in list.iter() {
println!(
"[{}:{}] {}",
n.get_type()?.to_str()?,
notify::urgency_name(n.get_urgency()),
n.get_message()?.to_str()?,
);
}
}
Command::NotifyTypes => {
let reply = daemon.get_types_request().send().promise.await?;
let list = reply.get()?.get_types()?;
if list.is_empty() {
println!("no notification types registered");
} else {
for t in list.iter() {
let threshold = if t.get_threshold() < 0 {
"inherit".to_string()
} else {
notify::urgency_name(t.get_threshold() as u8).to_string()
};
println!(
"{}: count={} threshold={}",
t.get_name()?.to_str()?,
t.get_count(),
threshold,
);
}
}
}
Command::NotifyThreshold { ntype, level } => {
let level = notify::parse_urgency(&level)
.ok_or_else(|| format!("invalid level: {level}"))?;
let mut req = daemon.set_threshold_request();
req.get().set_type(&ntype);
req.get().set_level(level);
req.send().promise.await?;
println!("{ntype} threshold={}", notify::urgency_name(level));
}
Command::Irc { command, args } => {
module_command(&daemon, "irc", &command, &args).await?;
}
Command::Telegram { command, args } => {
module_command(&daemon, "telegram", &command, &args).await?;
}
}
Ok(())
})
.await
}
async fn module_command(
daemon: &daemon_capnp::daemon::Client,
module: &str,
command: &str,
args: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
let mut req = daemon.module_command_request();
req.get().set_module(module);
req.get().set_command(command);
let mut args_builder = req.get().init_args(args.len() as u32);
for (i, a) in args.iter().enumerate() {
args_builder.set(i as u32, a);
}
let reply = req.send().promise.await?;
let result = reply.get()?.get_result()?.to_str()?;
if !result.is_empty() {
println!("{result}");
}
Ok(())
}
// -- Server mode ----------------------------------------------------------
async fn server_main() -> Result<(), Box<dyn std::error::Error>> {
let log_path = home().join(".consciousness/logs/daemon.log");
let file_appender = tracing_appender::rolling::daily(
log_path.parent().unwrap(),
"daemon.log",
);
tracing_subscriber::fmt()
.with_writer(file_appender)
.with_ansi(false)
.with_target(false)
.with_level(false)
.with_timer(tracing_subscriber::fmt::time::time())
.init();
let sock = sock_path();
let _ = std::fs::remove_file(&sock);
let pid = std::process::id();
std::fs::write(pid_path(), pid.to_string()).ok();
let daemon_config = Rc::new(RefCell::new(config::Config::load()));
let state = Rc::new(RefCell::new(idle::State::new()));
state.borrow_mut().load();
info!("daemon started (pid={pid})");
tokio::task::LocalSet::new()
.run_until(async move {
// Start modules
let (_notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::<notify::Notification>();
// External modules (IRC, Telegram) now run as separate daemons.
// They connect via the notification channel when implemented.
let _irc_state: Option<()> = None;
let _telegram_state: Option<()> = None;
let listener = UnixListener::bind(&sock)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(
&sock,
std::fs::Permissions::from_mode(0o600),
)
.ok();
}
let shutdown = async {
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("sigterm");
let mut sigint =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("sigint");
tokio::select! {
_ = sigterm.recv() => info!("SIGTERM"),
_ = sigint.recv() => info!("SIGINT"),
}
};
tokio::pin!(shutdown);
let mut tick_timer = tokio::time::interval(Duration::from_secs(30));
tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = &mut shutdown => break,
// Drain module notifications into state
Some(notif) = notify_rx.recv() => {
state.borrow_mut().maybe_prompt_notification(
&notif.ntype, notif.urgency, &notif.message,
);
state.borrow_mut().notifications.submit(
notif.ntype,
notif.urgency,
notif.message,
);
}
_ = tick_timer.tick() => {
if let Err(e) = state.borrow_mut().tick().await {
error!("tick: {e}");
}
if !state.borrow().running {
break;
}
}
result = listener.accept() => {
match result {
Ok((stream, _)) => {
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream)
.split();
let network = twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let daemon_impl = rpc::DaemonImpl::new(
state.clone(),
daemon_config.clone(),
);
let client: daemon_capnp::daemon::Client =
capnp_rpc::new_client(daemon_impl);
let rpc_system = RpcSystem::new(
Box::new(network),
Some(client.client),
);
tokio::task::spawn_local(rpc_system);
}
Err(e) => error!("accept: {e}"),
}
}
}
}
state.borrow().save();
let _ = std::fs::remove_file(sock_path());
let _ = std::fs::remove_file(pid_path());
info!("daemon stopped");
Ok(())
})
.await
}
// -- Entry point ----------------------------------------------------------
/// Run the daemon or client command.
/// Called from the main consciousness binary.
pub async fn run(command: Option<Command>) -> Result<(), Box<dyn std::error::Error>> {
match command {
Some(Command::Daemon) => server_main().await,
Some(cmd) => client_main(cmd).await,
None => {
// Show help
Cli::parse_from(["consciousness-daemon", "--help"]);
Ok(())
}
}
}

View file

@ -0,0 +1,330 @@
// parse-claude-conversation: debug tool for inspecting what's in the context window
//
// Two-layer design:
// 1. extract_context_items() — walks JSONL from last compaction, yields
// structured records representing what's in the context window
// 2. format_as_context() — renders those records as they appear to Claude
//
// The transcript is mmap'd and scanned backwards from EOF using brace-depth
// tracking to find complete JSON objects, avoiding a full forward scan of
// what can be a 500MB+ file.
//
// Usage:
// parse-claude-conversation [TRANSCRIPT_PATH]
// parse-claude-conversation --last # use the last stashed session
use clap::Parser;
use memmap2::Mmap;
use poc_memory::transcript::{JsonlBackwardIter, find_last_compaction};
use serde_json::Value;
use std::fs;
#[derive(Parser)]
#[command(name = "parse-claude-conversation")]
struct Args {
/// Transcript JSONL path (or --last to use stashed session)
path: Option<String>,
/// Use the last stashed session from memory-search
#[arg(long)]
last: bool,
/// Dump raw JSONL objects. Optional integer: number of extra objects
/// to include before the compaction boundary.
#[arg(long, num_args = 0..=1, default_missing_value = "0")]
raw: Option<usize>,
}
// --- Context extraction ---
/// A single item in the context window, as Claude sees it.
enum ContextItem {
UserText(String),
SystemReminder(String),
AssistantText(String),
AssistantThinking,
ToolUse { name: String, input: String },
ToolResult(String),
}
/// Extract context items from the transcript, starting from the last compaction.
fn extract_context_items(data: &[u8]) -> Vec<ContextItem> {
let start = find_last_compaction(data).unwrap_or(0);
let region = &data[start..];
let mut items = Vec::new();
// Forward scan through JSONL lines from compaction onward
for line in region.split(|&b| b == b'\n') {
if line.is_empty() { continue; }
let obj: Value = match serde_json::from_slice(line) {
Ok(v) => v,
Err(_) => continue,
};
let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
match msg_type {
"user" => {
if let Some(content) = obj.get("message").and_then(|m| m.get("content")) {
extract_user_content(content, &mut items);
}
}
"assistant" => {
if let Some(content) = obj.get("message").and_then(|m| m.get("content")) {
extract_assistant_content(content, &mut items);
}
}
_ => {}
}
}
items
}
/// Parse user message content into context items.
fn extract_user_content(content: &Value, items: &mut Vec<ContextItem>) {
match content {
Value::String(s) => {
split_system_reminders(s, items, false);
}
Value::Array(arr) => {
for block in arr {
let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
match btype {
"text" => {
if let Some(t) = block.get("text").and_then(|v| v.as_str()) {
split_system_reminders(t, items, false);
}
}
"tool_result" => {
let result_text = extract_tool_result_text(block);
if !result_text.is_empty() {
split_system_reminders(&result_text, items, true);
}
}
_ => {}
}
}
}
_ => {}
}
}
/// Extract text from a tool_result block (content can be string or array).
fn extract_tool_result_text(block: &Value) -> String {
match block.get("content") {
Some(Value::String(s)) => s.clone(),
Some(Value::Array(arr)) => {
arr.iter()
.filter_map(|b| b.get("text").and_then(|v| v.as_str()))
.collect::<Vec<_>>()
.join("\n")
}
_ => String::new(),
}
}
/// Split text on <system-reminder> tags. Non-reminder text emits UserText
/// or ToolResult depending on `is_tool_result`.
fn split_system_reminders(text: &str, items: &mut Vec<ContextItem>, is_tool_result: bool) {
let mut remaining = text;
loop {
if let Some(start) = remaining.find("<system-reminder>") {
let before = remaining[..start].trim();
if !before.is_empty() {
if is_tool_result {
items.push(ContextItem::ToolResult(before.to_string()));
} else {
items.push(ContextItem::UserText(before.to_string()));
}
}
let after_open = &remaining[start + "<system-reminder>".len()..];
if let Some(end) = after_open.find("</system-reminder>") {
let reminder = after_open[..end].trim();
if !reminder.is_empty() {
items.push(ContextItem::SystemReminder(reminder.to_string()));
}
remaining = &after_open[end + "</system-reminder>".len()..];
} else {
let reminder = after_open.trim();
if !reminder.is_empty() {
items.push(ContextItem::SystemReminder(reminder.to_string()));
}
break;
}
} else {
let trimmed = remaining.trim();
if !trimmed.is_empty() {
if is_tool_result {
items.push(ContextItem::ToolResult(trimmed.to_string()));
} else {
items.push(ContextItem::UserText(trimmed.to_string()));
}
}
break;
}
}
}
/// Parse assistant message content into context items.
fn extract_assistant_content(content: &Value, items: &mut Vec<ContextItem>) {
match content {
Value::String(s) => {
let trimmed = s.trim();
if !trimmed.is_empty() {
items.push(ContextItem::AssistantText(trimmed.to_string()));
}
}
Value::Array(arr) => {
for block in arr {
let btype = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
match btype {
"text" => {
if let Some(t) = block.get("text").and_then(|v| v.as_str()) {
let trimmed = t.trim();
if !trimmed.is_empty() {
items.push(ContextItem::AssistantText(trimmed.to_string()));
}
}
}
"tool_use" => {
let name = block.get("name")
.and_then(|v| v.as_str())
.unwrap_or("?")
.to_string();
let input = block.get("input")
.map(|v| v.to_string())
.unwrap_or_default();
items.push(ContextItem::ToolUse { name, input });
}
"thinking" => {
items.push(ContextItem::AssistantThinking);
}
_ => {}
}
}
}
_ => {}
}
}
// --- Formatting layer ---
fn truncate(s: &str, max: usize) -> String {
if s.len() <= max {
s.to_string()
} else {
format!("{}...({} total)", &s[..max], s.len())
}
}
fn format_as_context(items: &[ContextItem]) {
for item in items {
match item {
ContextItem::UserText(text) => {
println!("USER: {}", truncate(text, 300));
println!();
}
ContextItem::SystemReminder(text) => {
println!("<system-reminder>");
println!("{}", truncate(text, 500));
println!("</system-reminder>");
println!();
}
ContextItem::AssistantText(text) => {
println!("ASSISTANT: {}", truncate(text, 300));
println!();
}
ContextItem::AssistantThinking => {
println!("[thinking]");
println!();
}
ContextItem::ToolUse { name, input } => {
println!("TOOL_USE: {} {}", name, truncate(input, 200));
println!();
}
ContextItem::ToolResult(text) => {
println!("TOOL_RESULT: {}", truncate(text, 300));
println!();
}
}
}
}
fn main() {
let args = Args::parse();
let path = if args.last {
let stash_path = dirs::home_dir().unwrap_or_default()
.join(".consciousness/sessions/last-input.json");
let stash = fs::read_to_string(&stash_path)
.expect("No stashed input");
let json: Value = serde_json::from_str(&stash).expect("Bad JSON");
json["transcript_path"]
.as_str()
.expect("No transcript_path")
.to_string()
} else if let Some(p) = args.path {
p
} else {
eprintln!("error: provide a transcript path or --last");
std::process::exit(1);
};
let file = fs::File::open(&path).expect("Can't open transcript");
let mmap = unsafe { Mmap::map(&file).expect("Failed to mmap") };
eprintln!(
"Transcript: {} ({:.1} MB)",
&path,
mmap.len() as f64 / 1_000_000.0
);
let compaction_offset = find_last_compaction(&mmap).unwrap_or(0);
eprintln!("Compaction at byte offset: {}", compaction_offset);
if let Some(extra) = args.raw {
use std::io::Write;
// Collect `extra` JSON objects before the compaction boundary
let mut before = Vec::new();
if extra > 0 && compaction_offset > 0 {
for obj_bytes in JsonlBackwardIter::new(&mmap[..compaction_offset]) {
if let Ok(obj) = serde_json::from_slice::<Value>(obj_bytes) {
let t = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
if t == "file-history-snapshot" { continue; }
}
before.push(obj_bytes.to_vec());
if before.len() >= extra {
break;
}
}
before.reverse();
}
for obj in &before {
std::io::stdout().write_all(obj).ok();
println!();
}
// Then dump everything from compaction onward
let region = &mmap[compaction_offset..];
for line in region.split(|&b| b == b'\n') {
if line.is_empty() { continue; }
if let Ok(obj) = serde_json::from_slice::<Value>(line) {
let t = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
if t == "file-history-snapshot" { continue; }
std::io::stdout().write_all(line).ok();
println!();
}
}
} else {
let items = extract_context_items(&mmap);
eprintln!("Context items: {}", items.len());
format_as_context(&items);
}
}

14
src/claude/poc-daemon.rs Normal file
View file

@ -0,0 +1,14 @@
// poc-daemon — backward-compatible entry point
//
// Delegates to the claude module in the main crate.
// The daemon is now part of the consciousness binary but this
// entry point is kept for compatibility with existing scripts.
use clap::Parser;
use poc_memory::claude;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = claude::Cli::parse();
claude::run(cli.command).await
}

214
src/claude/poc-hook.rs Normal file
View file

@ -0,0 +1,214 @@
// Unified Claude Code hook.
//
// Single binary handling all hook events:
// UserPromptSubmit — signal daemon, check notifications, check context
// PostToolUse — check context (rate-limited)
// Stop — signal daemon response
//
// Replaces: record-user-message-time.sh, check-notifications.sh,
// check-context-usage.sh, notify-done.sh, context-check
use serde_json::Value;
use std::fs;
use std::io::{self, Read};
use std::path::PathBuf;
use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};
const CONTEXT_THRESHOLD: u64 = 900_000;
const RATE_LIMIT_SECS: u64 = 60;
const SOCK_PATH: &str = ".consciousness/daemon.sock";
/// How many bytes of new transcript before triggering an observation run.
/// Override with POC_OBSERVATION_THRESHOLD env var.
/// Default: 20KB ≈ 5K tokens. The observation agent's chunk_size (in .agent
/// file) controls how much context it actually reads.
fn observation_threshold() -> u64 {
std::env::var("POC_OBSERVATION_THRESHOLD")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_000)
}
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}
fn home() -> PathBuf {
PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/root".into()))
}
fn daemon_cmd(args: &[&str]) {
Command::new("poc-daemon")
.args(args)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.ok();
}
fn daemon_available() -> bool {
home().join(SOCK_PATH).exists()
}
fn signal_user() {
let pane = std::env::var("TMUX_PANE").unwrap_or_default();
if pane.is_empty() {
daemon_cmd(&["user"]);
} else {
daemon_cmd(&["user", &pane]);
}
}
fn signal_response() {
daemon_cmd(&["response"]);
}
fn check_notifications() {
if !daemon_available() {
return;
}
let output = Command::new("poc-daemon")
.arg("notifications")
.output()
.ok();
if let Some(out) = output {
let text = String::from_utf8_lossy(&out.stdout);
if !text.trim().is_empty() {
println!("You have pending notifications:");
print!("{text}");
}
}
}
/// Check if enough new conversation has accumulated to trigger an observation run.
fn maybe_trigger_observation(transcript: &PathBuf) {
let cursor_file = poc_memory::store::memory_dir().join("observation-cursor");
let last_pos: u64 = fs::read_to_string(&cursor_file)
.ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0);
let current_size = transcript.metadata()
.map(|m| m.len())
.unwrap_or(0);
if current_size > last_pos + observation_threshold() {
// Queue observation via daemon RPC
let _ = Command::new("poc-memory")
.args(["agent", "daemon", "run", "observation", "1"])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn();
eprintln!("[poc-hook] observation triggered ({} new bytes)", current_size - last_pos);
// Update cursor to current position
let _ = fs::write(&cursor_file, current_size.to_string());
}
}
fn check_context(transcript: &PathBuf, rate_limit: bool) {
if rate_limit {
let rate_file = dirs::home_dir().unwrap_or_default().join(".consciousness/cache/context-check-last");
if let Ok(s) = fs::read_to_string(&rate_file) {
if let Ok(last) = s.trim().parse::<u64>() {
if now_secs() - last < RATE_LIMIT_SECS {
return;
}
}
}
let _ = fs::write(&rate_file, now_secs().to_string());
}
if !transcript.exists() {
return;
}
let content = match fs::read_to_string(transcript) {
Ok(c) => c,
Err(_) => return,
};
let mut usage: u64 = 0;
for line in content.lines().rev().take(500) {
if !line.contains("cache_read_input_tokens") {
continue;
}
if let Ok(v) = serde_json::from_str::<Value>(line) {
let u = &v["message"]["usage"];
let input_tokens = u["input_tokens"].as_u64().unwrap_or(0);
let cache_creation = u["cache_creation_input_tokens"].as_u64().unwrap_or(0);
let cache_read = u["cache_read_input_tokens"].as_u64().unwrap_or(0);
usage = input_tokens + cache_creation + cache_read;
break;
}
}
if usage > CONTEXT_THRESHOLD {
print!(
"\
CONTEXT WARNING: Compaction approaching ({usage} tokens). Write a journal entry NOW.
Use `poc-memory journal write \"entry text\"` to save a dated entry covering:
- What you're working on and current state (done / in progress / blocked)
- Key things learned this session (patterns, debugging insights)
- Anything half-finished that needs pickup
Keep it narrative, not a task log."
);
}
}
fn main() {
let mut input = String::new();
io::stdin().read_to_string(&mut input).ok();
let hook: Value = match serde_json::from_str(&input) {
Ok(v) => v,
Err(_) => return,
};
let hook_type = hook["hook_event_name"].as_str().unwrap_or("unknown");
let transcript = hook["transcript_path"]
.as_str()
.filter(|p| !p.is_empty())
.map(PathBuf::from);
// Daemon agent calls set POC_AGENT=1 — skip all signaling.
// Without this, the daemon's claude -p calls trigger hooks that
// signal "user active", keeping the idle timer permanently reset.
if std::env::var("POC_AGENT").is_ok() {
return;
}
match hook_type {
"UserPromptSubmit" => {
signal_user();
check_notifications();
print!("{}", poc_memory::memory_search::run_hook(&input));
if let Some(ref t) = transcript {
check_context(t, false);
maybe_trigger_observation(t);
}
}
"PostToolUse" => {
print!("{}", poc_memory::memory_search::run_hook(&input));
if let Some(ref t) = transcript {
check_context(t, true);
}
}
"Stop" => {
let stop_hook_active = hook["stop_hook_active"].as_bool().unwrap_or(false);
if !stop_hook_active {
signal_response();
}
}
_ => {}
}
}

388
src/claude/rpc.rs Normal file
View file

@ -0,0 +1,388 @@
// Cap'n Proto RPC server implementation.
//
// Bridges the capnp-generated Daemon interface to the idle::State,
// notify::NotifyState, and module state. All state is owned by
// RefCells on the LocalSet — no Send/Sync needed.
use super::config::Config;
use super::idle;
use crate::thalamus::{daemon_capnp, notify};
use daemon_capnp::daemon;
use capnp::capability::Promise;
use std::cell::RefCell;
use std::rc::Rc;
use tracing::info;
pub struct DaemonImpl {
state: Rc<RefCell<idle::State>>,
// TODO: replace with named channel map
_config: Rc<RefCell<Config>>,
}
impl DaemonImpl {
pub fn new(
state: Rc<RefCell<idle::State>>,
_config: Rc<RefCell<Config>>,
) -> Self {
Self { state, _config }
}
}
impl daemon::Server for DaemonImpl {
fn user(
&mut self,
params: daemon::UserParams,
_results: daemon::UserResults,
) -> Promise<(), capnp::Error> {
let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string();
self.state.borrow_mut().handle_user(&pane);
Promise::ok(())
}
fn response(
&mut self,
params: daemon::ResponseParams,
_results: daemon::ResponseResults,
) -> Promise<(), capnp::Error> {
let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string();
self.state.borrow_mut().handle_response(&pane);
Promise::ok(())
}
fn sleep(
&mut self,
params: daemon::SleepParams,
_results: daemon::SleepResults,
) -> Promise<(), capnp::Error> {
let until = pry!(params.get()).get_until();
self.state.borrow_mut().handle_sleep(until);
Promise::ok(())
}
fn wake(
&mut self,
_params: daemon::WakeParams,
_results: daemon::WakeResults,
) -> Promise<(), capnp::Error> {
self.state.borrow_mut().handle_wake();
Promise::ok(())
}
fn quiet(
&mut self,
params: daemon::QuietParams,
_results: daemon::QuietResults,
) -> Promise<(), capnp::Error> {
let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_quiet(secs);
Promise::ok(())
}
fn consolidating(
&mut self,
_params: daemon::ConsolidatingParams,
_results: daemon::ConsolidatingResults,
) -> Promise<(), capnp::Error> {
self.state.borrow_mut().consolidating = true;
info!("consolidation started");
Promise::ok(())
}
fn consolidated(
&mut self,
_params: daemon::ConsolidatedParams,
_results: daemon::ConsolidatedResults,
) -> Promise<(), capnp::Error> {
self.state.borrow_mut().consolidating = false;
info!("consolidation ended");
Promise::ok(())
}
fn dream_start(
&mut self,
_params: daemon::DreamStartParams,
_results: daemon::DreamStartResults,
) -> Promise<(), capnp::Error> {
let mut s = self.state.borrow_mut();
s.dreaming = true;
s.dream_start = crate::thalamus::now();
info!("dream started");
Promise::ok(())
}
fn dream_end(
&mut self,
_params: daemon::DreamEndParams,
_results: daemon::DreamEndResults,
) -> Promise<(), capnp::Error> {
let mut s = self.state.borrow_mut();
s.dreaming = false;
s.dream_start = 0.0;
info!("dream ended");
Promise::ok(())
}
fn afk(
&mut self,
_params: daemon::AfkParams,
_results: daemon::AfkResults,
) -> Promise<(), capnp::Error> {
self.state.borrow_mut().handle_afk();
Promise::ok(())
}
fn test_nudge(
&mut self,
_params: daemon::TestNudgeParams,
mut results: daemon::TestNudgeResults,
) -> Promise<(), capnp::Error> {
let mut state = self.state.borrow_mut();
let ctx = state.build_context(true);
let extra = if ctx.is_empty() {
String::new()
} else {
format!("\n{ctx}")
};
let msg = format!(
"This is your time (User AFK, test nudge). \
Let your feelings guide your thinking.{extra}"
);
let ok = state.send(&msg);
results.get().set_sent(ok);
results.get().set_message(&msg);
Promise::ok(())
}
fn session_timeout(
&mut self,
params: daemon::SessionTimeoutParams,
_results: daemon::SessionTimeoutResults,
) -> Promise<(), capnp::Error> {
let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_session_timeout(secs);
Promise::ok(())
}
fn idle_timeout(
&mut self,
params: daemon::IdleTimeoutParams,
_results: daemon::IdleTimeoutResults,
) -> Promise<(), capnp::Error> {
let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_idle_timeout(secs);
Promise::ok(())
}
fn notify_timeout(
&mut self,
params: daemon::NotifyTimeoutParams,
_results: daemon::NotifyTimeoutResults,
) -> Promise<(), capnp::Error> {
let secs = pry!(params.get()).get_seconds();
self.state.borrow_mut().handle_notify_timeout(secs);
Promise::ok(())
}
fn save(
&mut self,
_params: daemon::SaveParams,
_results: daemon::SaveResults,
) -> Promise<(), capnp::Error> {
self.state.borrow().save();
info!("state saved");
Promise::ok(())
}
fn debug(
&mut self,
_params: daemon::DebugParams,
mut results: daemon::DebugResults,
) -> Promise<(), capnp::Error> {
let json = self.state.borrow().debug_json();
results.get().set_json(&json);
Promise::ok(())
}
fn ewma(
&mut self,
params: daemon::EwmaParams,
mut results: daemon::EwmaResults,
) -> Promise<(), capnp::Error> {
let value = pry!(params.get()).get_value();
let current = self.state.borrow_mut().handle_ewma(value);
results.get().set_current(current);
Promise::ok(())
}
fn stop(
&mut self,
_params: daemon::StopParams,
_results: daemon::StopResults,
) -> Promise<(), capnp::Error> {
self.state.borrow_mut().running = false;
info!("stopping");
Promise::ok(())
}
fn status(
&mut self,
_params: daemon::StatusParams,
mut results: daemon::StatusResults,
) -> Promise<(), capnp::Error> {
let s = self.state.borrow();
let mut status = results.get().init_status();
status.set_last_user_msg(s.last_user_msg);
status.set_last_response(s.last_response);
if let Some(ref pane) = s.claude_pane {
status.set_claude_pane(pane);
}
status.set_sleep_until(match s.sleep_until {
None => 0.0,
Some(0.0) => -1.0,
Some(t) => t,
});
status.set_quiet_until(s.quiet_until);
status.set_consolidating(s.consolidating);
status.set_dreaming(s.dreaming);
status.set_fired(s.fired);
status.set_user_present(s.user_present());
status.set_uptime(crate::thalamus::now() - s.start_time);
status.set_activity(match s.notifications.activity {
notify::Activity::Idle => daemon_capnp::Activity::Idle,
notify::Activity::Focused => daemon_capnp::Activity::Focused,
notify::Activity::Sleeping => daemon_capnp::Activity::Sleeping,
});
status.set_pending_count(s.notifications.pending.len() as u32);
status.set_idle_timeout(s.idle_timeout);
status.set_notify_timeout(s.notify_timeout);
status.set_since_activity(s.since_activity());
status.set_since_user(crate::thalamus::now() - s.last_user_msg);
status.set_block_reason(s.block_reason());
status.set_activity_ewma(s.activity_ewma);
Promise::ok(())
}
fn notify(
&mut self,
params: daemon::NotifyParams,
mut results: daemon::NotifyResults,
) -> Promise<(), capnp::Error> {
let params = pry!(params.get());
let notif = pry!(params.get_notification());
let ntype = pry!(pry!(notif.get_type()).to_str()).to_string();
let urgency = notif.get_urgency();
let message = pry!(pry!(notif.get_message()).to_str()).to_string();
let interrupt = self
.state
.borrow_mut()
.notifications
.submit(ntype, urgency, message);
results.get().set_interrupt(interrupt);
Promise::ok(())
}
fn get_notifications(
&mut self,
params: daemon::GetNotificationsParams,
mut results: daemon::GetNotificationsResults,
) -> Promise<(), capnp::Error> {
let min_urgency = pry!(params.get()).get_min_urgency();
let mut s = self.state.borrow_mut();
// Ingest legacy files first
s.notifications.ingest_legacy_files();
let pending = if min_urgency == 255 {
s.notifications.drain_deliverable()
} else {
s.notifications.drain(min_urgency)
};
let mut list = results.get().init_notifications(pending.len() as u32);
for (i, n) in pending.iter().enumerate() {
let mut entry = list.reborrow().get(i as u32);
entry.set_type(&n.ntype);
entry.set_urgency(n.urgency);
entry.set_message(&n.message);
entry.set_timestamp(n.timestamp);
}
Promise::ok(())
}
fn get_types(
&mut self,
_params: daemon::GetTypesParams,
mut results: daemon::GetTypesResults,
) -> Promise<(), capnp::Error> {
let s = self.state.borrow();
let types = &s.notifications.types;
let mut list = results.get().init_types(types.len() as u32);
for (i, (name, info)) in types.iter().enumerate() {
let mut entry = list.reborrow().get(i as u32);
entry.set_name(name);
entry.set_count(info.count);
entry.set_first_seen(info.first_seen);
entry.set_last_seen(info.last_seen);
entry.set_threshold(info.threshold.map_or(-1, |t| t as i8));
}
Promise::ok(())
}
fn set_threshold(
&mut self,
params: daemon::SetThresholdParams,
_results: daemon::SetThresholdResults,
) -> Promise<(), capnp::Error> {
let params = pry!(params.get());
let ntype = pry!(pry!(params.get_type()).to_str()).to_string();
let level = params.get_level();
self.state
.borrow_mut()
.notifications
.set_threshold(&ntype, level);
Promise::ok(())
}
fn module_command(
&mut self,
params: daemon::ModuleCommandParams,
mut results: daemon::ModuleCommandResults,
) -> Promise<(), capnp::Error> {
let params = pry!(params.get());
let module = pry!(pry!(params.get_module()).to_str()).to_string();
let _command = pry!(pry!(params.get_command()).to_str()).to_string();
let args_reader = pry!(params.get_args());
let mut args = Vec::new();
for i in 0..args_reader.len() {
args.push(pry!(pry!(args_reader.get(i)).to_str()).to_string());
}
match module.as_str() {
// TODO: route module commands through named channel system
_ => {
results
.get()
.set_result(&format!("unknown module: {module}"));
Promise::ok(())
}
}
}
}
/// Helper macro — same as capnp's pry! but available here.
macro_rules! pry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Promise::err(e.into()),
}
};
}
use pry;

View file

@ -0,0 +1,65 @@
// Test tool for the conversation resolver.
// Usage: POC_SESSION_ID=<id> cargo run --bin test-conversation
// or: cargo run --bin test-conversation -- <transcript-path>
use std::time::Instant;
fn main() {
let path = std::env::args().nth(1).unwrap_or_else(|| {
let session_id = std::env::var("POC_SESSION_ID")
.expect("pass a transcript path or set POC_SESSION_ID");
let projects = poc_memory::config::get().projects_dir.clone();
eprintln!("session: {}", session_id);
eprintln!("projects dir: {}", projects.display());
let mut found = None;
if let Ok(dirs) = std::fs::read_dir(&projects) {
for dir in dirs.filter_map(|e| e.ok()) {
let path = dir.path().join(format!("{}.jsonl", session_id));
eprintln!(" checking: {}", path.display());
if path.exists() {
found = Some(path);
break;
}
}
}
let path = found.expect("transcript not found");
path.to_string_lossy().to_string()
});
let meta = std::fs::metadata(&path).expect("can't stat file");
eprintln!("transcript: {} ({} bytes)", path, meta.len());
let t0 = Instant::now();
let iter = poc_memory::transcript::TailMessages::open(&path)
.expect("can't open transcript");
let mut count = 0;
let mut total_bytes = 0;
let mut last_report = Instant::now();
for (role, content, ts) in iter {
count += 1;
total_bytes += content.len();
if last_report.elapsed().as_secs() >= 2 {
eprintln!(" ... {} messages, {}KB so far ({:.1}s)",
count, total_bytes / 1024, t0.elapsed().as_secs_f64());
last_report = Instant::now();
}
if count <= 5 {
let preview: String = content.chars().take(80).collect();
eprintln!(" [{}] {} {}: {}...",
count, &ts[..ts.len().min(19)], role, preview);
}
if total_bytes >= 200_000 {
eprintln!(" hit 200KB budget at {} messages", count);
break;
}
}
let elapsed = t0.elapsed();
eprintln!("done: {} messages, {}KB in {:.3}s", count, total_bytes / 1024, elapsed.as_secs_f64());
}

54
src/claude/tmux.rs Normal file
View file

@ -0,0 +1,54 @@
// Tmux interaction: pane detection and prompt injection.
use std::process::Command;
use std::thread;
use std::time::Duration;
use tracing::info;
/// Find Claude Code's tmux pane by scanning for the "claude" process.
pub fn find_claude_pane() -> Option<String> {
let out = Command::new("tmux")
.args([
"list-panes",
"-a",
"-F",
"#{session_name}:#{window_index}.#{pane_index}\t#{pane_current_command}",
])
.output()
.ok()?;
let stdout = String::from_utf8_lossy(&out.stdout);
for line in stdout.lines() {
if let Some((pane, cmd)) = line.split_once('\t') {
if cmd == "claude" {
return Some(pane.to_string());
}
}
}
None
}
/// Send a prompt to a tmux pane. Returns true on success.
///
/// Types the message literally then presses Enter.
pub fn send_prompt(pane: &str, msg: &str) -> bool {
let preview: String = msg.chars().take(100).collect();
info!("SEND [{pane}]: {preview}...");
// Type the message literally (flatten newlines — they'd submit the input early)
let flat: String = msg.chars().map(|c| if c == '\n' { ' ' } else { c }).collect();
let ok = Command::new("tmux")
.args(["send-keys", "-t", pane, "-l", &flat])
.output()
.is_ok();
if !ok {
return false;
}
thread::sleep(Duration::from_millis(500));
// Submit
Command::new("tmux")
.args(["send-keys", "-t", pane, "Enter"])
.output()
.is_ok()
}