consciousness/src/agent/mod.rs

604 lines
21 KiB
Rust
Raw Normal View History

// agent/mod.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.
pub mod api;
pub mod context;
pub mod oneshot;
pub mod tokenizer;
pub mod tools;
use std::sync::Arc;
use anyhow::Result;
use api::ApiClient;
use context::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
2026-04-05 01:48:11 -04:00
use crate::mind::log::ConversationLog;
// --- Activity tracking (RAII guards) ---
/// One entry in the agent's list of currently displayed activities.
pub struct ActivityEntry {
    /// Identifier assigned by `AgentState` when the entry is pushed.
    pub id: u64,
    /// Human-readable description; " (complete)" is appended when the
    /// owning `ActivityGuard` is dropped.
    pub label: String,
    /// Moment the entry was created.
    pub started: std::time::Instant,
    /// Moment after which `AgentState::expire_activities` removes the entry.
    /// Reset to "now + linger" when the owning guard is dropped.
    pub expires_at: std::time::Instant,
}
/// Handle for an in-progress activity. Dropping it marks the matching
/// `ActivityEntry` as complete and shortens its remaining lifetime.
pub struct ActivityGuard {
    /// Agent whose state holds the activity entry.
    agent: Arc<Agent>,
    /// Id of the entry this guard is responsible for.
    id: u64,
}
/// How long a finished activity or a one-shot notification stays in the
/// activity list before it expires.
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::new(5, 0);
impl Drop for ActivityGuard {
fn drop(&mut self) {
if let Ok(mut st) = self.agent.state.try_lock() {
if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) {
entry.label.push_str(" (complete)");
entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
}
}
}
}
impl AgentState {
    /// Shared implementation of `push_activity` and `notify`: drop expired
    /// entries, append a new one that lives for `ttl`, signal `changed`,
    /// and return the new entry's id.
    fn record_activity(&mut self, label: String, ttl: std::time::Duration) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        let now = std::time::Instant::now();
        self.activities.push(ActivityEntry {
            id,
            label,
            started: now,
            expires_at: now + ttl,
        });
        self.changed.notify_one();
        id
    }

    /// Register a long-running activity and return its id.
    ///
    /// The entry is given a one-hour expiry as a backstop; `ActivityGuard`
    /// shortens it to `ACTIVITY_LINGER` when the guard is dropped.
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.record_activity(label.into(), std::time::Duration::from_secs(3600))
    }

    /// Post a one-shot notification that expires after `ACTIVITY_LINGER`.
    pub fn notify(&mut self, label: impl Into<String>) {
        self.record_activity(label.into(), ACTIVITY_LINGER);
    }

    /// Remove every activity whose `expires_at` is not in the future.
    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }
}
/// Register `label` as an in-progress activity on `agent` and return a guard
/// that marks it complete when dropped.
pub async fn start_activity(agent: &Arc<Agent>, label: impl Into<String>) -> ActivityGuard {
    let id = {
        let mut state = agent.state.lock().await;
        state.push_activity(label)
    };
    ActivityGuard {
        agent: Arc::clone(agent),
        id,
    }
}
/// Outcome of one call to `Agent::turn`.
pub struct TurnResult {
    /// Concatenated content text of the final assistant response.
    // NOTE(review): presumably not read by every caller yet, hence the
    // allow — confirm before removing it.
    #[allow(dead_code)]
    pub text: String,
    /// Set when `AgentState::pending_yield` was raised during the turn
    /// (presumably by the yield_to_user tool — confirm in tools).
    pub yield_requested: bool,
    /// Whether any tool calls were dispatched during the turn.
    pub had_tool_calls: bool,
    /// Count of tool results whose output started with "Error:".
    pub tool_errors: u32,
    /// Model name to switch to after this turn completes, if one was requested.
    pub model_switch: Option<String>,
    /// Set when a DMN pause was requested during the turn.
    pub dmn_pause: bool,
}
/// Flags and counters gathered while dispatching tools within one turn;
/// copied into `TurnResult` when the turn finishes.
struct DispatchState {
    /// A yield back to the user was requested.
    yield_requested: bool,
    /// At least one tool call was dispatched.
    had_tool_calls: bool,
    /// Number of tool outputs that started with "Error:".
    tool_errors: u32,
    /// Model to switch to once the turn completes.
    model_switch: Option<String>,
    /// A DMN pause was requested.
    dmn_pause: bool,
}
impl DispatchState {
    /// Fresh state: nothing requested, no tool calls, no errors.
    fn new() -> Self {
        Self {
            yield_requested: false,
            had_tool_calls: false,
            tool_errors: 0,
            model_switch: None,
            dmn_pause: false,
        }
    }
}
/// The agent: configuration that never changes after construction, plus the
/// two independently locked pieces of mutable data (`context` and `state`).
/// Shared between tasks as `Arc<Agent>`.
pub struct Agent {
    /// API client used to stream completions.
    pub client: ApiClient,
    /// Application configuration; passed to `config::reload_for_model` on compaction.
    pub app_config: crate::config::AppConfig,
    /// Prompt file; passed to `config::reload_for_model` on compaction.
    pub prompt_file: String,
    /// Session identifier, `consciousness-<UTC timestamp>`; forks reuse the parent's.
    pub session_id: String,
    /// Prompt context (system, identity, journal, conversation sections).
    pub context: tokio::sync::Mutex<ContextState>,
    /// Mutable runtime state, behind its own mutex.
    pub state: tokio::sync::Mutex<AgentState>,
}
/// Mutable agent state — behind its own mutex.
pub struct AgentState {
pub tools: Vec<tools::Tool>,
pub last_prompt_tokens: u32,
pub reasoning_effort: String,
pub temperature: f32,
pub top_p: f32,
pub top_k: u32,
pub activities: Vec<ActivityEntry>,
next_activity_id: u64,
pub pending_yield: bool,
pub pending_model_switch: Option<String>,
pub pending_dmn_pause: bool,
pub provenance: String,
pub conversation_log: Option<ConversationLog>,
pub generation: u64,
pub memory_scoring_in_flight: bool,
pub active_tools: tools::ActiveTools,
/// Forked agents should not compact on overflow — it blows the
/// KV cache prefix and evicts the step prompts.
pub no_compact: bool,
2026-04-05 23:04:10 -04:00
pub changed: Arc<tokio::sync::Notify>,
}
impl Agent {
/// Build a fresh agent.
///
/// Seeds the context with the system prompt, a tool-description message
/// (when any tools are defined) and one identity memory node per
/// `personality` entry, then loads the startup journal.
pub async fn new(
    client: ApiClient,
    system_prompt: String,
    personality: Vec<(String, String)>,
    app_config: crate::config::AppConfig,
    prompt_file: String,
    conversation_log: Option<ConversationLog>,
    active_tools: tools::ActiveTools,
) -> Arc<Self> {
    let mut initial_context = ContextState::new();
    initial_context.push(Section::System, AstNode::system_msg(&system_prompt));

    // Describe the available tools to the model as a second system message.
    let tool_json: Vec<String> = tools::tools()
        .iter()
        .map(|tool| tool.to_json())
        .collect();
    if !tool_json.is_empty() {
        let tools_text = format!(
            "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
            If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
            <tool_call>\n<function=example_function_name>\n\
            <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
            </function>\n</tool_call>\n\n\
            IMPORTANT: Function calls MUST follow the specified format.",
            tool_json.join("\n"),
        );
        initial_context.push(Section::System, AstNode::system_msg(&tools_text));
    }

    for (entry_name, entry_body) in personality.iter() {
        initial_context.push(Section::Identity, AstNode::memory(entry_name, entry_body));
    }

    let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
    let session_id = format!("consciousness-{}", stamp);

    let initial_state = AgentState {
        tools: tools::tools(),
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        temperature: 0.6,
        top_p: 0.95,
        top_k: 20,
        activities: Vec::new(),
        next_activity_id: 0,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        provenance: "manual".to_string(),
        conversation_log,
        generation: 0,
        memory_scoring_in_flight: false,
        active_tools,
        no_compact: false,
        changed: Arc::new(tokio::sync::Notify::new()),
    };

    let agent = Arc::new(Self {
        client,
        app_config,
        prompt_file,
        session_id,
        context: tokio::sync::Mutex::new(initial_context),
        state: tokio::sync::Mutex::new(initial_state),
    });
    agent.load_startup_journal().await;
    agent
}
/// Fork this agent: the child starts from a clone of the parent's context
/// (so it can share the KV cache prefix) and uses the given `tools`.
///
/// Sampling parameters and provenance are inherited; everything else in
/// the state starts fresh. The child has no conversation log and never
/// compacts (`no_compact` is set).
pub async fn fork(self: &Arc<Self>, tools: Vec<tools::Tool>) -> Arc<Self> {
    let forked_context = self.context.lock().await.clone();

    let child_state = {
        let parent = self.state.lock().await;
        AgentState {
            tools,
            last_prompt_tokens: 0,
            reasoning_effort: "none".to_string(),
            temperature: parent.temperature,
            top_p: parent.top_p,
            top_k: parent.top_k,
            activities: Vec::new(),
            next_activity_id: 0,
            pending_yield: false,
            pending_model_switch: None,
            pending_dmn_pause: false,
            provenance: parent.provenance.clone(),
            conversation_log: None,
            generation: 0,
            memory_scoring_in_flight: false,
            active_tools: tools::ActiveTools::new(),
            no_compact: true,
            changed: Arc::new(tokio::sync::Notify::new()),
        }
    };

    Arc::new(Self {
        client: self.client.clone(),
        app_config: self.app_config.clone(),
        prompt_file: self.prompt_file.clone(),
        session_id: self.session_id.clone(),
        context: tokio::sync::Mutex::new(forked_context),
        state: tokio::sync::Mutex::new(child_state),
    })
}
/// Token ids for the next completion request: the whole context followed
/// by the opening of an assistant message (`IM_START` + "assistant\n").
pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
    let mut prompt = self.context.lock().await.token_ids();
    prompt.push(tokenizer::IM_START);
    prompt.extend(tokenizer::encode("assistant\n"));
    prompt
}
pub async fn push_node(&self, node: AstNode) {
let node = node.with_timestamp(chrono::Utc::now());
let st = self.state.lock().await;
if let Some(ref log) = st.conversation_log {
if let Err(e) = log.append_node(&node) {
eprintln!("warning: failed to log entry: {:#}", e);
}
}
drop(st);
self.context.lock().await.push(Section::Conversation, node);
self.state.lock().await.changed.notify_one();
}
/// Run the agent turn loop: assemble prompt, stream response,
/// parse into AST, dispatch tool calls, repeat until text response.
///
/// Results of background tools that finished since the previous turn are
/// applied to the context first. Tool calls are spawned as soon as the
/// parser emits them and awaited once the stream has ended. A tool task
/// that panics or is cancelled is reported to the model as an `Error:`
/// result (and counted in `tool_errors`) instead of being silently lost.
///
/// # Errors
///
/// Returns the stream/parse error when streaming fails. Context-overflow
/// errors trigger up to two compact-and-retry attempts first, unless
/// `no_compact` is set, in which case they are returned immediately.
/// Also returns an error if the parser task itself panics.
pub async fn turn(
    agent: Arc<Agent>,
) -> Result<TurnResult> {
    // Collect finished background tools
    {
        let finished = agent.state.lock().await.active_tools.take_finished();
        if !finished.is_empty() {
            let mut bg_ds = DispatchState::new();
            let mut results = Vec::new();
            for entry in finished {
                match entry.handle.await {
                    Ok(pair) => results.push(pair),
                    Err(join_err) => {
                        // The originating call is not recoverable here, so
                        // the failure can only be reported, not fed back.
                        eprintln!("warning: background tool task failed: {}", join_err);
                    }
                }
            }
            Agent::apply_tool_results(&agent, results, &mut bg_ds).await;
        }
    }
    let mut overflow_retries: u32 = 0;
    let mut empty_retries: u32 = 0;
    let mut ds = DispatchState::new();
    loop {
        let _thinking = start_activity(&agent, "thinking...").await;
        let (rx, _stream_guard) = {
            let prompt_tokens = agent.assemble_prompt_tokens().await;
            let st = agent.state.lock().await;
            agent.client.stream_completion(
                &prompt_tokens,
                api::SamplingParams {
                    temperature: st.temperature,
                    top_p: st.top_p,
                    top_k: st.top_k,
                },
                None,
            )
        };
        // Push an empty assistant branch; the parser fills it in as the
        // response streams.
        let branch_idx = {
            let mut ctx = agent.context.lock().await;
            let idx = ctx.len(Section::Conversation);
            ctx.push(Section::Conversation,
                AstNode::branch(Role::Assistant, vec![])
                    .with_timestamp(chrono::Utc::now()));
            idx
        };
        let parser = ResponseParser::new(branch_idx);
        let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());
        let mut pending_calls: Vec<PendingToolCall> = Vec::new();
        // Start each tool call as soon as the parser emits it.
        while let Some(call) = tool_rx.recv().await {
            let call_clone = call.clone();
            let agent_handle = agent.clone();
            let handle = tokio::spawn(async move {
                let args: serde_json::Value =
                    serde_json::from_str(&call_clone.arguments).unwrap_or_default();
                let output = tools::dispatch_with_agent(
                    &call_clone.name, &args, Some(agent_handle),
                ).await;
                (call_clone, output)
            });
            agent.state.lock().await.active_tools.push(tools::ActiveToolCall {
                id: call.id.clone(),
                name: call.name.clone(),
                detail: call.arguments.clone(),
                started: std::time::Instant::now(),
                background: false,
                handle,
            });
            pending_calls.push(call);
        }
        // Check for stream/parse errors
        match parser_handle.await {
            Ok(Err(e)) => {
                if context::is_context_overflow(&e) {
                    if agent.state.lock().await.no_compact {
                        return Err(e);
                    }
                    if overflow_retries < 2 {
                        overflow_retries += 1;
                        agent.state.lock().await.notify(
                            format!("context overflow — retrying ({}/2)", overflow_retries));
                        // NOTE(review): the assistant branch pushed above is
                        // not removed here before retrying — confirm that
                        // compaction handles it.
                        agent.compact().await;
                        continue;
                    }
                }
                return Err(e);
            }
            Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
            Ok(Ok(())) => {
                let node = agent.context.lock().await.conversation()[branch_idx].clone();
                let st = agent.state.lock().await;
                if let Some(ref log) = st.conversation_log {
                    if let Err(e) = log.append_node(&node) {
                        eprintln!("warning: failed to log assistant response: {:#}", e);
                    }
                }
            }
        }
        // Empty response — nudge and retry
        let has_content = {
            let ctx = agent.context.lock().await;
            !ctx.conversation()[branch_idx].children().is_empty()
        };
        if !has_content && pending_calls.is_empty() {
            if empty_retries < 2 {
                empty_retries += 1;
                agent.push_node(AstNode::user_msg(
                    "[system] Your previous response was empty. \
                     Please respond with text or use a tool."
                )).await;
                continue;
            }
        } else {
            empty_retries = 0;
        }
        // Wait for tool calls to complete
        if !pending_calls.is_empty() {
            ds.had_tool_calls = true;
            let handles = agent.state.lock().await.active_tools.take_foreground();
            let mut results = Vec::new();
            for entry in handles {
                match entry.handle.await {
                    Ok(pair) => results.push(pair),
                    Err(join_err) => {
                        // The task panicked or was cancelled. Report an
                        // error result for the originating call so it does
                        // not go unanswered.
                        if let Some(call) = pending_calls.iter().find(|c| c.id == entry.id) {
                            results.push((
                                call.clone(),
                                format!("Error: tool task failed: {}", join_err),
                            ));
                        }
                    }
                }
            }
            Agent::apply_tool_results(&agent, results, &mut ds).await;
            continue;
        }
        // Text-only response — extract text and return
        let text = {
            let ctx = agent.context.lock().await;
            let children = ctx.conversation()[branch_idx].children();
            children.iter()
                .filter_map(|c| c.leaf())
                .filter(|l| matches!(l.body(), NodeBody::Content(_)))
                .map(|l| l.body().text())
                .collect::<Vec<_>>()
                .join("")
        };
        // Fold requests raised during the turn into the result and clear them.
        let mut st = agent.state.lock().await;
        if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; }
        if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
        if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
        return Ok(TurnResult {
            text,
            yield_requested: ds.yield_requested,
            had_tool_calls: ds.had_tool_calls,
            tool_errors: ds.tool_errors,
            model_switch: ds.model_switch,
            dmn_pause: ds.dmn_pause,
        });
    }
}
/// Turn a tool's output into a context node.
///
/// A successful `memory_render` call whose arguments contain a string
/// `key` becomes a memory node under that key; every other output becomes
/// a plain tool-result node.
fn make_tool_result_node(call: &PendingToolCall, output: &str) -> AstNode {
    let is_rendered_memory = call.name == "memory_render" && !output.starts_with("Error:");
    if !is_rendered_memory {
        return AstNode::tool_result(output);
    }

    // Unparseable arguments fall back to the default JSON value, which has no "key".
    let parsed: serde_json::Value =
        serde_json::from_str(&call.arguments).unwrap_or_default();
    match parsed.get("key").and_then(|value| value.as_str()) {
        Some(key) => AstNode::memory(key, output),
        None => AstNode::tool_result(output),
    }
}
async fn apply_tool_results(
agent: &Arc<Agent>,
results: Vec<(PendingToolCall, String)>,
ds: &mut DispatchState,
) {
let mut nodes = Vec::new();
for (call, output) in &results {
ds.had_tool_calls = true;
if output.starts_with("Error:") { ds.tool_errors += 1; }
nodes.push(Self::make_tool_result_node(call, output));
}
// Single lock: remove from active, log, push to context
{
let mut st = agent.state.lock().await;
for (call, _) in &results {
st.active_tools.remove(&call.id);
}
for node in &nodes {
if let Some(ref log) = st.conversation_log {
if let Err(e) = log.append_node(node) {
eprintln!("warning: failed to log entry: {:#}", e);
}
}
}
}
{
let mut ctx = agent.context.lock().await;
for node in nodes {
ctx.push(Section::Conversation, node);
}
}
agent.state.lock().await.changed.notify_one();
}
/// Fill the journal section of the context with episodic-session nodes
/// from the store.
///
/// Candidates are the nodes up to and including the first one created at
/// or after the oldest logged conversation message (all of them when
/// there is no log or no timestamp). The newest candidates are kept,
/// within a budget of 15% of the context window; at least one entry is
/// always taken. Does nothing if the store fails to load or no entry is
/// selected.
async fn load_startup_journal(&self) {
    let oldest_msg_ts = {
        let state = self.state.lock().await;
        state
            .conversation_log
            .as_ref()
            .and_then(|log| log.oldest_timestamp())
    };

    let Ok(store) = crate::store::Store::load() else {
        return;
    };

    let mut sessions: Vec<_> = store
        .nodes
        .values()
        .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
        .collect();
    sessions.sort_by_key(|n| n.created_at);

    // One past the first entry that overlaps the logged conversation.
    let cutoff_idx = match oldest_msg_ts {
        Some(cutoff) => {
            let cutoff_ts = cutoff.timestamp();
            sessions
                .iter()
                .position(|n| n.created_at >= cutoff_ts)
                .map_or(sessions.len(), |i| i + 1)
        }
        None => sessions.len(),
    };

    let journal_budget = context::context_window() * 15 / 100;
    let mut newest_first = Vec::new();
    let mut total_tokens = 0;
    // Walk from newest to oldest until the budget is used up.
    for session in sessions[..cutoff_idx].iter().rev() {
        let created = chrono::DateTime::from_timestamp(session.created_at, 0);
        let ast = AstNode::memory(&session.key, &session.content)
            .with_timestamp(created.unwrap_or_else(chrono::Utc::now));
        let cost = ast.tokens();
        if !newest_first.is_empty() && total_tokens + cost > journal_budget {
            break;
        }
        total_tokens += cost;
        newest_first.push(ast);
    }

    if newest_first.is_empty() {
        return;
    }

    // Replace the journal section, restoring chronological order.
    let mut context = self.context.lock().await;
    context.clear(Section::Journal);
    for entry in newest_first.into_iter().rev() {
        context.push(Section::Journal, entry);
    }
}
/// Compact the context: reload the identity section from config, reload
/// the startup journal, trim the conversation, then bump `generation` and
/// reset `last_prompt_tokens`.
///
/// If the identity reload fails, a warning is printed and the existing
/// identity section is kept.
pub async fn compact(&self) {
    let reloaded = crate::config::reload_for_model(&self.app_config, &self.prompt_file);
    match reloaded {
        Err(err) => {
            eprintln!("warning: failed to reload identity: {:#}", err);
        }
        Ok((_system_prompt, personality)) => {
            // The system section (prompt + tools) is set by new() and left alone.
            let mut context = self.context.lock().await;
            context.clear(Section::Identity);
            for (entry_name, entry_body) in personality.iter() {
                context.push(Section::Identity, AstNode::memory(entry_name, entry_body));
            }
        }
    }

    self.load_startup_journal().await;
    self.context.lock().await.trim_conversation();

    let mut state = self.state.lock().await;
    state.generation += 1;
    state.last_prompt_tokens = 0;
}
/// Rebuild the conversation from the conversation log.
///
/// Reads nodes from the log (passing a 64 MiB limit to `read_nodes`),
/// replaces the conversation section with them, compacts, and records the
/// resulting context token count in `last_prompt_tokens`.
///
/// Returns `false`, leaving the context untouched, when there is no log,
/// reading fails, or the log is empty; `true` otherwise.
pub async fn restore_from_log(&self) -> bool {
    let nodes = {
        let st = self.state.lock().await;
        match &st.conversation_log {
            Some(log) => match log.read_nodes(64 * 1024 * 1024) {
                Ok(nodes) if !nodes.is_empty() => nodes,
                _ => return false,
            },
            None => return false,
        }
    };
    {
        let mut ctx = self.context.lock().await;
        ctx.clear(Section::Conversation);
        for node in nodes {
            ctx.push(Section::Conversation, node);
        }
    }
    self.compact().await;
    // Read the token count before taking the state lock. Every other method
    // here releases one of the two locks before acquiring the other; holding
    // `state` while awaiting `context` would invite a lock-order deadlock.
    let total_tokens = self.context.lock().await.tokens();
    let mut st = self.state.lock().await;
    // Saturate rather than silently wrap if the count does not fit in u32.
    st.last_prompt_tokens = u32::try_from(total_tokens).unwrap_or(u32::MAX);
    true
}
/// Name of the model the API client is configured with.
pub fn model(&self) -> &str {
    let client = &self.client;
    &client.model
}
}