Agent/AgentState split complete — separate context and state locks
Agent is now Arc<Agent> (immutable config). ContextState and AgentState have separate tokio::sync::Mutex locks. The parser locks only context, tool dispatch locks only state. No contention between the two. All callers migrated: mind/, user/, tools/, oneshot, dmn, learn. 28 tests pass, zero errors. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
1d61b091b0
commit
0b9813431a
8 changed files with 156 additions and 159 deletions
|
|
@ -460,8 +460,6 @@ impl Subconscious {
|
|||
|| outputs.contains_key("reflection")
|
||||
|| outputs.contains_key("thalamus");
|
||||
if has_outputs {
|
||||
let mut ag = agent.lock().await;
|
||||
|
||||
if let Some(surface_str) = outputs.get("surface") {
|
||||
let store = crate::store::Store::cached().await.ok();
|
||||
let store_guard = match &store {
|
||||
|
|
@ -472,30 +470,30 @@ impl Subconscious {
|
|||
let rendered = store_guard.as_ref()
|
||||
.and_then(|s| crate::cli::node::render_node(s, key));
|
||||
if let Some(rendered) = rendered {
|
||||
ag.push_node(AstNode::memory(
|
||||
agent.push_node(AstNode::memory(
|
||||
key,
|
||||
format!("--- {} (surfaced) ---\n{}", key, rendered),
|
||||
));
|
||||
)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(reflection) = outputs.get("reflection") {
|
||||
if !reflection.trim().is_empty() {
|
||||
ag.push_node(AstNode::dmn(format!(
|
||||
agent.push_node(AstNode::dmn(format!(
|
||||
"--- subconscious reflection ---\n{}",
|
||||
reflection.trim(),
|
||||
)));
|
||||
))).await;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(nudge) = outputs.get("thalamus") {
|
||||
let nudge = nudge.trim();
|
||||
if !nudge.is_empty() && nudge != "ok" {
|
||||
ag.push_node(AstNode::dmn(format!(
|
||||
agent.push_node(AstNode::dmn(format!(
|
||||
"--- thalamus ---\n{}",
|
||||
nudge,
|
||||
)));
|
||||
))).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -513,13 +511,13 @@ impl Subconscious {
|
|||
/// Trigger subconscious agents that are due to run.
|
||||
pub async fn trigger(&mut self, agent: &Arc<Agent>) {
|
||||
let (conversation_bytes, memory_keys) = {
|
||||
let ag = agent.lock().await;
|
||||
let bytes = ag.context.conversation().iter()
|
||||
let ctx = agent.context.lock().await;
|
||||
let bytes = ctx.conversation().iter()
|
||||
.filter(|node| !matches!(node.leaf().map(|l| l.body()),
|
||||
Some(NodeBody::Log(_)) | Some(NodeBody::Memory { .. })))
|
||||
.map(|node| node.render().len() as u64)
|
||||
.sum::<u64>();
|
||||
let keys: Vec<String> = ag.context.conversation().iter().filter_map(|node| {
|
||||
let keys: Vec<String> = ctx.conversation().iter().filter_map(|node| {
|
||||
if let Some(NodeBody::Memory { key, .. }) = node.leaf().map(|l| l.body()) {
|
||||
Some(key.clone())
|
||||
} else { None }
|
||||
|
|
@ -541,23 +539,21 @@ impl Subconscious {
|
|||
|
||||
if to_run.is_empty() { return; }
|
||||
|
||||
let conscious = agent.lock().await;
|
||||
for (idx, mut auto) in to_run {
|
||||
dbglog!("[subconscious] triggering {}", auto.name);
|
||||
|
||||
let mut forked = conscious.fork(auto.tools.clone());
|
||||
forked.provenance = format!("agent:{}", auto.name);
|
||||
let fork_point = forked.context.conversation().len();
|
||||
let shared_forked = Arc::new(tokio::sync::Mutex::new(forked));
|
||||
let forked = agent.fork(auto.tools.clone()).await;
|
||||
forked.state.lock().await.provenance = format!("agent:{}", auto.name);
|
||||
let fork_point = forked.context.lock().await.conversation().len();
|
||||
|
||||
self.agents[idx].forked_agent = Some(shared_forked.clone());
|
||||
self.agents[idx].forked_agent = Some(forked.clone());
|
||||
self.agents[idx].fork_point = fork_point;
|
||||
|
||||
let keys = memory_keys.clone();
|
||||
let st = self.state.clone();
|
||||
|
||||
self.agents[idx].handle = Some(tokio::spawn(async move {
|
||||
let result = auto.run_forked_shared(&shared_forked, &keys, &st).await;
|
||||
let result = auto.run_forked_shared(&forked, &keys, &st).await;
|
||||
(auto, result)
|
||||
}));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -260,7 +260,7 @@ pub struct Mind {
|
|||
}
|
||||
|
||||
impl Mind {
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
config: SessionConfig,
|
||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||
) -> Self {
|
||||
|
|
@ -271,7 +271,7 @@ impl Mind {
|
|||
config.session_dir.join("conversation.jsonl"),
|
||||
).ok();
|
||||
|
||||
let ag = Agent::new(
|
||||
let agent = Agent::new(
|
||||
client,
|
||||
config.system_prompt.clone(),
|
||||
config.context_parts.clone(),
|
||||
|
|
@ -279,8 +279,7 @@ impl Mind {
|
|||
config.prompt_file.clone(),
|
||||
conversation_log,
|
||||
shared_active_tools,
|
||||
);
|
||||
let agent = Arc::new(tokio::sync::Mutex::new(ag));
|
||||
).await;
|
||||
|
||||
let shared = Arc::new(std::sync::Mutex::new(MindState::new(config.app.dmn.max_turns)));
|
||||
let (turn_watch, _) = tokio::sync::watch::channel(false);
|
||||
|
|
@ -314,15 +313,13 @@ impl Mind {
|
|||
|
||||
pub async fn init(&self) {
|
||||
// Restore conversation
|
||||
let mut ag = self.agent.lock().await;
|
||||
ag.restore_from_log();
|
||||
self.agent.restore_from_log().await;
|
||||
|
||||
// Restore persisted memory scores
|
||||
let scores_path = self.config.session_dir.join("memory-scores.json");
|
||||
load_memory_scores(&mut ag.context, &scores_path);
|
||||
load_memory_scores(&mut *self.agent.context.lock().await, &scores_path);
|
||||
|
||||
ag.changed.notify_one();
|
||||
drop(ag);
|
||||
self.agent.state.lock().await.changed.notify_one();
|
||||
|
||||
// Load persistent subconscious state
|
||||
let state_path = self.config.session_dir.join("subconscious-state.json");
|
||||
|
|
@ -340,10 +337,9 @@ impl Mind {
|
|||
MindCommand::None => {}
|
||||
MindCommand::Compact => {
|
||||
let threshold = compaction_threshold(&self.config.app) as usize;
|
||||
let mut ag = self.agent.lock().await;
|
||||
if ag.context.tokens() > threshold {
|
||||
ag.compact();
|
||||
ag.notify("compacted");
|
||||
if self.agent.context.lock().await.tokens() > threshold {
|
||||
self.agent.compact().await;
|
||||
self.agent.state.lock().await.notify("compacted");
|
||||
}
|
||||
}
|
||||
MindCommand::Score => {
|
||||
|
|
@ -356,10 +352,10 @@ impl Mind {
|
|||
}
|
||||
MindCommand::Interrupt => {
|
||||
self.shared.lock().unwrap().interrupt();
|
||||
let ag = self.agent.lock().await;
|
||||
let mut tools = ag.active_tools.lock().unwrap();
|
||||
let active_tools = self.agent.state.lock().await.active_tools.clone();
|
||||
let mut tools = active_tools.lock().unwrap();
|
||||
for entry in tools.drain(..) { entry.handle.abort(); }
|
||||
drop(tools); drop(ag);
|
||||
drop(tools);
|
||||
if let Some(h) = self.shared.lock().unwrap().turn_handle.take() { h.abort(); }
|
||||
self.shared.lock().unwrap().turn_active = false;
|
||||
let _ = self.turn_watch.send(false);
|
||||
|
|
@ -373,14 +369,17 @@ impl Mind {
|
|||
let new_log = log::ConversationLog::new(
|
||||
self.config.session_dir.join("conversation.jsonl"),
|
||||
).ok();
|
||||
let mut ag = self.agent.lock().await;
|
||||
let shared_tools = ag.active_tools.clone();
|
||||
*ag = Agent::new(
|
||||
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
|
||||
self.config.system_prompt.clone(), self.config.context_parts.clone(),
|
||||
self.config.app.clone(), self.config.prompt_file.clone(),
|
||||
new_log, shared_tools,
|
||||
);
|
||||
{
|
||||
let mut ctx = self.agent.context.lock().await;
|
||||
ctx.clear(Section::Conversation);
|
||||
}
|
||||
{
|
||||
let mut st = self.agent.state.lock().await;
|
||||
st.conversation_log = new_log;
|
||||
st.generation += 1;
|
||||
st.last_prompt_tokens = 0;
|
||||
}
|
||||
self.agent.compact().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -395,10 +394,12 @@ impl Mind {
|
|||
let response_window = cfg.scoring_response_window;
|
||||
tokio::spawn(async move {
|
||||
let (context, client) = {
|
||||
let mut ag = agent.lock().await;
|
||||
if ag.memory_scoring_in_flight { return; }
|
||||
ag.memory_scoring_in_flight = true;
|
||||
(ag.context.clone(), ag.client_clone())
|
||||
let mut st = agent.state.lock().await;
|
||||
if st.memory_scoring_in_flight { return; }
|
||||
st.memory_scoring_in_flight = true;
|
||||
drop(st);
|
||||
let ctx = agent.context.lock().await.clone();
|
||||
(ctx, agent.client.clone())
|
||||
};
|
||||
let _result = learn::score_memories_incremental(
|
||||
&context, max_age as i64, response_window, &client, &agent,
|
||||
|
|
@ -407,27 +408,27 @@ impl Mind {
|
|||
let path = scores_path.clone();
|
||||
async move {
|
||||
let scores_snapshot = {
|
||||
let mut ag = agent.lock().await;
|
||||
for i in 0..ag.context.conversation().len() {
|
||||
if let AstNode::Leaf(leaf) = &ag.context.conversation()[i] {
|
||||
let mut ctx = agent.context.lock().await;
|
||||
for i in 0..ctx.conversation().len() {
|
||||
if let AstNode::Leaf(leaf) = &ctx.conversation()[i] {
|
||||
if let NodeBody::Memory { key: k, .. } = leaf.body() {
|
||||
if *k == key {
|
||||
ag.context.set_score(Section::Conversation, i, Some(score));
|
||||
ctx.set_score(Section::Conversation, i, Some(score));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ag.changed.notify_one();
|
||||
collect_memory_scores(&ag.context)
|
||||
let snapshot = collect_memory_scores(&ctx);
|
||||
drop(ctx);
|
||||
agent.state.lock().await.changed.notify_one();
|
||||
snapshot
|
||||
};
|
||||
// Write to disk after releasing the lock
|
||||
save_memory_scores(&scores_snapshot, &path);
|
||||
}
|
||||
},
|
||||
).await;
|
||||
{
|
||||
let mut ag = agent.lock().await;
|
||||
ag.memory_scoring_in_flight = false;
|
||||
agent.state.lock().await.memory_scoring_in_flight = false;
|
||||
}
|
||||
let _ = bg_tx.send(BgEvent::ScoringDone);
|
||||
});
|
||||
|
|
@ -435,21 +436,20 @@ impl Mind {
|
|||
|
||||
async fn start_turn(&self, text: &str, target: StreamTarget) {
|
||||
{
|
||||
let mut ag = self.agent.lock().await;
|
||||
match target {
|
||||
StreamTarget::Conversation => {
|
||||
ag.push_node(AstNode::user_msg(text));
|
||||
self.agent.push_node(AstNode::user_msg(text)).await;
|
||||
}
|
||||
StreamTarget::Autonomous => {
|
||||
ag.push_node(AstNode::dmn(text));
|
||||
self.agent.push_node(AstNode::dmn(text)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Compact if over budget before sending
|
||||
let threshold = compaction_threshold(&self.config.app) as usize;
|
||||
if ag.context.tokens() > threshold {
|
||||
ag.compact();
|
||||
ag.notify("compacted");
|
||||
if self.agent.context.lock().await.tokens() > threshold {
|
||||
self.agent.compact().await;
|
||||
self.agent.state.lock().await.notify("compacted");
|
||||
}
|
||||
}
|
||||
self.shared.lock().unwrap().turn_active = true;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue