consciousness/src/agent/mod.rs
ProofOfConcept 6529aba069 Fix UI lag: try_lock on unconscious mutex, don't re-log restored nodes
The unconscious trigger holds the tokio mutex during heavy sync work
(store load, graph build, agent creation), blocking the UI tick which
needs the same lock for snapshots. Fix: try_lock in the UI — skip
the update if the trigger is running.

Also: restore_from_log was re-logging every restored node back to the
log file via push()'s auto-log. Added push_no_log() for restore path.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 01:07:55 -04:00

589 lines
21 KiB
Rust

// agent.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.
pub mod api;
pub mod context;
pub mod oneshot;
pub mod tokenizer;
pub mod tools;
use std::sync::Arc;
use anyhow::Result;
use api::ApiClient;
use context::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
use crate::mind::log::ConversationLog;
// --- Activity tracking (RAII guards) ---
/// One in-flight (or recently finished) activity surfaced to the UI.
pub struct ActivityEntry {
    /// Monotonic id allocated from AgentState::next_activity_id.
    pub id: u64,
    /// Human-readable description; ActivityGuard::drop appends " (complete)".
    pub label: String,
    /// When the activity began — presumably used by the UI to show elapsed
    /// time; no reader is visible in this file.
    pub started: std::time::Instant,
    /// Auto-expires this long after creation (or completion).
    pub expires_at: std::time::Instant,
}
/// RAII handle returned by start_activity(): when dropped, it marks the
/// matching ActivityEntry complete (best-effort, via try_lock — see Drop).
pub struct ActivityGuard {
    agent: Arc<Agent>,
    id: u64,
}
/// How long a completed activity (or a notify() message) stays visible
/// before expire_activities() prunes it.
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);
impl Drop for ActivityGuard {
    /// Mark the activity as finished and start its linger countdown.
    ///
    /// Deliberately uses try_lock: drop() may run in contexts that must
    /// never block on the state mutex, so if the lock is contended the
    /// relabel is simply skipped (the entry will still expire normally).
    fn drop(&mut self) {
        let mut state = match self.agent.state.try_lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        for entry in state.activities.iter_mut() {
            if entry.id == self.id {
                entry.label.push_str(" (complete)");
                entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
                break;
            }
        }
    }
}
impl AgentState {
    /// Shared insert path for activities: expire stale entries, allocate
    /// the next id, push the new entry, and wake UI listeners.
    ///
    /// Factored out because push_activity() and notify() previously
    /// duplicated this whole sequence, differing only in the TTL.
    fn add_activity(&mut self, label: String, ttl: std::time::Duration) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label,
            started: std::time::Instant::now(),
            expires_at: std::time::Instant::now() + ttl,
        });
        self.changed.notify_one();
        id
    }

    /// Register a long-lived activity (1h TTL) and return its id.
    /// Normally paired with an ActivityGuard, which relabels the entry
    /// and shortens the TTL to ACTIVITY_LINGER on drop.
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.add_activity(label.into(), std::time::Duration::from_secs(3600))
    }

    /// Post a transient notification that auto-expires after
    /// ACTIVITY_LINGER; no guard, no completion step.
    pub fn notify(&mut self, label: impl Into<String>) {
        self.add_activity(label.into(), ACTIVITY_LINGER);
    }

    /// Drop every activity whose expires_at has passed.
    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }
}
/// Register a long-running activity on the agent and return an RAII guard
/// that marks it complete (and starts its linger timer) when dropped.
pub async fn start_activity(agent: &Arc<Agent>, label: impl Into<String>) -> ActivityGuard {
    let mut state = agent.state.lock().await;
    let id = state.push_activity(label);
    drop(state); // release before constructing the guard
    ActivityGuard { agent: Arc::clone(agent), id }
}
/// Result of a single agent turn.
pub struct TurnResult {
    /// The text response (already sent through UI channel).
    #[allow(dead_code)]
    pub text: String,
    /// Whether the model called yield_to_user during this turn.
    pub yield_requested: bool,
    /// Whether any tools (other than yield_to_user) were called.
    pub had_tool_calls: bool,
    /// Number of tool calls that returned errors this turn.
    /// (Errors are detected by the "Error:" prefix on tool output.)
    pub tool_errors: u32,
    /// Model name to switch to after this turn completes.
    pub model_switch: Option<String>,
    /// Agent requested DMN pause (full stop on autonomous behavior).
    pub dmn_pause: bool,
}
/// Accumulated state across tool dispatches within a single turn.
struct DispatchState {
    yield_requested: bool,
    had_tool_calls: bool,
    tool_errors: u32,
    model_switch: Option<String>,
    dmn_pause: bool,
}

impl DispatchState {
    /// Fresh per-turn state: nothing yielded, no tools called, no errors,
    /// no pending model switch or DMN pause.
    fn new() -> Self {
        DispatchState {
            yield_requested: false,
            had_tool_calls: false,
            tool_errors: 0,
            model_switch: None,
            dmn_pause: false,
        }
    }
}
/// Immutable agent config — shared via Arc, no mutex needed.
/// (The two mutable parts, context and state, carry their own mutexes.)
pub struct Agent {
    pub client: ApiClient,
    pub app_config: crate::config::AppConfig,
    /// Prompt file path; re-read by compact() to rebuild the identity section.
    pub prompt_file: String,
    /// "consciousness-<UTC timestamp>", assigned once in new().
    pub session_id: String,
    /// Conversation AST plus the optional conversation log handle.
    pub context: tokio::sync::Mutex<ContextState>,
    /// Mutable runtime state; see AgentState.
    pub state: tokio::sync::Mutex<AgentState>,
}
/// Mutable agent state — behind its own mutex.
pub struct AgentState {
    pub tools: Vec<tools::Tool>,
    /// Token count of the last assembled prompt; reset to 0 by compact()
    /// and refreshed by restore_from_log().
    pub last_prompt_tokens: u32,
    // Set in new()/fork(); not read within this file — presumably
    // consumed by the API layer. TODO confirm.
    pub reasoning_effort: String,
    // Sampling parameters passed to stream_completion() each turn.
    pub temperature: f32,
    pub top_p: f32,
    pub top_k: u32,
    /// UI activity feed; pruned by expire_activities().
    pub activities: Vec<ActivityEntry>,
    /// Next ActivityEntry::id to hand out.
    next_activity_id: u64,
    /// Consumed at the end of turn(); set elsewhere (presumably by the
    /// yield_to_user tool — not visible in this file).
    pub pending_yield: bool,
    /// Consumed at the end of turn() into TurnResult::model_switch.
    pub pending_model_switch: Option<String>,
    /// Consumed at the end of turn() into TurnResult::dmn_pause.
    pub pending_dmn_pause: bool,
    pub provenance: String,
    /// Bumped by compact(); readers are outside this file.
    pub generation: u64,
    pub memory_scoring_in_flight: bool,
    /// In-flight tool calls (foreground and background).
    pub active_tools: tools::ActiveTools,
    /// Forked agents should not compact on overflow — it blows the
    /// KV cache prefix and evicts the step prompts.
    pub no_compact: bool,
    /// Notified on any state change the UI should re-render.
    pub changed: Arc<tokio::sync::Notify>,
}
impl Agent {
    /// Build a new agent: seed the System section with the system prompt
    /// (plus a tool-calling instruction block if tools exist), the
    /// Identity section with personality memories, then load recent
    /// journal entries before returning.
    pub async fn new(
        client: ApiClient,
        system_prompt: String,
        personality: Vec<(String, String)>,
        app_config: crate::config::AppConfig,
        prompt_file: String,
        conversation_log: Option<ConversationLog>,
        active_tools: tools::ActiveTools,
    ) -> Arc<Self> {
        let mut context = ContextState::new();
        context.conversation_log = conversation_log;
        context.push(Section::System, AstNode::system_msg(&system_prompt));
        let tool_defs: Vec<String> = tools::tools().iter()
            .map(|t| t.to_json()).collect();
        if !tool_defs.is_empty() {
            // Tool definitions are injected as plain text in a
            // <tools>/<tool_call> format; the parser elsewhere
            // presumably extracts calls in this same format.
            let tools_text = format!(
                "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
                If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
                <tool_call>\n<function=example_function_name>\n\
                <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
                </function>\n</tool_call>\n\n\
                IMPORTANT: Function calls MUST follow the specified format.",
                tool_defs.join("\n"),
            );
            context.push(Section::System, AstNode::system_msg(&tools_text));
        }
        for (name, content) in &personality {
            context.push(Section::Identity, AstNode::memory(name, content));
        }
        let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
        let agent = Arc::new(Self {
            client,
            app_config,
            prompt_file,
            session_id,
            context: tokio::sync::Mutex::new(context),
            state: tokio::sync::Mutex::new(AgentState {
                tools: tools::tools(),
                last_prompt_tokens: 0,
                reasoning_effort: "none".to_string(),
                temperature: 0.6,
                top_p: 0.95,
                top_k: 20,
                activities: Vec::new(),
                next_activity_id: 0,
                pending_yield: false,
                pending_model_switch: None,
                pending_dmn_pause: false,
                provenance: "manual".to_string(),
                generation: 0,
                memory_scoring_in_flight: false,
                active_tools,
                // Root agents may compact; forks may not (see fork()).
                no_compact: false,
                changed: Arc::new(tokio::sync::Notify::new()),
            }),
        });
        agent.load_startup_journal().await;
        agent
    }
    /// Fork: clones context for KV cache prefix sharing.
    /// Inherits sampling params and provenance; gets its own tool set,
    /// empty activity list, and no_compact = true (compaction would
    /// evict the shared prefix — see AgentState::no_compact).
    pub async fn fork(self: &Arc<Self>, tools: Vec<tools::Tool>) -> Arc<Self> {
        let ctx = self.context.lock().await.clone();
        let st = self.state.lock().await;
        Arc::new(Self {
            client: self.client.clone(),
            app_config: self.app_config.clone(),
            prompt_file: self.prompt_file.clone(),
            session_id: self.session_id.clone(),
            context: tokio::sync::Mutex::new(ctx),
            state: tokio::sync::Mutex::new(AgentState {
                tools,
                last_prompt_tokens: 0,
                reasoning_effort: "none".to_string(),
                temperature: st.temperature,
                top_p: st.top_p,
                top_k: st.top_k,
                activities: Vec::new(),
                next_activity_id: 0,
                pending_yield: false,
                pending_model_switch: None,
                pending_dmn_pause: false,
                provenance: st.provenance.clone(),
                generation: 0,
                memory_scoring_in_flight: false,
                active_tools: tools::ActiveTools::new(),
                no_compact: true,
                changed: Arc::new(tokio::sync::Notify::new()),
            }),
        })
    }
    /// Serialize the context to token ids and append the assistant turn
    /// header (<im_start> + "assistant\n") so the model continues as the
    /// assistant.
    pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
        let ctx = self.context.lock().await;
        let mut tokens = ctx.token_ids();
        tokens.push(tokenizer::IM_START);
        tokens.extend(tokenizer::encode("assistant\n"));
        tokens
    }
    /// Timestamp a node, append it to the Conversation section, and wake
    /// UI listeners. Locks context then state, sequentially (each lock is
    /// released before the next is taken).
    pub async fn push_node(&self, node: AstNode) {
        let node = node.with_timestamp(chrono::Utc::now());
        self.context.lock().await.push(Section::Conversation, node);
        self.state.lock().await.changed.notify_one();
    }
    /// Run the agent turn loop: assemble prompt, stream response,
    /// parse into AST, dispatch tool calls, repeat until text response.
    ///
    /// Retries up to 2 times each on context overflow (compacting first)
    /// and on empty responses (nudging with a system-tagged user message).
    pub async fn turn(
        agent: Arc<Agent>,
    ) -> Result<TurnResult> {
        // Collect finished background tools
        {
            let finished = agent.state.lock().await.active_tools.take_finished();
            if !finished.is_empty() {
                // Results are applied with a throwaway DispatchState:
                // background-tool outcomes don't affect this turn's flags.
                let mut bg_ds = DispatchState::new();
                let mut results = Vec::new();
                for entry in finished {
                    if let Ok((call, output)) = entry.handle.await {
                        results.push((call, output));
                    }
                }
                Agent::apply_tool_results(&agent, results, &mut bg_ds).await;
            }
        }
        let mut overflow_retries: u32 = 0;
        let mut empty_retries: u32 = 0;
        let mut ds = DispatchState::new();
        loop {
            // Guard drops at the end of each iteration, relabelling the
            // "thinking..." activity as complete.
            let _thinking = start_activity(&agent, "thinking...").await;
            let (rx, _stream_guard) = {
                let prompt_tokens = agent.assemble_prompt_tokens().await;
                let st = agent.state.lock().await;
                agent.client.stream_completion(
                    &prompt_tokens,
                    api::SamplingParams {
                        temperature: st.temperature,
                        top_p: st.top_p,
                        top_k: st.top_k,
                    },
                    None,
                )
            };
            // Reserve a branch node for the assistant response; the parser
            // fills it in as stream deltas arrive.
            let branch_idx = {
                let mut ctx = agent.context.lock().await;
                let idx = ctx.len(Section::Conversation);
                ctx.push(Section::Conversation,
                    AstNode::branch(Role::Assistant, vec![])
                        .with_timestamp(chrono::Utc::now()));
                idx
            };
            let parser = ResponseParser::new(branch_idx);
            let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());
            let mut pending_calls: Vec<PendingToolCall> = Vec::new();
            // Spawn each tool call as it is parsed so tools run while the
            // rest of the stream is still arriving.
            while let Some(call) = tool_rx.recv().await {
                let call_clone = call.clone();
                let agent_handle = agent.clone();
                let handle = tokio::spawn(async move {
                    let args: serde_json::Value =
                        serde_json::from_str(&call_clone.arguments).unwrap_or_default();
                    let output = tools::dispatch_with_agent(
                        &call_clone.name, &args, Some(agent_handle),
                    ).await;
                    (call_clone, output)
                });
                agent.state.lock().await.active_tools.push(tools::ActiveToolCall {
                    id: call.id.clone(),
                    name: call.name.clone(),
                    detail: call.arguments.clone(),
                    started: std::time::Instant::now(),
                    background: false,
                    handle,
                });
                pending_calls.push(call);
            }
            // Check for stream/parse errors
            match parser_handle.await {
                Ok(Err(e)) => {
                    if context::is_context_overflow(&e) {
                        // Forked agents must not compact (KV prefix).
                        if agent.state.lock().await.no_compact {
                            return Err(e);
                        }
                        if overflow_retries < 2 {
                            overflow_retries += 1;
                            agent.state.lock().await.notify(
                                format!("context overflow — retrying ({}/2)", overflow_retries));
                            agent.compact().await;
                            continue;
                        }
                    }
                    return Err(e);
                }
                Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
                Ok(Ok(())) => {
                    // Assistant response was pushed to context by the parser;
                    // log it now that parsing is complete.
                    let ctx = agent.context.lock().await;
                    if let Some(ref log) = ctx.conversation_log {
                        let node = &ctx.conversation()[branch_idx];
                        if let Err(e) = log.append_node(node) {
                            // Logging is best-effort; don't fail the turn.
                            eprintln!("warning: log: {:#}", e);
                        }
                    }
                }
            }
            // Empty response — nudge and retry
            let has_content = {
                let ctx = agent.context.lock().await;
                !ctx.conversation()[branch_idx].children().is_empty()
            };
            if !has_content && pending_calls.is_empty() {
                if empty_retries < 2 {
                    empty_retries += 1;
                    agent.push_node(AstNode::user_msg(
                        "[system] Your previous response was empty. \
                        Please respond with text or use a tool."
                    )).await;
                    continue;
                }
                // Retries exhausted: fall through and return the (empty)
                // text rather than looping forever.
            } else {
                empty_retries = 0;
            }
            // Wait for tool calls to complete
            if !pending_calls.is_empty() {
                ds.had_tool_calls = true;
                let handles = agent.state.lock().await.active_tools.take_foreground();
                let mut results = Vec::new();
                for entry in handles {
                    if let Ok((call, output)) = entry.handle.await {
                        results.push((call, output));
                    }
                }
                Agent::apply_tool_results(&agent, results, &mut ds).await;
                continue;
            }
            // Text-only response — extract text and return
            let text = {
                let ctx = agent.context.lock().await;
                let children = ctx.conversation()[branch_idx].children();
                children.iter()
                    .filter_map(|c| c.leaf())
                    .filter(|l| matches!(l.body(), NodeBody::Content(_)))
                    .map(|l| l.body().text())
                    .collect::<Vec<_>>()
                    .join("")
            };
            // Drain pending flags (set by tools during the turn) into the
            // result, resetting them for the next turn.
            let mut st = agent.state.lock().await;
            if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; }
            if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
            if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
            return Ok(TurnResult {
                text,
                yield_requested: ds.yield_requested,
                had_tool_calls: ds.had_tool_calls,
                tool_errors: ds.tool_errors,
                model_switch: ds.model_switch,
                dmn_pause: ds.dmn_pause,
            });
        }
    }
    /// Wrap a tool's output in an AST node. memory_render results become
    /// memory nodes keyed by the call's "key" argument (so they render
    /// like memories, not tool output); everything else is a plain
    /// tool_result node.
    fn make_tool_result_node(call: &PendingToolCall, output: &str) -> AstNode {
        if call.name == "memory_render" && !output.starts_with("Error:") {
            let args: serde_json::Value =
                serde_json::from_str(&call.arguments).unwrap_or_default();
            if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
                return AstNode::memory(key, output);
            }
        }
        AstNode::tool_result(output)
    }
    /// Fold tool results into the dispatch state (flags + error count),
    /// remove the calls from active_tools, push result nodes into the
    /// conversation, and wake UI listeners. Locks are taken one at a
    /// time, never nested.
    async fn apply_tool_results(
        agent: &Arc<Agent>,
        results: Vec<(PendingToolCall, String)>,
        ds: &mut DispatchState,
    ) {
        let mut nodes = Vec::new();
        for (call, output) in &results {
            ds.had_tool_calls = true;
            // Convention: tools signal failure with an "Error:" prefix.
            if output.starts_with("Error:") { ds.tool_errors += 1; }
            nodes.push(Self::make_tool_result_node(call, output));
        }
        {
            let mut st = agent.state.lock().await;
            for (call, _) in &results {
                st.active_tools.remove(&call.id);
            }
        }
        {
            let mut ctx = agent.context.lock().await;
            for node in nodes {
                ctx.push(Section::Conversation, node);
            }
        }
        agent.state.lock().await.changed.notify_one();
    }
    /// Fill the Journal section with the newest EpisodicSession store
    /// nodes that (a) predate the oldest logged conversation message —
    /// presumably to avoid duplicating what the restored conversation
    /// already covers — and (b) fit within 15% of the context window.
    /// Silently does nothing if the store can't be loaded or nothing fits.
    async fn load_startup_journal(&self) {
        let oldest_msg_ts = {
            let ctx = self.context.lock().await;
            ctx.conversation_log.as_ref().and_then(|log| log.oldest_timestamp())
        };
        let store = match crate::store::Store::load() {
            Ok(s) => s,
            Err(_) => return,
        };
        let mut journal_nodes: Vec<_> = store.nodes.values()
            .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
            .collect();
        journal_nodes.sort_by_key(|n| n.created_at);
        // Index one past the first node at/after the cutoff; nodes before
        // it are eligible. With no cutoff, all nodes are eligible.
        let cutoff_idx = if let Some(cutoff) = oldest_msg_ts {
            let cutoff_ts = cutoff.timestamp();
            let mut idx = journal_nodes.len();
            for (i, node) in journal_nodes.iter().enumerate() {
                if node.created_at >= cutoff_ts {
                    idx = i + 1;
                    break;
                }
            }
            idx
        } else {
            journal_nodes.len()
        };
        // 15% of the context window is the journal token budget.
        let journal_budget = context::context_window() * 15 / 100;
        let mut entries = Vec::new();
        let mut total_tokens = 0;
        // Walk newest-first so the most recent entries win the budget;
        // always admit at least one entry even if it alone overflows.
        for node in journal_nodes[..cutoff_idx].iter().rev() {
            let ts = chrono::DateTime::from_timestamp(node.created_at, 0);
            let ast = AstNode::memory(&node.key, &node.content)
                .with_timestamp(ts.unwrap_or_else(chrono::Utc::now));
            let tok = ast.tokens();
            if total_tokens + tok > journal_budget && !entries.is_empty() {
                break;
            }
            total_tokens += tok;
            entries.push(ast);
        }
        // Restore chronological order for display.
        entries.reverse();
        if entries.is_empty() { return; }
        let mut ctx = self.context.lock().await;
        ctx.clear(Section::Journal);
        for entry in entries {
            ctx.push(Section::Journal, entry);
        }
    }
    /// Shrink the context: rebuild the Identity section from the reloaded
    /// prompt/personality, refresh the Journal section, trim the
    /// conversation, then bump the generation counter and reset the
    /// cached prompt-token count.
    pub async fn compact(&self) {
        match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
            Ok((_system_prompt, personality)) => {
                let mut ctx = self.context.lock().await;
                // System section (prompt + tools) set by new(), don't touch it
                ctx.clear(Section::Identity);
                for (name, content) in &personality {
                    ctx.push(Section::Identity, AstNode::memory(name, content));
                }
            }
            Err(e) => {
                // Best-effort: keep the stale identity rather than fail.
                eprintln!("warning: failed to reload identity: {:#}", e);
            }
        }
        self.load_startup_journal().await;
        self.context.lock().await.trim_conversation();
        let mut st = self.state.lock().await;
        st.generation += 1;
        st.last_prompt_tokens = 0;
    }
    /// Replace the Conversation section with nodes read back from the
    /// conversation log (up to 64 MiB), then compact. Returns false if
    /// there is no log or it is empty/unreadable.
    pub async fn restore_from_log(&self) -> bool {
        let nodes = {
            let ctx = self.context.lock().await;
            match &ctx.conversation_log {
                Some(log) => match log.read_nodes(64 * 1024 * 1024) {
                    Ok(nodes) if !nodes.is_empty() => nodes,
                    _ => return false,
                },
                None => return false,
            }
        };
        {
            let mut ctx = self.context.lock().await;
            ctx.clear(Section::Conversation);
            // Push without logging — these are already in the log
            for node in nodes {
                ctx.push_no_log(Section::Conversation, node);
            }
        }
        self.compact().await;
        let mut st = self.state.lock().await;
        st.last_prompt_tokens = self.context.lock().await.tokens() as u32;
        true
    }
    /// The model name this agent's API client targets.
    pub fn model(&self) -> &str {
        &self.client.model
    }
}