WIP: Agent/AgentState split — struct defined, 80+ errors remaining
Split Agent into immutable Agent (behind Arc) and mutable AgentState (behind its own Mutex). ContextState has its own Mutex on Agent. Activities moved to AgentState. new() and fork() rewritten. All callers need mechanical updates: agent.lock().await.field → agent.state.lock().await.field or agent.context.lock().await.method. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
e587431f9a
commit
7fe4584ba0
1 changed files with 82 additions and 98 deletions
132
src/agent/mod.rs
132
src/agent/mod.rs
|
|
@ -38,9 +38,8 @@ pub struct ActivityEntry {
|
||||||
pub expires_at: std::time::Instant,
|
pub expires_at: std::time::Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// RAII guard — marks the activity "(complete)" on drop, starts expiry timer.
|
|
||||||
pub struct ActivityGuard {
|
pub struct ActivityGuard {
|
||||||
agent: Arc<tokio::sync::Mutex<Agent>>,
|
agent: Arc<Agent>,
|
||||||
id: u64,
|
id: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -48,8 +47,8 @@ const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);
|
||||||
|
|
||||||
impl Drop for ActivityGuard {
|
impl Drop for ActivityGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Ok(mut ag) = self.agent.try_lock() {
|
if let Ok(mut st) = self.agent.state.try_lock() {
|
||||||
if let Some(entry) = ag.activities.iter_mut().find(|a| a.id == self.id) {
|
if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) {
|
||||||
entry.label.push_str(" (complete)");
|
entry.label.push_str(" (complete)");
|
||||||
entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
|
entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
|
||||||
}
|
}
|
||||||
|
|
@ -57,8 +56,7 @@ impl Drop for ActivityGuard {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Agent {
|
impl AgentState {
|
||||||
/// Register an activity, returns its ID. Caller creates the guard.
|
|
||||||
pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
|
pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
|
||||||
self.expire_activities();
|
self.expire_activities();
|
||||||
let id = self.next_activity_id;
|
let id = self.next_activity_id;
|
||||||
|
|
@ -72,7 +70,6 @@ impl Agent {
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a notification — auto-expires after 5 seconds.
|
|
||||||
pub fn notify(&mut self, label: impl Into<String>) {
|
pub fn notify(&mut self, label: impl Into<String>) {
|
||||||
self.expire_activities();
|
self.expire_activities();
|
||||||
let id = self.next_activity_id;
|
let id = self.next_activity_id;
|
||||||
|
|
@ -85,21 +82,14 @@ impl Agent {
|
||||||
self.changed.notify_one();
|
self.changed.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove expired activities.
|
|
||||||
pub fn expire_activities(&mut self) {
|
pub fn expire_activities(&mut self) {
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
self.activities.retain(|a| a.expires_at > now);
|
self.activities.retain(|a| a.expires_at > now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create an activity guard from outside the lock.
|
pub async fn start_activity(agent: &Arc<Agent>, label: impl Into<String>) -> ActivityGuard {
|
||||||
pub fn activity_guard(agent: &Arc<tokio::sync::Mutex<Agent>>, id: u64) -> ActivityGuard {
|
let id = agent.state.lock().await.push_activity(label);
|
||||||
ActivityGuard { agent: agent.clone(), id }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Convenience: lock, push activity, unlock, return guard.
|
|
||||||
pub async fn start_activity(agent: &Arc<tokio::sync::Mutex<Agent>>, label: impl Into<String>) -> ActivityGuard {
|
|
||||||
let id = agent.lock().await.push_activity(label);
|
|
||||||
ActivityGuard { agent: agent.clone(), id }
|
ActivityGuard { agent: agent.clone(), id }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -138,46 +128,39 @@ impl DispatchState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Immutable agent config — shared via Arc, no mutex needed.
|
||||||
pub struct Agent {
|
pub struct Agent {
|
||||||
client: ApiClient,
|
pub client: ApiClient,
|
||||||
tools: Vec<tools::Tool>,
|
pub app_config: crate::config::AppConfig,
|
||||||
/// Last known prompt token count from the API (tracks context size).
|
pub prompt_file: String,
|
||||||
last_prompt_tokens: u32,
|
pub session_id: String,
|
||||||
/// Current reasoning effort level ("none", "low", "high").
|
pub context: tokio::sync::Mutex<ContextState>,
|
||||||
|
pub state: tokio::sync::Mutex<AgentState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mutable agent state — behind its own mutex.
|
||||||
|
pub struct AgentState {
|
||||||
|
pub tools: Vec<tools::Tool>,
|
||||||
|
pub last_prompt_tokens: u32,
|
||||||
pub reasoning_effort: String,
|
pub reasoning_effort: String,
|
||||||
/// Sampling parameters — adjustable at runtime from the thalamus screen.
|
|
||||||
pub temperature: f32,
|
pub temperature: f32,
|
||||||
pub top_p: f32,
|
pub top_p: f32,
|
||||||
pub top_k: u32,
|
pub top_k: u32,
|
||||||
/// Active activities — RAII guards auto-remove on drop.
|
|
||||||
pub activities: Vec<ActivityEntry>,
|
pub activities: Vec<ActivityEntry>,
|
||||||
next_activity_id: u64,
|
next_activity_id: u64,
|
||||||
/// Control tool flags — set by tool handlers, consumed by turn loop.
|
|
||||||
pub pending_yield: bool,
|
pub pending_yield: bool,
|
||||||
pub pending_model_switch: Option<String>,
|
pub pending_model_switch: Option<String>,
|
||||||
pub pending_dmn_pause: bool,
|
pub pending_dmn_pause: bool,
|
||||||
/// Provenance tag for memory operations — identifies who made the change.
|
|
||||||
pub provenance: String,
|
pub provenance: String,
|
||||||
/// Persistent conversation log — append-only record of all messages.
|
|
||||||
pub conversation_log: Option<ConversationLog>,
|
pub conversation_log: Option<ConversationLog>,
|
||||||
pub context: ContextState,
|
|
||||||
/// App config — used to reload identity on compaction and model switching.
|
|
||||||
pub app_config: crate::config::AppConfig,
|
|
||||||
pub prompt_file: String,
|
|
||||||
/// Stable session ID for memory-search dedup across turns.
|
|
||||||
pub session_id: String,
|
|
||||||
/// Incremented on compaction — UI uses this to detect resets.
|
|
||||||
pub generation: u64,
|
pub generation: u64,
|
||||||
/// Whether incremental memory scoring is currently running.
|
|
||||||
pub memory_scoring_in_flight: bool,
|
pub memory_scoring_in_flight: bool,
|
||||||
/// Shared active tools — Agent writes, TUI reads.
|
|
||||||
pub active_tools: tools::SharedActiveTools,
|
pub active_tools: tools::SharedActiveTools,
|
||||||
/// Fires when agent state changes — UI wakes on this instead of polling.
|
|
||||||
pub changed: Arc<tokio::sync::Notify>,
|
pub changed: Arc<tokio::sync::Notify>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Agent {
|
impl Agent {
|
||||||
pub fn new(
|
pub async fn new(
|
||||||
client: ApiClient,
|
client: ApiClient,
|
||||||
system_prompt: String,
|
system_prompt: String,
|
||||||
personality: Vec<(String, String)>,
|
personality: Vec<(String, String)>,
|
||||||
|
|
@ -185,7 +168,7 @@ impl Agent {
|
||||||
prompt_file: String,
|
prompt_file: String,
|
||||||
conversation_log: Option<ConversationLog>,
|
conversation_log: Option<ConversationLog>,
|
||||||
active_tools: tools::SharedActiveTools,
|
active_tools: tools::SharedActiveTools,
|
||||||
) -> Self {
|
) -> Arc<Self> {
|
||||||
let mut context = ContextState::new();
|
let mut context = ContextState::new();
|
||||||
context.push(Section::System, AstNode::system_msg(&system_prompt));
|
context.push(Section::System, AstNode::system_msg(&system_prompt));
|
||||||
|
|
||||||
|
|
@ -207,9 +190,15 @@ impl Agent {
|
||||||
for (name, content) in &personality {
|
for (name, content) in &personality {
|
||||||
context.push(Section::Identity, AstNode::memory(name, content));
|
context.push(Section::Identity, AstNode::memory(name, content));
|
||||||
}
|
}
|
||||||
|
|
||||||
let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
|
let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
|
||||||
let mut agent = Self {
|
let agent = Arc::new(Self {
|
||||||
client,
|
client,
|
||||||
|
app_config,
|
||||||
|
prompt_file,
|
||||||
|
session_id,
|
||||||
|
context: tokio::sync::Mutex::new(context),
|
||||||
|
state: tokio::sync::Mutex::new(AgentState {
|
||||||
tools: tools::tools(),
|
tools: tools::tools(),
|
||||||
last_prompt_tokens: 0,
|
last_prompt_tokens: 0,
|
||||||
reasoning_effort: "none".to_string(),
|
reasoning_effort: "none".to_string(),
|
||||||
|
|
@ -223,81 +212,76 @@ impl Agent {
|
||||||
pending_dmn_pause: false,
|
pending_dmn_pause: false,
|
||||||
provenance: "manual".to_string(),
|
provenance: "manual".to_string(),
|
||||||
conversation_log,
|
conversation_log,
|
||||||
context,
|
|
||||||
app_config,
|
|
||||||
prompt_file,
|
|
||||||
session_id,
|
|
||||||
generation: 0,
|
generation: 0,
|
||||||
memory_scoring_in_flight: false,
|
memory_scoring_in_flight: false,
|
||||||
active_tools,
|
active_tools,
|
||||||
changed: Arc::new(tokio::sync::Notify::new()),
|
changed: Arc::new(tokio::sync::Notify::new()),
|
||||||
};
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
agent.load_startup_journal();
|
agent.load_startup_journal().await;
|
||||||
agent
|
agent
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a lightweight agent forked from this one's context.
|
/// Fork: clones context for KV cache prefix sharing.
|
||||||
///
|
pub async fn fork(self: &Arc<Self>, tools: Vec<tools::Tool>) -> Arc<Self> {
|
||||||
/// The forked agent shares the same conversation prefix (system prompt,
|
let ctx = self.context.lock().await.clone();
|
||||||
/// personality, journal, entries) for KV cache sharing. The caller
|
let st = self.state.lock().await;
|
||||||
/// appends the subconscious prompt as a user message and runs the turn.
|
Arc::new(Self {
|
||||||
pub fn fork(&self, tools: Vec<tools::Tool>) -> Self {
|
|
||||||
|
|
||||||
Self {
|
|
||||||
client: self.client.clone(),
|
client: self.client.clone(),
|
||||||
|
app_config: self.app_config.clone(),
|
||||||
|
prompt_file: self.prompt_file.clone(),
|
||||||
|
session_id: self.session_id.clone(),
|
||||||
|
context: tokio::sync::Mutex::new(ctx),
|
||||||
|
state: tokio::sync::Mutex::new(AgentState {
|
||||||
tools,
|
tools,
|
||||||
last_prompt_tokens: 0,
|
last_prompt_tokens: 0,
|
||||||
reasoning_effort: "none".to_string(),
|
reasoning_effort: "none".to_string(),
|
||||||
temperature: self.temperature,
|
temperature: st.temperature,
|
||||||
top_p: self.top_p,
|
top_p: st.top_p,
|
||||||
top_k: self.top_k,
|
top_k: st.top_k,
|
||||||
activities: Vec::new(),
|
activities: Vec::new(),
|
||||||
next_activity_id: 0,
|
next_activity_id: 0,
|
||||||
pending_yield: false,
|
pending_yield: false,
|
||||||
pending_model_switch: None,
|
pending_model_switch: None,
|
||||||
pending_dmn_pause: false,
|
pending_dmn_pause: false,
|
||||||
provenance: self.provenance.clone(),
|
provenance: st.provenance.clone(),
|
||||||
conversation_log: None,
|
conversation_log: None,
|
||||||
context: self.context.clone(),
|
|
||||||
app_config: self.app_config.clone(),
|
|
||||||
prompt_file: self.prompt_file.clone(),
|
|
||||||
session_id: self.session_id.clone(),
|
|
||||||
generation: 0,
|
generation: 0,
|
||||||
memory_scoring_in_flight: false,
|
memory_scoring_in_flight: false,
|
||||||
active_tools: tools::shared_active_tools(),
|
active_tools: tools::shared_active_tools(),
|
||||||
changed: Arc::new(tokio::sync::Notify::new()),
|
changed: Arc::new(tokio::sync::Notify::new()),
|
||||||
}
|
}),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Assemble the full prompt as token IDs.
|
pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
|
||||||
/// Context sections + assistant prompt suffix.
|
let ctx = self.context.lock().await;
|
||||||
pub fn assemble_prompt_tokens(&self) -> Vec<u32> {
|
let mut tokens = ctx.token_ids();
|
||||||
let mut tokens = self.context.token_ids();
|
|
||||||
tokens.push(tokenizer::IM_START);
|
tokens.push(tokenizer::IM_START);
|
||||||
tokens.extend(tokenizer::encode("assistant\n"));
|
tokens.extend(tokenizer::encode("assistant\n"));
|
||||||
tokens
|
tokens
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a node into the conversation and log it.
|
pub async fn push_node(&self, node: AstNode) {
|
||||||
pub fn push_node(&mut self, node: AstNode) {
|
let st = self.state.lock().await;
|
||||||
if let Some(ref log) = self.conversation_log {
|
if let Some(ref log) = st.conversation_log {
|
||||||
if let Err(e) = log.append_node(&node) {
|
if let Err(e) = log.append_node(&node) {
|
||||||
eprintln!("warning: failed to log entry: {:#}", e);
|
eprintln!("warning: failed to log entry: {:#}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.context.push(Section::Conversation, node);
|
drop(st);
|
||||||
self.changed.notify_one();
|
self.context.lock().await.push(Section::Conversation, node);
|
||||||
|
self.state.lock().await.changed.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the agent turn loop: assemble prompt, stream response,
|
/// Run the agent turn loop: assemble prompt, stream response,
|
||||||
/// parse into AST, dispatch tool calls, repeat until text response.
|
/// parse into AST, dispatch tool calls, repeat until text response.
|
||||||
pub async fn turn(
|
pub async fn turn(
|
||||||
agent: Arc<tokio::sync::Mutex<Agent>>,
|
agent: Arc<Agent>,
|
||||||
) -> Result<TurnResult> {
|
) -> Result<TurnResult> {
|
||||||
let active_tools = {
|
let active_tools = {
|
||||||
let me = agent.lock().await;
|
agent.state.lock().await.active_tools.clone()
|
||||||
me.active_tools.clone()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Collect finished background tools
|
// Collect finished background tools
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue