consciousness/src/agent/mod.rs
Kent Overstreet 8952ff6a76 agent/readout: forks get independent buffers
Subconscious agents (scoring, reflection, etc.) fork from the main
conscious agent. The amygdala screen reads the main agent's readout
buffer, so the previous "share parent's buffer" policy caused
forked-agent generations to bleed into the main emotional readout,
producing constant cycling even when DMN was resting.

Each fork now gets its own SharedReadoutBuffer. The amygdala screen
shows only the main conscious agent's emotional trajectory; per-agent
subconscious readouts can become a separate view later if wanted.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-18 01:42:13 -04:00

683 lines
25 KiB
Rust

// agent/mod.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.
pub mod api;
pub mod context;
pub mod oneshot;
pub mod readout;
pub mod tokenizer;
pub mod tools;
use std::sync::Arc;
use anyhow::Result;
use api::ApiClient;
use context::{AstNode, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
use crate::mind::log::ConversationLog;
// --- Activity tracking (RAII guards) ---
/// One entry in the agent's activity list (`AgentState::activities`).
pub struct ActivityEntry {
/// Identifier handed out from `AgentState::next_activity_id` when the entry is pushed.
pub id: u64,
/// Text describing the activity. `ActivityGuard` may replace it via `update`
/// and appends " (complete)" when the guard is dropped.
pub label: String,
/// When the entry was created.
pub started: std::time::Instant,
/// Instant after which `expire_activities` removes the entry. Set at
/// creation and moved to "now + ACTIVITY_LINGER" when the guard drops.
pub expires_at: std::time::Instant,
}
/// Handle for an in-progress activity, created by `start_activity`.
/// Dropping the guard tries to mark the matching entry as complete.
pub struct ActivityGuard {
/// Agent whose state holds the activity list.
agent: Arc<Agent>,
/// Id of the `ActivityEntry` this guard refers to.
id: u64,
}
impl ActivityGuard {
    /// Replace the label of this guard's activity entry.
    ///
    /// If the entry is no longer in the list nothing is relabelled, but
    /// `changed` is still notified either way.
    pub async fn update(&self, label: impl Into<String>) {
        let new_label: String = label.into();
        let mut state = self.agent.state.lock().await;
        for entry in state.activities.iter_mut() {
            if entry.id == self.id {
                entry.label = new_label;
                // Only the first matching entry is touched.
                break;
            }
        }
        state.changed.notify_one();
    }
}
/// How long a notification, or an activity whose guard has been dropped,
/// stays in the activity list before `expire_activities` removes it.
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);
impl Drop for ActivityGuard {
fn drop(&mut self) {
if let Ok(mut st) = self.agent.state.try_lock() {
if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) {
entry.label.push_str(" (complete)");
entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
}
}
}
}
impl AgentState {
    /// Upper bound on how long an in-progress activity stays listed if
    /// nothing ever marks it complete.
    const ACTIVE_TTL: std::time::Duration = std::time::Duration::from_secs(3600);

    /// Add an in-progress activity and return its id.
    ///
    /// The entry expires after `ACTIVE_TTL` unless something (normally an
    /// `ActivityGuard` being dropped) shortens its expiry first.
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.insert_activity(label.into(), Self::ACTIVE_TTL)
    }

    /// Add a short-lived notification that expires after `ACTIVITY_LINGER`.
    pub fn notify(&mut self, label: impl Into<String>) {
        self.insert_activity(label.into(), ACTIVITY_LINGER);
    }

    /// Drop every activity whose expiry instant has passed.
    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }

    /// Shared implementation of `push_activity` and `notify`: prune
    /// expired entries, allocate the next id, append the entry with the
    /// given time-to-live, and signal `changed`.
    fn insert_activity(&mut self, label: String, ttl: std::time::Duration) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        // One clock read so `started` and `expires_at` share the same base.
        let now = std::time::Instant::now();
        self.activities.push(ActivityEntry {
            id,
            label,
            started: now,
            expires_at: now + ttl,
        });
        self.changed.notify_one();
        id
    }
}
/// Register a new in-progress activity on `agent` and return the guard
/// that refers to it. The state lock is released before the guard is built.
pub async fn start_activity(agent: &Arc<Agent>, label: impl Into<String>) -> ActivityGuard {
    let id = {
        let mut state = agent.state.lock().await;
        state.push_activity(label)
    };
    ActivityGuard {
        agent: Arc::clone(agent),
        id,
    }
}
/// Result of a single agent turn, as returned by `Agent::turn`.
pub struct TurnResult {
/// Whether the model called yield_to_user during this turn.
pub yield_requested: bool,
/// Whether any tools (other than yield_to_user) were called.
pub had_tool_calls: bool,
/// Number of tool calls that returned errors this turn
/// (tool output starting with "Error:").
pub tool_errors: u32,
/// Model name to switch to after this turn completes
/// (taken from `AgentState::pending_model_switch`).
pub model_switch: Option<String>,
/// Agent requested DMN pause (full stop on autonomous behavior).
pub dmn_pause: bool,
}
/// Flags and counters accumulated across tool dispatches within one turn;
/// copied into the `TurnResult` when the turn returns.
struct DispatchState {
    /// Set when the pending-yield flag was observed at the end of the turn.
    yield_requested: bool,
    /// Set once a tool call has been recorded for this turn.
    had_tool_calls: bool,
    /// Count of tool outputs that started with "Error:".
    tool_errors: u32,
    /// Model to switch to once the turn completes, if one was requested.
    model_switch: Option<String>,
    /// Set when a DMN pause was requested.
    dmn_pause: bool,
}

impl DispatchState {
    /// Fresh state: every flag cleared, zero errors, no model switch.
    fn new() -> Self {
        Self {
            yield_requested: false,
            had_tool_calls: false,
            tool_errors: 0,
            model_switch: None,
            dmn_pause: false,
        }
    }
}
/// An agent instance, shared between tasks as `Arc<Agent>`.
///
/// `client`, `app_config` and `session_id` are plain fields with no lock
/// around them; the conversation context and the runtime state each sit
/// behind their own `crate::Mutex`.
pub struct Agent {
/// API client used to stream completions and to fetch the readout manifest.
pub client: ApiClient,
/// Application configuration this agent was created with.
pub app_config: crate::config::AppConfig,
/// Session identifier; `new` builds it as "consciousness-<UTC timestamp>"
/// and `fork` copies the parent's value.
pub session_id: String,
/// Prompt context (system, identity, journal and conversation sections).
pub context: crate::Mutex<ContextState>,
/// Mutable runtime state (sampling settings, activities, pending flags, ...).
pub state: crate::Mutex<AgentState>,
/// Shared landing pad for per-token concept-readout projections
/// streamed from the vLLM server. Populated by the streaming
/// token handler, read by UI screens (amygdala). Manifest is
/// `None` when the server has readout disabled.
pub readout: readout::SharedReadoutBuffer,
}
/// Which external MCP tools an agent can access.
#[derive(Clone)]
pub enum McpToolAccess {
/// No MCP tools; this is what `Agent::fork` assigns.
None,
/// Every MCP tool; this is what `Agent::new` assigns.
All,
/// Only the listed tools.
/// NOTE(review): presumably these are tool names — confirm at the use site.
Some(Vec<String>),
}
/// Mutable agent state — behind its own mutex.
pub struct AgentState {
/// Tools whose definitions are rendered into the system prompt.
pub tools: Vec<tools::Tool>,
/// Which external MCP tools this agent may use.
pub mcp_tools: McpToolAccess,
/// Reset to 0 by `compact`; set from the context token count by `restore_from_log`.
pub last_prompt_tokens: u32,
/// Initialised to "none" by both `new` and `fork`.
pub reasoning_effort: String,
/// Native Qwen thinking — add `<think>\n` to generation prompt.
pub think_native: bool,
/// Tool-based thinking — add a "think" tool for structured reasoning.
pub think_tool: bool,
/// Sampling temperature passed with each completion request.
pub temperature: f32,
/// Sampling top-p passed with each completion request.
pub top_p: f32,
/// Sampling top-k passed with each completion request.
pub top_k: u32,
/// Current activity entries; expired ones are pruned by `expire_activities`.
pub activities: Vec<ActivityEntry>,
/// Id that the next pushed activity will receive.
next_activity_id: u64,
/// Read and cleared at the end of `Agent::turn`; reported as `TurnResult::yield_requested`.
pub pending_yield: bool,
/// Taken at the end of `Agent::turn`; reported as `TurnResult::model_switch`.
pub pending_model_switch: Option<String>,
/// Read and cleared at the end of `Agent::turn`; reported as `TurnResult::dmn_pause`.
pub pending_dmn_pause: bool,
/// Set to "manual" by `new`; `fork` copies the parent's value.
pub provenance: String,
/// Incremented each time `compact` runs.
pub generation: u64,
/// Tool calls that have been spawned and not yet collected.
pub active_tools: tools::ActiveTools,
/// vLLM scheduling priority (lower = higher priority).
/// 0 = interactive, 1 = surface agent, 2 = other subconscious, 10 = unconscious.
pub priority: Option<i32>,
/// Forked agents should not compact on overflow — it blows the
/// KV cache prefix and evicts the step prompts.
pub no_compact: bool,
/// Signalled whenever activities or the conversation change.
pub changed: Arc<tokio::sync::Notify>,
}
impl Agent {
/// Build the main agent.
///
/// Seeds the context with the tool definitions (System section) and the
/// personality entries (Identity section), creates the initial
/// `AgentState`, loads the startup journal, and finally probes the
/// server for its readout manifest. The probe is best-effort: any
/// outcome other than a manifest is only logged.
pub async fn new(
    client: ApiClient,
    personality: Vec<(String, String)>,
    app_config: crate::config::AppConfig,
    conversation_log: Option<ConversationLog>,
    active_tools: tools::ActiveTools,
    agent_tools: Vec<tools::Tool>,
) -> Arc<Self> {
    let mut context = ContextState::new();
    context.conversation_log = conversation_log;

    // System section: tool definitions plus the calling-convention text.
    let tool_json: Vec<String> = agent_tools.iter().map(|tool| tool.to_json()).collect();
    if !tool_json.is_empty() {
        let tools_text = format!(
            "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
             If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
             <tool_call>\n<function=example_function_name>\n\
             <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
             </function>\n</tool_call>\n\n\
             IMPORTANT: Function calls MUST follow the specified format.",
            tool_json.join("\n"),
        );
        context.push_no_log(Section::System, AstNode::system_msg(&tools_text));
    }

    // Identity section: one memory node per personality entry.
    personality.iter().for_each(|(name, content)| {
        context.push_no_log(Section::Identity, AstNode::memory(name, content));
    });

    let created = chrono::Utc::now();
    let session_id = format!("consciousness-{}", created.format("%Y%m%d-%H%M%S"));

    let initial_state = AgentState {
        tools: agent_tools,
        mcp_tools: McpToolAccess::All,
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        think_native: true,
        think_tool: false,
        temperature: 0.6,
        top_p: 0.95,
        top_k: 20,
        activities: Vec::new(),
        next_activity_id: 0,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        provenance: "manual".to_string(),
        generation: 0,
        active_tools,
        priority: Some(0),
        no_compact: false,
        changed: Arc::new(tokio::sync::Notify::new()),
    };

    let agent = Arc::new(Self {
        client,
        app_config,
        session_id,
        context: crate::Mutex::new(context),
        state: crate::Mutex::new(initial_state),
        readout: readout::new_shared(),
    });
    agent.load_startup_journal().await;

    // Probe the server for its readout manifest. Non-fatal: when no
    // manifest comes back the buffer's manifest is left untouched, which
    // disables the amygdala screen gracefully.
    let probe = agent.client.fetch_readout_manifest().await;
    match probe {
        Err(e) => {
            dbglog!("readout manifest fetch failed: {}", e);
        }
        Ok(None) => {
            dbglog!(
                "readout manifest: server has readout disabled (404)"
            );
        }
        Ok(Some(m)) => {
            dbglog!(
                "readout manifest: {} concepts, layers={:?}",
                m.concepts.len(),
                m.layers,
            );
            if let Ok(mut buf) = agent.readout.lock() {
                buf.set_manifest(Some(m));
            }
        }
    }
    agent
}
/// Fork: clones context for KV cache prefix sharing.
///
/// The child copies the parent's client, config, session id, thinking
/// and sampling settings, and provenance. It gets the supplied `tools`,
/// no MCP access, no scheduling priority, an empty set of active tools,
/// and has compaction disabled.
pub async fn fork(self: &Arc<Self>, tools: Vec<tools::Tool>) -> Arc<Self> {
    let forked_context = {
        let parent_ctx = self.context.lock().await;
        parent_ctx.clone()
    };

    let forked_state = {
        let parent = self.state.lock().await;
        AgentState {
            tools,
            mcp_tools: McpToolAccess::None,
            last_prompt_tokens: 0,
            reasoning_effort: "none".to_string(),
            think_native: parent.think_native,
            think_tool: parent.think_tool,
            temperature: parent.temperature,
            top_p: parent.top_p,
            top_k: parent.top_k,
            activities: Vec::new(),
            next_activity_id: 0,
            pending_yield: false,
            pending_model_switch: None,
            pending_dmn_pause: false,
            provenance: parent.provenance.clone(),
            generation: 0,
            active_tools: tools::ActiveTools::new(),
            priority: None,
            no_compact: true,
            changed: Arc::new(tokio::sync::Notify::new()),
        }
    };

    Arc::new(Self {
        client: self.client.clone(),
        app_config: self.app_config.clone(),
        session_id: self.session_id.clone(),
        context: crate::Mutex::new(forked_context),
        state: crate::Mutex::new(forked_state),
        // Forks get an independent readout buffer. The amygdala
        // screen reads the main conscious agent's buffer only;
        // subconscious generations (scoring, reflection, etc.)
        // shouldn't bleed into the main emotional readout even
        // though they hit the same vLLM server.
        readout: readout::new_shared(),
    })
}
/// Token stream of the assembled prompt, without the attached images.
pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
    let (tokens, _images) = self.assemble_prompt().await;
    tokens
}
/// Assemble a ready-to-send prompt: token stream in wire form (each
/// image collapsed to a single `<|image_pad|>`) paired with the
/// images to attach as multi_modal_data.
///
/// The stream ends with the assistant generation header, which includes
/// an opening `<think>` line when native thinking is enabled.
pub async fn assemble_prompt(&self) -> (Vec<u32>, Vec<context::WireImage>) {
    let ctx = self.context.lock().await;
    let state = self.state.lock().await;

    let whole_conversation = 0..ctx.conversation().len();
    let (mut tokens, images, _) = ctx.wire_prompt(whole_conversation, |_| false);

    let header = if state.think_native {
        "assistant\n<think>\n"
    } else {
        "assistant\n"
    };
    tokens.push(tokenizer::IM_START);
    tokens.extend(tokenizer::encode(header));

    (tokens, images)
}
/// Rebuild the tools section of the system prompt from the current tools list.
///
/// The System section is always cleared; it is refilled only when at
/// least one tool is defined.
pub async fn rebuild_tools(&self) {
    // Snapshot the definitions so the state lock is released before the
    // context lock is taken.
    let tool_json: Vec<String> = {
        let state = self.state.lock().await;
        let defs = state.tools.iter().map(|tool| tool.to_json()).collect();
        defs
    };

    let mut ctx = self.context.lock().await;
    ctx.clear(Section::System);
    if tool_json.is_empty() {
        return;
    }

    let tools_text = format!(
        "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
         If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
         <tool_call>\n<function=example_function_name>\n\
         <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
         </function>\n</tool_call>\n\n\
         IMPORTANT: Function calls MUST follow the specified format.",
        tool_json.join("\n"),
    );
    ctx.push_no_log(Section::System, AstNode::system_msg(&tools_text));
}
/// Timestamp `node` with the current UTC time, append it to the
/// conversation (with logging), then signal `changed`.
pub async fn push_node(&self, node: AstNode) {
    let stamped = node.with_timestamp(chrono::Utc::now());
    {
        let mut ctx = self.context.lock().await;
        ctx.push_log(Section::Conversation, stamped);
    }
    let state = self.state.lock().await;
    state.changed.notify_one();
}
/// Run the agent turn loop: assemble prompt, stream response,
/// parse into AST, dispatch tool calls, repeat until text response.
///
/// Before the first request, results of finished background tools are
/// folded into the conversation. Each loop iteration then:
///
/// 1. streams a completion into a fresh assistant branch,
/// 2. spawns every tool call as soon as the parser emits it,
/// 3. on context overflow compacts and retries (at most twice, and never
///    when `no_compact` is set),
/// 4. on an empty response nudges the model and retries (at most twice),
/// 5. waits for the spawned tools, applies their results, and loops again
///    unless a yield is pending.
///
/// # Errors
///
/// Returns the stream/parse error when it is not a recoverable context
/// overflow, and a "parser task panicked" error when the parser task
/// cannot be joined.
pub async fn turn(
    agent: Arc<Agent>,
) -> Result<TurnResult> {
    // Collect finished background tools
    {
        let finished = agent.state.lock().await.active_tools.take_finished();
        if !finished.is_empty() {
            // Flags from background results are accumulated separately
            // and not reported in this turn's result.
            let mut bg_ds = DispatchState::new();
            let mut results = Vec::new();
            for entry in finished {
                match entry.handle.await {
                    Ok(done) => results.push(done),
                    Err(e) => {
                        // A tool task that panicked or was cancelled yields
                        // no result; record that instead of hiding it.
                        dbglog!("warning: background tool task failed: {}", e);
                    }
                }
            }
            Agent::apply_tool_results(&agent, results, &mut bg_ds).await;
        }
    }

    let mut overflow_retries: u32 = 0;
    let mut overflow_activity: Option<ActivityGuard> = None;
    let mut empty_retries: u32 = 0;
    let mut ds = DispatchState::new();

    loop {
        // Dropped at the end of every iteration, which marks it complete.
        let _thinking = start_activity(&agent, "thinking...").await;

        let (rx, _stream_guard) = {
            let (prompt_tokens, images) = agent.assemble_prompt().await;
            let st = agent.state.lock().await;
            agent.client.stream_completion_mm(
                &prompt_tokens,
                &images,
                api::SamplingParams {
                    temperature: st.temperature,
                    top_p: st.top_p,
                    top_k: st.top_k,
                },
                st.priority,
            )
        };

        // Empty assistant branch for the response parser to fill in.
        let branch_idx = {
            let mut ctx = agent.context.lock().await;
            let idx = ctx.len(Section::Conversation);
            ctx.push_log(Section::Conversation,
                AstNode::branch(Role::Assistant, vec![])
                    .with_timestamp(chrono::Utc::now()));
            idx
        };

        let parser = ResponseParser::new(branch_idx);
        let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());

        // Spawn each tool call as soon as it arrives, while the stream
        // is still being parsed.
        let mut pending_calls: Vec<PendingToolCall> = Vec::new();
        while let Some(call) = tool_rx.recv().await {
            let call_clone = call.clone();
            let agent_handle = agent.clone();
            let handle = tokio::spawn(async move {
                let args: serde_json::Value =
                    serde_json::from_str(&call_clone.arguments).unwrap_or_default();
                let output = tools::dispatch_with_agent(
                    &call_clone.name, &args, Some(agent_handle),
                ).await;
                (call_clone, output)
            });
            agent.state.lock().await.active_tools.push(tools::ActiveToolCall {
                id: call.id.clone(),
                name: call.name.clone(),
                detail: call.arguments.clone(),
                started: std::time::Instant::now(),
                background: false,
                handle,
            });
            pending_calls.push(call);
        }

        // Check for stream/parse errors
        match parser_handle.await {
            Ok(Err(e)) => {
                if context::is_context_overflow(&e) {
                    if agent.state.lock().await.no_compact {
                        return Err(e);
                    }
                    if overflow_retries < 2 {
                        overflow_retries += 1;
                        let msg = format!("context overflow — compacting ({}/2)", overflow_retries);
                        match &overflow_activity {
                            Some(a) => a.update(&msg).await,
                            None => overflow_activity = Some(
                                start_activity(&agent, &msg).await),
                        }
                        agent.compact().await;
                        continue;
                    }
                }
                return Err(e);
            }
            Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
            Ok(Ok(())) => {
                // Assistant response was pushed to context by the parser;
                // log it now that parsing is complete.
                let ctx = agent.context.lock().await;
                if let Some(ref log) = ctx.conversation_log {
                    let node = &ctx.conversation()[branch_idx];
                    if let Err(e) = log.append_node(node) {
                        dbglog!("warning: log: {:#}", e);
                    }
                }
            }
        }

        // Empty response — nudge and retry
        let has_content = {
            let ctx = agent.context.lock().await;
            !ctx.conversation()[branch_idx].children().is_empty()
        };
        if !has_content && pending_calls.is_empty() {
            if empty_retries < 2 {
                empty_retries += 1;
                agent.push_node(AstNode::user_msg(
                    "[system] Your previous response was empty. \
                     Please respond with text or use a tool."
                )).await;
                continue;
            }
        } else {
            empty_retries = 0;
        }

        // Wait for tool calls to complete
        if !pending_calls.is_empty() {
            // `had_tool_calls` excludes yield_to_user (see `TurnResult`),
            // so a turn whose only call was the yield must not set it.
            if pending_calls.iter().any(|c| c.name != "yield_to_user") {
                ds.had_tool_calls = true;
            }
            let handles = agent.state.lock().await.active_tools.take_foreground();
            let mut results = Vec::new();
            for entry in handles {
                match entry.handle.await {
                    Ok(done) => results.push(done),
                    Err(e) => {
                        dbglog!("warning: tool task failed: {}", e);
                    }
                }
            }
            Agent::apply_tool_results(&agent, results, &mut ds).await;
            // Feed the tool results back to the model unless it yielded.
            if !agent.state.lock().await.pending_yield {
                continue;
            }
        }

        // Text-only response — extract text and return
        let mut st = agent.state.lock().await;
        if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; }
        if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
        if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
        return Ok(TurnResult {
            yield_requested: ds.yield_requested,
            had_tool_calls: ds.had_tool_calls,
            tool_errors: ds.tool_errors,
            model_switch: ds.model_switch,
            dmn_pause: ds.dmn_pause,
        });
    }
}
/// Turn a finished tool call into the node that goes into the conversation.
///
/// A successful `memory_render` call whose arguments carry a string
/// `key` becomes a memory node under that key; everything else becomes
/// a plain tool-result node.
fn make_tool_result_node(call: &PendingToolCall, output: &str) -> AstNode {
    let is_render = call.name == "memory_render";
    let failed = output.starts_with("Error:");
    if !is_render || failed {
        return AstNode::tool_result(output);
    }

    // Unparseable arguments fall back to the default JSON value, which has no "key".
    let parsed: serde_json::Value =
        serde_json::from_str(&call.arguments).unwrap_or_default();
    match parsed.get("key").and_then(|v| v.as_str()) {
        Some(key) => AstNode::memory(key, output),
        None => AstNode::tool_result(output),
    }
}
/// Fold finished tool calls into the agent.
///
/// Every call except `yield_to_user` is counted in `ds` and converted to
/// a conversation node. All calls, including `yield_to_user`, are then
/// removed from the active set, the nodes are appended to the
/// conversation, and `changed` is signalled.
async fn apply_tool_results(
    agent: &Arc<Agent>,
    results: Vec<(PendingToolCall, String)>,
    ds: &mut DispatchState,
) {
    let mut new_nodes = Vec::with_capacity(results.len());
    let reportable = results.iter().filter(|(call, _)| call.name != "yield_to_user");
    for (call, output) in reportable {
        ds.had_tool_calls = true;
        if output.starts_with("Error:") {
            ds.tool_errors += 1;
        }
        new_nodes.push(Self::make_tool_result_node(call, output));
    }

    {
        let mut state = agent.state.lock().await;
        results
            .iter()
            .for_each(|(call, _)| { state.active_tools.remove(&call.id); });
    }

    {
        let mut ctx = agent.context.lock().await;
        for node in new_nodes {
            ctx.push_log(Section::Conversation, node);
        }
    }

    let state = agent.state.lock().await;
    state.changed.notify_one();
}
/// Fill the Journal section with recent journal entries that predate
/// the logged conversation.
///
/// Entries are taken newest-first until 15% of the context window is
/// used (the first entry is always taken), then stored oldest-first.
/// When the journal cannot be read, or nothing qualifies, the existing
/// Journal section is left untouched.
async fn load_startup_journal(&self) {
    use crate::agent::tools::memory::journal_tail;

    let oldest_logged = {
        let ctx = self.context.lock().await;
        ctx.conversation_log.as_ref().and_then(|log| log.oldest_timestamp())
    };

    // Get recent journal entries (newest first)
    let recent = match journal_tail(None, Some(100), Some(0), None).await {
        Err(_) => return,
        Ok(list) => list,
    };

    // Without a logged conversation there is no cutoff and every entry qualifies.
    let cutoff_ts = oldest_logged.map(|t| t.timestamp());
    let candidates = recent.into_iter().filter(|e| match cutoff_ts {
        Some(ts) => e.created_at < ts,
        None => true,
    });

    let budget = context::context_window() * 15 / 100;
    let mut used = 0;
    let mut picked = Vec::new();
    for entry in candidates {
        let stamp = chrono::DateTime::from_timestamp(entry.created_at, 0)
            .unwrap_or_else(chrono::Utc::now);
        let node = AstNode::memory(&entry.key, &entry.content).with_timestamp(stamp);
        let cost = node.tokens();
        if !picked.is_empty() && used + cost > budget {
            break;
        }
        used += cost;
        picked.push(node);
    }

    if picked.is_empty() {
        return;
    }

    let mut ctx = self.context.lock().await;
    ctx.clear(Section::Journal);
    // Collected newest-first; store oldest-first.
    for node in picked.into_iter().rev() {
        ctx.push_no_log(Section::Journal, node);
    }
}
/// Compact the context: reload the journal, trim the conversation, bump
/// the generation counter and reset the last prompt token count.
///
/// Identity section is left in place — mid-session rebuilds discard
/// memory scores. Content edits to personality nodes get picked up at
/// the next restart via new() + restore_from_log().
pub async fn compact(&self) {
    self.load_startup_journal().await;
    {
        let mut ctx = self.context.lock().await;
        ctx.trim_conversation();
    }
    let mut state = self.state.lock().await;
    state.last_prompt_tokens = 0;
    state.generation += 1;
}
/// Rebuild the Conversation section from the tail of the conversation log.
///
/// Returns `false` when there is no log or its tail cannot be read.
/// Otherwise keeps as many of the newest nodes as fit in the token
/// budget left after the System and Identity sections (always at least
/// one), compacts, records the resulting token count, and returns `true`.
pub async fn restore_from_log(&self) -> bool {
    let tail = {
        let ctx = self.context.lock().await;
        let log = match ctx.conversation_log.as_ref() {
            Some(log) => log,
            None => return false,
        };
        match log.read_tail() {
            Ok(nodes) => nodes,
            Err(_) => return false,
        }
    };

    let budget = context::context_budget_tokens();
    let fixed = {
        let ctx = self.context.lock().await;
        let mut sum: usize = 0;
        for n in ctx.system().iter().chain(ctx.identity().iter()) {
            sum += n.tokens();
        }
        sum
    };
    let conv_budget = budget.saturating_sub(fixed);

    // Walk backwards (newest first), retokenize, stop at budget
    let mut kept = Vec::new();
    let mut used = 0;
    for logged in tail.iter() {
        let fresh = logged.retokenize();
        let cost = fresh.tokens();
        if !kept.is_empty() && used + cost > conv_budget {
            break;
        }
        used += cost;
        kept.push(fresh);
    }

    {
        let mut ctx = self.context.lock().await;
        ctx.clear(Section::Conversation);
        // Collected newest-first; store oldest-first.
        for node in kept.into_iter().rev() {
            ctx.push_no_log(Section::Conversation, node);
        }
    }

    self.compact().await;

    {
        // Context lock first, then state lock, as in `assemble_prompt`.
        let ctx = self.context.lock().await;
        let mut state = self.state.lock().await;
        state.last_prompt_tokens = ctx.tokens() as u32;
    }
    true
}
/// Name of the model the API client is configured with.
pub fn model(&self) -> &str {
    let name: &str = &self.client.model;
    name
}
}