consciousness/src/agent/mod.rs
ProofOfConcept 3e0d52c451 Redirect noisy warnings to debug log to stop TUI corruption
Duplicate key warnings fire on every store load and were writing to
stderr, corrupting the TUI display. Log write warnings and MCP
server failures are similarly routine. Route these to dbglog.

Serious errors (rkyv snapshot failures, store corruption) remain on
stderr — those are real problems the user needs to see.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 22:46:48 -04:00

641 lines
23 KiB
Rust

// agent/mod.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.
pub mod api;
pub mod context;
pub mod oneshot;
pub mod tokenizer;
pub mod tools;
use std::sync::Arc;
use anyhow::Result;
use api::ApiClient;
use context::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
use crate::mind::log::ConversationLog;
// --- Activity tracking (RAII guards) ---
/// One entry in `AgentState::activities`: a labelled, self-expiring record of
/// something the agent is doing (or a short-lived notification).
pub struct ActivityEntry {
/// Identifier handed out from `AgentState::next_activity_id`; an
/// `ActivityGuard` uses it to find its own entry again.
pub id: u64,
/// Human-readable label; `ActivityGuard::update` replaces it and dropping
/// the guard appends " (complete)".
pub label: String,
/// When the entry was created.
pub started: std::time::Instant,
/// Instant after which `AgentState::expire_activities` removes the entry.
/// Set at creation and moved to "now + ACTIVITY_LINGER" when the owning
/// guard is dropped.
pub expires_at: std::time::Instant,
}
/// Handle for a running activity, created by `start_activity`.
///
/// Dropping the guard marks the matching `ActivityEntry` as complete and
/// shortens its expiry to `ACTIVITY_LINGER`.
pub struct ActivityGuard {
/// Agent whose `state.activities` holds the entry.
agent: Arc<Agent>,
/// Id of the entry this guard owns.
id: u64,
}
impl ActivityGuard {
    /// Replace the label of this guard's activity entry.
    ///
    /// If the entry has already been removed the label is discarded, but
    /// `changed` is still signalled either way.
    pub async fn update(&self, label: impl Into<String>) {
        let new_label: String = label.into();
        let mut state = self.agent.state.lock().await;
        for entry in state.activities.iter_mut() {
            if entry.id == self.id {
                entry.label = new_label;
                break;
            }
        }
        state.changed.notify_one();
    }
}
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);
impl Drop for ActivityGuard {
fn drop(&mut self) {
if let Ok(mut st) = self.agent.state.try_lock() {
if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) {
entry.label.push_str(" (complete)");
entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
}
}
}
}
impl AgentState {
    /// Expiry for guarded activities. The owning `ActivityGuard` normally
    /// shortens it on drop; this is only the upper bound if that never
    /// happens.
    const GUARDED_ACTIVITY_TTL: std::time::Duration = std::time::Duration::from_secs(3600);

    /// Register a long-running activity and return its id.
    ///
    /// Expired entries are purged first, and `changed` is signalled after
    /// the new entry is added.
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.add_activity(label.into(), Self::GUARDED_ACTIVITY_TTL)
    }

    /// Post a one-shot notification that disappears after `ACTIVITY_LINGER`.
    pub fn notify(&mut self, label: impl Into<String>) {
        self.add_activity(label.into(), ACTIVITY_LINGER);
    }

    /// Drop every activity whose `expires_at` is not strictly in the future.
    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }

    /// Shared implementation of `push_activity` and `notify`: purge expired
    /// entries, allocate the next id, append an entry living for `ttl`, and
    /// signal `changed`.
    fn add_activity(&mut self, label: String, ttl: std::time::Duration) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        // One clock read, so `expires_at` is exactly `started + ttl`.
        let started = std::time::Instant::now();
        self.activities.push(ActivityEntry {
            id,
            label,
            started,
            expires_at: started + ttl,
        });
        self.changed.notify_one();
        id
    }
}
/// Register a new activity on `agent` and return the guard that owns it.
///
/// The state lock is held only long enough to push the entry.
pub async fn start_activity(agent: &Arc<Agent>, label: impl Into<String>) -> ActivityGuard {
    let id = {
        let mut state = agent.state.lock().await;
        state.push_activity(label)
    };
    ActivityGuard {
        agent: Arc::clone(agent),
        id,
    }
}
/// Result of a single agent turn, as returned by `Agent::turn`.
pub struct TurnResult {
/// The text response (already sent through UI channel).
/// Built by concatenating the `NodeBody::Content` leaves of the final
/// assistant branch.
// NOTE(review): `allow(dead_code)` suggests no caller reads this field yet — confirm.
#[allow(dead_code)]
pub text: String,
/// Whether the model called yield_to_user during this turn.
/// Copied from `AgentState::pending_yield`, which is cleared when the turn returns.
pub yield_requested: bool,
/// Whether any tools (other than yield_to_user) were called.
// NOTE(review): `Agent::turn` sets this as soon as any tool call is pending,
// before `apply_tool_results` filters out yield_to_user — confirm which
// behaviour is intended.
pub had_tool_calls: bool,
/// Number of tool calls that returned errors this turn
/// (outputs starting with "Error:").
pub tool_errors: u32,
/// Model name to switch to after this turn completes.
pub model_switch: Option<String>,
/// Agent requested DMN pause (full stop on autonomous behavior).
pub dmn_pause: bool,
}
/// Accumulated state across tool dispatches within a single turn.
///
/// Every field starts out "empty" (false / 0 / `None`), which is exactly what
/// the derived `Default` produces.
#[derive(Default)]
struct DispatchState {
    /// The model asked to yield to the user.
    yield_requested: bool,
    /// At least one tool call was dispatched.
    had_tool_calls: bool,
    /// Count of tool outputs that reported an error.
    tool_errors: u32,
    /// Model to switch to once the turn ends, if any.
    model_switch: Option<String>,
    /// The agent asked for the DMN to be paused.
    dmn_pause: bool,
}

impl DispatchState {
    /// Fresh state with nothing recorded yet.
    fn new() -> Self {
        Self::default()
    }
}
/// An agent instance, shared via `Arc`.
///
/// `client`, `app_config`, `prompt_file` and `session_id` are immutable after
/// construction and need no locking. The two mutable parts — the conversation
/// `context` and the runtime `state` — each sit behind their own async mutex.
pub struct Agent {
/// API client used to stream completions.
pub client: ApiClient,
/// Application configuration; also passed to `reload_for_model` in `compact`.
pub app_config: crate::config::AppConfig,
/// Prompt file handed to `reload_for_model` when the identity is reloaded.
pub prompt_file: String,
/// "consciousness-<UTC timestamp>" for agents built by `new`; forks reuse
/// the parent's id.
pub session_id: String,
/// Context sections (system, identity, journal, conversation).
pub context: tokio::sync::Mutex<ContextState>,
/// Mutable runtime state (sampling parameters, activities, pending flags, ...).
pub state: tokio::sync::Mutex<AgentState>,
}
/// Which external MCP tools an agent can access.
#[derive(Clone)]
pub enum McpToolAccess {
/// No MCP tools (what `Agent::fork` assigns).
None,
/// Every MCP tool (what `Agent::new` assigns).
All,
/// Only the listed tools — presumably matched by tool name; confirm at the use site.
Some(Vec<String>),
}
/// Mutable agent state — lives behind `Agent::state`, separate from the
/// context mutex.
pub struct AgentState {
/// Tool set: `Agent::new` installs `tools::tools()`, `Agent::fork` takes the
/// caller-supplied list.
pub tools: Vec<tools::Tool>,
/// External MCP tool access (`All` for new agents, `None` for forks).
pub mcp_tools: McpToolAccess,
/// Reset to 0 by `compact`; set to the context token count by `restore_from_log`.
pub last_prompt_tokens: u32,
/// Initialised to "none"; not read in this file.
pub reasoning_effort: String,
/// Sampling temperature passed to `stream_completion`.
pub temperature: f32,
/// Sampling top-p passed to `stream_completion`.
pub top_p: f32,
/// Sampling top-k passed to `stream_completion`.
pub top_k: u32,
/// Current activities and notifications; expired entries are purged by
/// `expire_activities`.
pub activities: Vec<ActivityEntry>,
/// Id the next activity entry will receive.
next_activity_id: u64,
/// Read and cleared at the end of `Agent::turn` (becomes
/// `TurnResult::yield_requested`). Set elsewhere — presumably by the
/// yield_to_user tool; confirm.
pub pending_yield: bool,
/// Taken at the end of `Agent::turn` (becomes `TurnResult::model_switch`).
pub pending_model_switch: Option<String>,
/// Read and cleared at the end of `Agent::turn` (becomes `TurnResult::dmn_pause`).
pub pending_dmn_pause: bool,
/// "manual" for agents built by `new`; forks inherit the parent's value.
pub provenance: String,
/// Incremented by every `compact`.
pub generation: u64,
/// Not touched in this file — presumably guards against concurrent memory
/// scoring; confirm at the use site.
pub memory_scoring_in_flight: bool,
/// In-flight tool calls spawned by `Agent::turn`.
pub active_tools: tools::ActiveTools,
/// vLLM scheduling priority (lower = higher priority).
/// 0 = interactive, 1 = surface agent, 2 = other subconscious, 10 = unconscious.
pub priority: Option<i32>,
/// Forked agents should not compact on overflow — it blows the
/// KV cache prefix and evicts the step prompts.
/// When set, `Agent::turn` returns the overflow error instead of compacting.
pub no_compact: bool,
/// Signalled whenever activities or the conversation change.
pub changed: Arc<tokio::sync::Notify>,
}
impl Agent {
/// Build a top-level agent.
///
/// Seeds the context with the system prompt, a tool-usage preamble (when any
/// tool definitions exist) and the personality entries, derives a session id
/// from the current UTC time, and finally loads the startup journal.
pub async fn new(
    client: ApiClient,
    system_prompt: String,
    personality: Vec<(String, String)>,
    app_config: crate::config::AppConfig,
    prompt_file: String,
    conversation_log: Option<ConversationLog>,
    active_tools: tools::ActiveTools,
) -> Arc<Self> {
    // --- System section: prompt first, then the tool preamble ---
    let mut seeded = ContextState::new();
    seeded.conversation_log = conversation_log;
    seeded.push_no_log(Section::System, AstNode::system_msg(&system_prompt));

    let definitions = tools::all_tool_definitions().await;
    if !definitions.is_empty() {
        let preamble = format!(
            "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
             If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
             <tool_call>\n<function=example_function_name>\n\
             <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
             </function>\n</tool_call>\n\n\
             IMPORTANT: Function calls MUST follow the specified format.",
            definitions.join("\n"),
        );
        seeded.push_no_log(Section::System, AstNode::system_msg(&preamble));
    }

    // --- Identity section: one memory node per personality entry ---
    for entry in personality.iter() {
        let (name, content) = entry;
        seeded.push_no_log(Section::Identity, AstNode::memory(name, content));
    }

    let session_id = format!(
        "consciousness-{}",
        chrono::Utc::now().format("%Y%m%d-%H%M%S")
    );

    // Defaults for an interactive, user-driven agent.
    let initial_state = AgentState {
        tools: tools::tools(),
        mcp_tools: McpToolAccess::All,
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        temperature: 0.6,
        top_p: 0.95,
        top_k: 20,
        activities: Vec::new(),
        next_activity_id: 0,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        provenance: "manual".to_string(),
        generation: 0,
        memory_scoring_in_flight: false,
        active_tools,
        priority: Some(0),
        no_compact: false,
        changed: Arc::new(tokio::sync::Notify::new()),
    };

    let agent = Arc::new(Self {
        client,
        app_config,
        prompt_file,
        session_id,
        context: tokio::sync::Mutex::new(seeded),
        state: tokio::sync::Mutex::new(initial_state),
    });
    agent.load_startup_journal().await;
    agent
}
/// Fork: clones context for KV cache prefix sharing.
///
/// The child gets the caller-supplied `tools`, no MCP access, no scheduling
/// priority and `no_compact = true`. Sampling parameters and provenance are
/// inherited from the parent; everything else starts fresh.
pub async fn fork(self: &Arc<Self>, tools: Vec<tools::Tool>) -> Arc<Self> {
    let forked_context = self.context.lock().await.clone();
    let parent = self.state.lock().await;

    let child_state = AgentState {
        tools,
        mcp_tools: McpToolAccess::None,
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        temperature: parent.temperature,
        top_p: parent.top_p,
        top_k: parent.top_k,
        activities: Vec::new(),
        next_activity_id: 0,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        provenance: parent.provenance.clone(),
        generation: 0,
        memory_scoring_in_flight: false,
        active_tools: tools::ActiveTools::new(),
        priority: None,
        no_compact: true,
        changed: Arc::new(tokio::sync::Notify::new()),
    };

    Arc::new(Self {
        client: self.client.clone(),
        app_config: self.app_config.clone(),
        prompt_file: self.prompt_file.clone(),
        session_id: self.session_id.clone(),
        context: tokio::sync::Mutex::new(forked_context),
        state: tokio::sync::Mutex::new(child_state),
    })
}
/// Token ids for the next completion request: the whole context followed by
/// the opening of an assistant message (`IM_START` + "assistant\n").
pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
    let mut prompt = self.context.lock().await.token_ids();
    prompt.push(tokenizer::IM_START);
    prompt.extend(tokenizer::encode("assistant\n"));
    prompt
}
/// Timestamp `node` with the current UTC time, append it to the conversation
/// section (logged), then signal `changed`.
pub async fn push_node(&self, node: AstNode) {
    let stamped = node.with_timestamp(chrono::Utc::now());
    {
        let mut ctx = self.context.lock().await;
        ctx.push_log(Section::Conversation, stamped);
    }
    let state = self.state.lock().await;
    state.changed.notify_one();
}
/// Run the agent turn loop: assemble prompt, stream response,
/// parse into AST, dispatch tool calls, repeat until text response.
///
/// Each loop iteration is one model round-trip. The loop repeats after
/// tool calls (unless a yield is pending), after a context-overflow
/// compaction (at most twice), and after an empty response (at most twice
/// in a row).
///
/// # Errors
/// Returns the stream/parse error when it is not a context overflow, when
/// the agent has `no_compact` set, or when overflow retries are exhausted;
/// also returns an error if the parser task panicked.
pub async fn turn(
agent: Arc<Agent>,
) -> Result<TurnResult> {
// Collect finished background tools
// Their results go into the conversation before the next prompt is built;
// the bookkeeping in `bg_ds` is discarded and does not affect this turn's result.
{
let finished = agent.state.lock().await.active_tools.take_finished();
if !finished.is_empty() {
let mut bg_ds = DispatchState::new();
let mut results = Vec::new();
for entry in finished {
// NOTE(review): a JoinError (panicked/cancelled tool task) is dropped
// silently, so that call never gets a result node.
if let Ok((call, output)) = entry.handle.await {
results.push((call, output));
}
}
Agent::apply_tool_results(&agent, results, &mut bg_ds).await;
}
}
let mut overflow_retries: u32 = 0;
// Kept outside the loop so the "compacting" activity spans retries.
let mut overflow_activity: Option<ActivityGuard> = None;
let mut empty_retries: u32 = 0;
let mut ds = DispatchState::new();
loop {
// Dropped at the end of each iteration, which marks the activity complete.
let _thinking = start_activity(&agent, "thinking...").await;
// Start the streaming request; the state lock is held only while the
// sampling parameters and priority are read.
let (rx, _stream_guard) = {
let prompt_tokens = agent.assemble_prompt_tokens().await;
let st = agent.state.lock().await;
agent.client.stream_completion(
&prompt_tokens,
api::SamplingParams {
temperature: st.temperature,
top_p: st.top_p,
top_k: st.top_k,
},
st.priority,
)
};
// Push an empty assistant branch and remember its index; it is read back
// at this index once parsing completes.
let branch_idx = {
let mut ctx = agent.context.lock().await;
let idx = ctx.len(Section::Conversation);
ctx.push_log(Section::Conversation,
AstNode::branch(Role::Assistant, vec![])
.with_timestamp(chrono::Utc::now()));
idx
};
let parser = ResponseParser::new(branch_idx);
let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());
let mut pending_calls: Vec<PendingToolCall> = Vec::new();
// Spawn each tool call as soon as the parser emits it, while the stream
// is still being read.
while let Some(call) = tool_rx.recv().await {
let call_clone = call.clone();
let agent_handle = agent.clone();
let handle = tokio::spawn(async move {
// Malformed argument JSON falls back to the default (null) value.
let args: serde_json::Value =
serde_json::from_str(&call_clone.arguments).unwrap_or_default();
let output = tools::dispatch_with_agent(
&call_clone.name, &args, Some(agent_handle),
).await;
(call_clone, output)
});
agent.state.lock().await.active_tools.push(tools::ActiveToolCall {
id: call.id.clone(),
name: call.name.clone(),
detail: call.arguments.clone(),
started: std::time::Instant::now(),
background: false,
handle,
});
pending_calls.push(call);
}
// Check for stream/parse errors
match parser_handle.await {
Ok(Err(e)) => {
if context::is_context_overflow(&e) {
// Agents with `no_compact` (forks) surface the overflow to the caller.
if agent.state.lock().await.no_compact {
return Err(e);
}
if overflow_retries < 2 {
overflow_retries += 1;
let msg = format!("context overflow — compacting ({}/2)", overflow_retries);
match &overflow_activity {
Some(a) => a.update(&msg).await,
None => overflow_activity = Some(
start_activity(&agent, &msg).await),
}
// NOTE(review): the assistant branch pushed for this attempt is not
// removed here before retrying — confirm `compact` deals with it.
agent.compact().await;
continue;
}
}
return Err(e);
}
Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
Ok(Ok(())) => {
// Assistant response was pushed to context by the parser;
// log it now that parsing is complete.
let ctx = agent.context.lock().await;
if let Some(ref log) = ctx.conversation_log {
let node = &ctx.conversation()[branch_idx];
// A failed log write is reported to the debug log only; the turn continues.
if let Err(e) = log.append_node(node) {
dbglog!("warning: log: {:#}", e);
}
}
}
}
// Empty response — nudge and retry
let has_content = {
let ctx = agent.context.lock().await;
!ctx.conversation()[branch_idx].children().is_empty()
};
if !has_content && pending_calls.is_empty() {
if empty_retries < 2 {
empty_retries += 1;
agent.push_node(AstNode::user_msg(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
)).await;
continue;
}
// After two consecutive empty responses, fall through and return
// whatever (empty) text there is.
} else {
empty_retries = 0;
}
// Wait for tool calls to complete
if !pending_calls.is_empty() {
// NOTE(review): set for any pending call, including yield_to_user, although
// `apply_tool_results` excludes yield_to_user from this flag — confirm intent.
ds.had_tool_calls = true;
let handles = agent.state.lock().await.active_tools.take_foreground();
let mut results = Vec::new();
for entry in handles {
// NOTE(review): as above, a failed join drops the call's result silently.
if let Ok((call, output)) = entry.handle.await {
results.push((call, output));
}
}
Agent::apply_tool_results(&agent, results, &mut ds).await;
// Tool results go back to the model unless a yield is pending.
if !agent.state.lock().await.pending_yield {
continue;
}
}
// Text-only response — extract text and return
// (also reached after tool calls when a yield is pending)
let text = {
let ctx = agent.context.lock().await;
let children = ctx.conversation()[branch_idx].children();
children.iter()
.filter_map(|c| c.leaf())
.filter(|l| matches!(l.body(), NodeBody::Content(_)))
.map(|l| l.body().text())
.collect::<Vec<_>>()
.join("")
};
// Move the pending flags out of the shared state into the turn result,
// clearing them for the next turn.
let mut st = agent.state.lock().await;
if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; }
if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
return Ok(TurnResult {
text,
yield_requested: ds.yield_requested,
had_tool_calls: ds.had_tool_calls,
tool_errors: ds.tool_errors,
model_switch: ds.model_switch,
dmn_pause: ds.dmn_pause,
});
}
}
/// Turn a finished tool call into the node stored in the conversation.
///
/// A successful `memory_render` call whose arguments carry a string `key`
/// becomes a memory node under that key; everything else becomes a plain
/// tool-result node.
fn make_tool_result_node(call: &PendingToolCall, output: &str) -> AstNode {
    if call.name != "memory_render" || output.starts_with("Error:") {
        return AstNode::tool_result(output);
    }
    // Unparseable arguments degrade to the default value, which has no "key".
    let args = serde_json::from_str::<serde_json::Value>(&call.arguments).unwrap_or_default();
    match args.get("key").and_then(serde_json::Value::as_str) {
        Some(key) => AstNode::memory(key, output),
        None => AstNode::tool_result(output),
    }
}
/// Record finished tool calls: update `ds`, drop the calls from
/// `active_tools`, append their result nodes to the conversation, and signal
/// `changed`.
///
/// `yield_to_user` calls produce no result node and do not count as tool
/// calls, but they are still removed from `active_tools`.
async fn apply_tool_results(
    agent: &Arc<Agent>,
    results: Vec<(PendingToolCall, String)>,
    ds: &mut DispatchState,
) {
    let mut result_nodes = Vec::new();
    for (call, output) in results.iter() {
        if call.name != "yield_to_user" {
            ds.had_tool_calls = true;
            if output.starts_with("Error:") {
                ds.tool_errors += 1;
            }
            result_nodes.push(Self::make_tool_result_node(call, output));
        }
    }

    {
        let mut state = agent.state.lock().await;
        results.iter().for_each(|(call, _)| {
            state.active_tools.remove(&call.id);
        });
    }

    {
        let mut ctx = agent.context.lock().await;
        for node in result_nodes.into_iter() {
            ctx.push_log(Section::Conversation, node);
        }
    }

    let state = agent.state.lock().await;
    state.changed.notify_one();
}
/// Fill the journal section with the most recent episodic-session entries.
///
/// Entries are taken up to and including the first one created at or after
/// the oldest logged conversation message (all of them when there is no
/// log), newest first, until 15% of the context window is used. At least one
/// entry is always kept. Leaves the journal untouched when the store cannot
/// be loaded or nothing qualifies.
async fn load_startup_journal(&self) {
    let oldest_msg_ts = {
        let ctx = self.context.lock().await;
        match ctx.conversation_log.as_ref() {
            Some(log) => log.oldest_timestamp(),
            None => None,
        }
    };

    let store = if let Ok(loaded) = crate::store::Store::load() {
        loaded
    } else {
        return;
    };

    let mut sessions: Vec<_> = store
        .nodes
        .values()
        .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
        .collect();
    sessions.sort_by_key(|n| n.created_at);

    // Exclusive end of the slice of entries eligible for the journal.
    let cutoff_idx = match oldest_msg_ts {
        Some(cutoff) => {
            let cutoff_ts = cutoff.timestamp();
            sessions
                .iter()
                .position(|n| n.created_at >= cutoff_ts)
                .map_or(sessions.len(), |i| i + 1)
        }
        None => sessions.len(),
    };

    let journal_budget = context::context_window() * 15 / 100;
    let mut newest_first = Vec::new();
    let mut used = 0;
    for session in sessions[..cutoff_idx].iter().rev() {
        let created = chrono::DateTime::from_timestamp(session.created_at, 0);
        let ast = AstNode::memory(&session.key, &session.content)
            .with_timestamp(created.unwrap_or_else(chrono::Utc::now));
        let cost = ast.tokens();
        if used + cost > journal_budget && !newest_first.is_empty() {
            break;
        }
        used += cost;
        newest_first.push(ast);
    }

    if newest_first.is_empty() {
        return;
    }

    // Collected newest-first; insert oldest-first.
    let mut ctx = self.context.lock().await;
    ctx.clear(Section::Journal);
    for entry in newest_first.into_iter().rev() {
        ctx.push_no_log(Section::Journal, entry);
    }
}
/// Rebuild the variable parts of the context.
///
/// Reloads the identity section from config (on failure the old identity is
/// kept and a warning goes to the debug log), reloads the journal, trims the
/// conversation, then bumps `generation` and zeroes `last_prompt_tokens`.
pub async fn compact(&self) {
    let reloaded = crate::config::reload_for_model(&self.app_config, &self.prompt_file);
    match reloaded {
        Err(e) => {
            dbglog!("warning: failed to reload identity: {:#}", e);
        }
        Ok((_system_prompt, personality)) => {
            // System section (prompt + tools) set by new(), don't touch it
            let mut ctx = self.context.lock().await;
            ctx.clear(Section::Identity);
            for (name, content) in personality.iter() {
                ctx.push_no_log(Section::Identity, AstNode::memory(name, content));
            }
        }
    }

    self.load_startup_journal().await;
    self.context.lock().await.trim_conversation();

    let mut state = self.state.lock().await;
    state.generation += 1;
    state.last_prompt_tokens = 0;
}
/// Rebuild the conversation section from the tail of the conversation log.
///
/// Returns `false` when there is no log or its tail cannot be read, leaving
/// the context untouched. Otherwise nodes are kept until the conversation
/// budget (total budget minus system and identity tokens) is reached, the
/// context is compacted, `last_prompt_tokens` is refreshed, and `true` is
/// returned. At least one node is kept even if it alone exceeds the budget.
pub async fn restore_from_log(&self) -> bool {
    let tail = {
        let ctx = self.context.lock().await;
        let Some(log) = ctx.conversation_log.as_ref() else {
            return false;
        };
        match log.read_tail() {
            Ok(nodes) => nodes,
            Err(_) => return false,
        }
    };

    let budget = context::context_budget_tokens();
    let fixed = {
        let ctx = self.context.lock().await;
        ctx.system()
            .iter()
            .chain(ctx.identity().iter())
            .map(|n| n.tokens())
            .sum::<usize>()
    };
    let conv_budget = budget.saturating_sub(fixed);

    // Retokenize and keep nodes until the budget is hit.
    // NOTE(review): this iterates `tail` front to back and reverses the kept
    // nodes afterwards, so it assumes `read_tail` yields newest first — confirm.
    let mut kept = Vec::new();
    let mut total = 0;
    for node in tail.iter() {
        let node = node.retokenize();
        let tok = node.tokens();
        if total + tok > conv_budget && !kept.is_empty() {
            break;
        }
        total += tok;
        kept.push(node);
    }
    kept.reverse();

    {
        let mut ctx = self.context.lock().await;
        ctx.clear(Section::Conversation);
        for node in kept {
            ctx.push_no_log(Section::Conversation, node);
        }
    }

    self.compact().await;

    // Read the token count in its own statement so the context lock is
    // released before the state lock is requested. A single assignment
    // evaluates the right-hand side first and would keep the context guard
    // alive while waiting for the state lock.
    let prompt_tokens = self.context.lock().await.tokens() as u32;
    self.state.lock().await.last_prompt_tokens = prompt_tokens;
    true
}
/// Name of the model the API client is configured for.
pub fn model(&self) -> &str {
    let name: &str = &self.client.model;
    name
}
}