Clean up mind loop: fix double locks, async agent triggers, input peek
- push_node: notify before dropping state lock instead of relocking - Mind::run: single lock for timeout + turn_active + has_input; single lock for turn_handle + complete_turn - Agent triggers (subconscious/unconscious) spawned as async tasks so they don't block the select loop - has_pending_input() peek for DMN sleep guard — don't sleep when there's user input waiting - unconscious: merge collect_results into trigger, single store load Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
0314619579
commit
d82a2ae90d
3 changed files with 49 additions and 52 deletions
|
|
@ -275,9 +275,9 @@ impl Agent {
|
||||||
eprintln!("warning: failed to log entry: {:#}", e);
|
eprintln!("warning: failed to log entry: {:#}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
st.changed.notify_one();
|
||||||
drop(st);
|
drop(st);
|
||||||
self.context.lock().await.push(Section::Conversation, node);
|
self.context.lock().await.push(Section::Conversation, node);
|
||||||
self.state.lock().await.changed.notify_one();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the agent turn loop: assemble prompt, stream response,
|
/// Run the agent turn loop: assemble prompt, stream response,
|
||||||
|
|
|
||||||
|
|
@ -165,6 +165,11 @@ impl MindState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Is there pending user input waiting?
|
||||||
|
fn has_pending_input(&self) -> bool {
|
||||||
|
!self.turn_active && !self.input.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
/// Consume pending user input if no turn is active.
|
/// Consume pending user input if no turn is active.
|
||||||
/// Returns the text to send; caller is responsible for pushing it
|
/// Returns the text to send; caller is responsible for pushing it
|
||||||
/// into the Agent's context and starting the turn.
|
/// into the Agent's context and starting the turn.
|
||||||
|
|
@ -254,7 +259,7 @@ pub struct Mind {
|
||||||
pub shared: Arc<SharedMindState>,
|
pub shared: Arc<SharedMindState>,
|
||||||
pub config: SessionConfig,
|
pub config: SessionConfig,
|
||||||
subconscious: Arc<tokio::sync::Mutex<Subconscious>>,
|
subconscious: Arc<tokio::sync::Mutex<Subconscious>>,
|
||||||
unconscious: tokio::sync::Mutex<Unconscious>,
|
unconscious: Arc<tokio::sync::Mutex<Unconscious>>,
|
||||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||||
turn_watch: tokio::sync::watch::Sender<bool>,
|
turn_watch: tokio::sync::watch::Sender<bool>,
|
||||||
bg_tx: mpsc::UnboundedSender<BgEvent>,
|
bg_tx: mpsc::UnboundedSender<BgEvent>,
|
||||||
|
|
@ -295,7 +300,7 @@ impl Mind {
|
||||||
|
|
||||||
Self { agent, shared, config,
|
Self { agent, shared, config,
|
||||||
subconscious,
|
subconscious,
|
||||||
unconscious: tokio::sync::Mutex::new(Unconscious::new()),
|
unconscious: Arc::new(tokio::sync::Mutex::new(Unconscious::new())),
|
||||||
turn_tx, turn_watch, bg_tx,
|
turn_tx, turn_watch, bg_tx,
|
||||||
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
|
bg_rx: std::sync::Mutex::new(Some(bg_rx)), _supervisor: sup }
|
||||||
}
|
}
|
||||||
|
|
@ -480,9 +485,13 @@ impl Mind {
|
||||||
) {
|
) {
|
||||||
let mut bg_rx = self.bg_rx.lock().unwrap().take()
|
let mut bg_rx = self.bg_rx.lock().unwrap().take()
|
||||||
.expect("Mind::run() called twice");
|
.expect("Mind::run() called twice");
|
||||||
|
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
|
||||||
|
let mut unc_handle: Option<tokio::task::JoinHandle<()>> = None;
|
||||||
loop {
|
loop {
|
||||||
let timeout = self.shared.lock().unwrap().dmn.interval();
|
let (timeout, turn_active, has_input) = {
|
||||||
let turn_active = self.shared.lock().unwrap().turn_active;
|
let me = self.shared.lock().unwrap();
|
||||||
|
(me.dmn.interval(), me.turn_active, me.has_pending_input())
|
||||||
|
};
|
||||||
|
|
||||||
let mut cmds = Vec::new();
|
let mut cmds = Vec::new();
|
||||||
|
|
||||||
|
|
@ -505,8 +514,11 @@ impl Mind {
|
||||||
}
|
}
|
||||||
|
|
||||||
Some((result, target)) = turn_rx.recv() => {
|
Some((result, target)) = turn_rx.recv() => {
|
||||||
self.shared.lock().unwrap().turn_handle = None;
|
let model_switch = {
|
||||||
let model_switch = self.shared.lock().unwrap().complete_turn(&result, target);
|
let mut s = self.shared.lock().unwrap();
|
||||||
|
s.turn_handle = None;
|
||||||
|
s.complete_turn(&result, target)
|
||||||
|
};
|
||||||
let _ = self.turn_watch.send(false);
|
let _ = self.turn_watch.send(false);
|
||||||
|
|
||||||
if let Some(name) = model_switch {
|
if let Some(name) = model_switch {
|
||||||
|
|
@ -519,7 +531,7 @@ impl Mind {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = tokio::time::sleep(timeout), if !turn_active => {
|
_ = tokio::time::sleep(timeout), if !has_input => {
|
||||||
let tick = self.shared.lock().unwrap().dmn_tick();
|
let tick = self.shared.lock().unwrap().dmn_tick();
|
||||||
if let Some((prompt, target)) = tick {
|
if let Some((prompt, target)) = tick {
|
||||||
self.start_turn(&prompt, target).await;
|
self.start_turn(&prompt, target).await;
|
||||||
|
|
@ -527,16 +539,22 @@ impl Mind {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subconscious: collect finished results, trigger due agents
|
|
||||||
if !self.config.no_agents {
|
if !self.config.no_agents {
|
||||||
let mut sub = self.subconscious.lock().await;
|
if sub_handle.as_ref().map_or(true, |h| h.is_finished()) {
|
||||||
sub.collect_results(&self.agent).await;
|
let sub = self.subconscious.clone();
|
||||||
sub.trigger(&self.agent).await;
|
let agent = self.agent.clone();
|
||||||
drop(sub);
|
sub_handle = Some(tokio::spawn(async move {
|
||||||
|
let mut s = sub.lock().await;
|
||||||
let mut unc = self.unconscious.lock().await;
|
s.collect_results(&agent).await;
|
||||||
unc.collect_results().await;
|
s.trigger(&agent).await;
|
||||||
unc.trigger();
|
}));
|
||||||
|
}
|
||||||
|
if unc_handle.as_ref().map_or(true, |h| h.is_finished()) {
|
||||||
|
let unc = self.unconscious.clone();
|
||||||
|
unc_handle = Some(tokio::spawn(async move {
|
||||||
|
unc.lock().await.trigger();
|
||||||
|
}));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for pending user input → push to agent context and start turn
|
// Check for pending user input → push to agent context and start turn
|
||||||
|
|
|
||||||
|
|
@ -114,32 +114,19 @@ impl Unconscious {
|
||||||
}).collect()
|
}).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Collect results from finished agents.
|
|
||||||
pub async fn collect_results(&mut self) {
|
|
||||||
for agent in &mut self.agents {
|
|
||||||
if agent.handle.as_ref().is_some_and(|h| h.is_finished()) {
|
|
||||||
let handle = agent.handle.take().unwrap();
|
|
||||||
agent.last_run = Some(Instant::now());
|
|
||||||
agent.completed += 1;
|
|
||||||
|
|
||||||
match handle.await {
|
|
||||||
Ok((_auto, Ok(text))) => {
|
|
||||||
let preview = &text[..text.floor_char_boundary(text.len().min(100))];
|
|
||||||
dbglog!("[unconscious] {} completed: {}", agent.name, preview);
|
|
||||||
}
|
|
||||||
Ok((_auto, Err(e))) => {
|
|
||||||
dbglog!("[unconscious] {} failed: {}", agent.name, e);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
dbglog!("[unconscious] {} panicked: {}", agent.name, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Trigger agents that are due to run.
|
/// Trigger agents that are due to run.
|
||||||
pub fn trigger(&mut self) {
|
pub fn trigger(&mut self) {
|
||||||
|
// Reap finished agents
|
||||||
|
for agent in &mut self.agents {
|
||||||
|
if agent.handle.as_ref().is_some_and(|h| h.is_finished()) {
|
||||||
|
agent.last_run = Some(Instant::now());
|
||||||
|
agent.completed += 1;
|
||||||
|
dbglog!("[unconscious] {} completed ({}/{})",
|
||||||
|
agent.name, agent.completed, agent.budget);
|
||||||
|
agent.handle = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Refresh plan every 30 minutes (or on first call)
|
// Refresh plan every 30 minutes (or on first call)
|
||||||
let should_refresh = self.last_plan_refresh
|
let should_refresh = self.last_plan_refresh
|
||||||
.map(|t| t.elapsed() > Duration::from_secs(1800))
|
.map(|t| t.elapsed() > Duration::from_secs(1800))
|
||||||
|
|
@ -187,8 +174,8 @@ impl Unconscious {
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Run query and resolve placeholders synchronously
|
// Run query, resolve placeholders, record visits
|
||||||
let store = match crate::store::Store::load() {
|
let mut store = match crate::store::Store::load() {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
dbglog!("[unconscious] store load failed: {}", e);
|
dbglog!("[unconscious] store load failed: {}", e);
|
||||||
|
|
@ -196,10 +183,7 @@ impl Unconscious {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Track which nodes other running agents are working on
|
|
||||||
// to avoid concurrent collisions
|
|
||||||
let exclude: std::collections::HashSet<String> = std::collections::HashSet::new();
|
let exclude: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||||
|
|
||||||
let batch = match defs::run_agent(
|
let batch = match defs::run_agent(
|
||||||
&store, &def, def.count.unwrap_or(5), &exclude,
|
&store, &def, def.count.unwrap_or(5), &exclude,
|
||||||
) {
|
) {
|
||||||
|
|
@ -210,13 +194,8 @@ impl Unconscious {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Record visits
|
|
||||||
if !batch.node_keys.is_empty() {
|
if !batch.node_keys.is_empty() {
|
||||||
let mut store_mut = match crate::store::Store::load() {
|
store.record_agent_visits(&batch.node_keys, &name).ok();
|
||||||
Ok(s) => s,
|
|
||||||
Err(_) => return,
|
|
||||||
};
|
|
||||||
store_mut.record_agent_visits(&batch.node_keys, &name).ok();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let steps: Vec<AutoStep> = batch.steps.iter().map(|s| AutoStep {
|
let steps: Vec<AutoStep> = batch.steps.iter().map(|s| AutoStep {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue