mind: move state to MindState, Mind becomes thin event loop
MindState (behind Arc<Mutex<>>) holds all cognitive state: DMN, turn tracking, pending input, scoring, error counters. Pure state transition methods (take_pending_input, complete_turn, dmn_tick) return Action values instead of directly spawning turns. Mind is now just the event loop: lock MindState, call state methods, execute returned actions (spawn turns, send UiMessages). No state of its own except agent handle, turn handle, and watch channel. mind/mod.rs: 957 → 586 lines. Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
parent
792e9440af
commit
7adc333219
1 changed files with 268 additions and 380 deletions
646
src/mind/mod.rs
646
src/mind/mod.rs
|
|
@ -36,155 +36,71 @@ fn compaction_threshold(app: &AppConfig) -> u32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Shared state between Mind and UI. UI writes, Mind reads.
|
/// Shared state between Mind and UI.
|
||||||
pub struct MindState {
|
pub struct MindState {
|
||||||
/// Pending user input — UI pushes, Mind consumes after turn completes.
|
/// Pending user input — UI pushes, Mind consumes after turn completes.
|
||||||
pub input: Vec<String>,
|
pub input: Vec<String>,
|
||||||
/// True while a turn is in progress.
|
/// True while a turn is in progress.
|
||||||
pub turn_active: bool,
|
pub turn_active: bool,
|
||||||
|
/// DMN state
|
||||||
|
pub dmn: dmn::State,
|
||||||
|
pub dmn_turns: u32,
|
||||||
|
pub max_dmn_turns: u32,
|
||||||
|
/// Whether a full matrix /score task is currently running.
|
||||||
|
pub scoring_in_flight: bool,
|
||||||
|
/// Per-turn tracking
|
||||||
|
pub last_user_input: Instant,
|
||||||
|
pub consecutive_errors: u32,
|
||||||
|
pub last_turn_had_tools: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type SharedMindState = Arc<std::sync::Mutex<MindState>>;
|
pub type SharedMindState = Arc<std::sync::Mutex<MindState>>;
|
||||||
|
|
||||||
pub fn shared_mind_state() -> SharedMindState {
|
/// What should happen after a state transition.
|
||||||
Arc::new(std::sync::Mutex::new(MindState {
|
pub enum Action {
|
||||||
|
/// Start a turn with this input
|
||||||
|
Turn(String, StreamTarget),
|
||||||
|
/// Nothing to do
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MindState {
|
||||||
|
pub fn new(max_dmn_turns: u32) -> Self {
|
||||||
|
Self {
|
||||||
input: Vec::new(),
|
input: Vec::new(),
|
||||||
turn_active: false,
|
turn_active: false,
|
||||||
}))
|
dmn: if dmn::is_off() { dmn::State::Off }
|
||||||
}
|
else { dmn::State::Resting { since: Instant::now() } },
|
||||||
|
|
||||||
// --- Mind: cognitive state machine ---
|
|
||||||
|
|
||||||
pub struct Mind {
|
|
||||||
pub agent: Arc<Mutex<Agent>>,
|
|
||||||
pub shared: SharedMindState,
|
|
||||||
config: SessionConfig,
|
|
||||||
ui_tx: ui_channel::UiSender,
|
|
||||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
|
||||||
// DMN state
|
|
||||||
dmn: dmn::State,
|
|
||||||
dmn_turns: u32,
|
|
||||||
max_dmn_turns: u32,
|
|
||||||
|
|
||||||
// Turn tracking
|
|
||||||
turn_in_progress: bool,
|
|
||||||
turn_handle: Option<tokio::task::JoinHandle<()>>,
|
|
||||||
/// Broadcast when turn_in_progress changes. Commands can wait
|
|
||||||
/// for turns to complete via `turn_watch_rx.wait_for(|&v| !v)`.
|
|
||||||
turn_watch: tokio::sync::watch::Sender<bool>,
|
|
||||||
|
|
||||||
// Per-turn tracking for DMN context
|
|
||||||
last_user_input: Instant,
|
|
||||||
consecutive_errors: u32,
|
|
||||||
last_turn_had_tools: bool,
|
|
||||||
|
|
||||||
// Subconscious orchestration
|
|
||||||
agent_cycles: crate::subconscious::subconscious::AgentCycleState,
|
|
||||||
/// Latest memory importance scores from full matrix scoring (manual /score).
|
|
||||||
memory_scores: Option<learn::MemoryScore>,
|
|
||||||
/// Whether a full matrix /score task is currently running.
|
|
||||||
scoring_in_flight: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Mind {
|
|
||||||
pub fn new(
|
|
||||||
agent: Arc<Mutex<Agent>>,
|
|
||||||
shared: SharedMindState,
|
|
||||||
config: SessionConfig,
|
|
||||||
ui_tx: ui_channel::UiSender,
|
|
||||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
|
||||||
) -> Self {
|
|
||||||
let max_dmn_turns = config.app.dmn.max_turns;
|
|
||||||
let (turn_watch, _) = tokio::sync::watch::channel(false);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
agent,
|
|
||||||
shared,
|
|
||||||
config,
|
|
||||||
ui_tx,
|
|
||||||
turn_tx,
|
|
||||||
dmn: if dmn::is_off() {
|
|
||||||
dmn::State::Off
|
|
||||||
} else {
|
|
||||||
dmn::State::Resting { since: Instant::now() }
|
|
||||||
},
|
|
||||||
dmn_turns: 0,
|
dmn_turns: 0,
|
||||||
max_dmn_turns,
|
max_dmn_turns,
|
||||||
turn_in_progress: false,
|
scoring_in_flight: false,
|
||||||
turn_handle: None,
|
|
||||||
turn_watch,
|
|
||||||
last_user_input: Instant::now(),
|
last_user_input: Instant::now(),
|
||||||
consecutive_errors: 0,
|
consecutive_errors: 0,
|
||||||
last_turn_had_tools: false,
|
last_turn_had_tools: false,
|
||||||
agent_cycles: crate::subconscious::subconscious::AgentCycleState::new(""),
|
|
||||||
memory_scores: None,
|
|
||||||
scoring_in_flight: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribe to turn state changes. Use `rx.wait_for(|&v| !v).await`
|
pub fn dmn_interval(&self) -> Duration {
|
||||||
/// to wait until no turn is in progress.
|
|
||||||
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
|
|
||||||
self.turn_watch.subscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_turn_active(&mut self, active: bool) {
|
|
||||||
self.turn_in_progress = active;
|
|
||||||
self.shared.lock().unwrap().turn_active = active;
|
|
||||||
let _ = self.turn_watch.send(active);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// How long before the next DMN tick.
|
|
||||||
fn dmn_interval(&self) -> Duration {
|
|
||||||
self.dmn.interval()
|
self.dmn.interval()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn an agent turn in a background task.
|
/// Consume pending input, return a Turn action if ready.
|
||||||
fn spawn_turn(&mut self, input: String, target: StreamTarget) {
|
pub fn take_pending_input(&mut self) -> Action {
|
||||||
let agent = self.agent.clone();
|
if self.turn_active || self.input.is_empty() {
|
||||||
let ui_tx = self.ui_tx.clone();
|
return Action::None;
|
||||||
let result_tx = self.turn_tx.clone();
|
|
||||||
self.set_turn_active(true);
|
|
||||||
self.turn_handle = Some(tokio::spawn(async move {
|
|
||||||
let result = Agent::turn(agent, &input, &ui_tx, target).await;
|
|
||||||
let _ = result_tx.send((result, target)).await;
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
let text = self.input.join("\n");
|
||||||
/// Submit user input — either queue it (if a turn is running) or
|
self.input.clear();
|
||||||
/// start a new turn immediately.
|
|
||||||
/// Check shared state for pending user input, start a turn if available.
|
|
||||||
fn check_pending_input(&mut self) {
|
|
||||||
if self.turn_in_progress {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let input = {
|
|
||||||
let mut shared = self.shared.lock().unwrap();
|
|
||||||
if shared.input.is_empty() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
shared.input.join("\n")
|
|
||||||
};
|
|
||||||
self.shared.lock().unwrap().input.clear();
|
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
self.consecutive_errors = 0;
|
self.consecutive_errors = 0;
|
||||||
self.last_user_input = Instant::now();
|
self.last_user_input = Instant::now();
|
||||||
self.dmn = dmn::State::Engaged;
|
self.dmn = dmn::State::Engaged;
|
||||||
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
|
Action::Turn(text, StreamTarget::Conversation)
|
||||||
self.update_status();
|
|
||||||
self.spawn_turn(input, StreamTarget::Conversation);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a completed turn: update DMN state, check compaction,
|
/// Process turn completion, return model switch name if requested.
|
||||||
/// drain any queued input.
|
pub fn complete_turn(&mut self, result: &Result<TurnResult>, target: StreamTarget) -> Option<String> {
|
||||||
async fn handle_turn_result(
|
self.turn_active = false;
|
||||||
&mut self,
|
|
||||||
result: Result<TurnResult>,
|
|
||||||
target: StreamTarget,
|
|
||||||
) {
|
|
||||||
self.set_turn_active(false);
|
|
||||||
self.turn_handle = None;
|
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(turn_result) => {
|
Ok(turn_result) => {
|
||||||
if turn_result.tool_errors > 0 {
|
if turn_result.tool_errors > 0 {
|
||||||
|
|
@ -202,131 +118,28 @@ impl Mind {
|
||||||
if turn_result.dmn_pause {
|
if turn_result.dmn_pause {
|
||||||
self.dmn = dmn::State::Paused;
|
self.dmn = dmn::State::Paused;
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(
|
|
||||||
"DMN paused (agent requested). Ctrl+P or /wake to resume.".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
if let Some(model_name) = turn_result.model_switch {
|
turn_result.model_switch.clone()
|
||||||
crate::user::event_loop::cmd_switch_model(
|
|
||||||
&self.agent, &model_name, &self.ui_tx,
|
|
||||||
).await;
|
|
||||||
}
|
}
|
||||||
}
|
Err(_) => {
|
||||||
Err(e) => {
|
|
||||||
self.consecutive_errors += 1;
|
self.consecutive_errors += 1;
|
||||||
let msg = match target {
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
||||||
StreamTarget::Autonomous => {
|
None
|
||||||
UiMessage::DmnAnnotation(format!("[error: {:#}]", e))
|
|
||||||
}
|
}
|
||||||
StreamTarget::Conversation => {
|
|
||||||
UiMessage::Info(format!("Error: {:#}", e))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let _ = self.ui_tx.send(msg);
|
|
||||||
self.dmn = dmn::State::Resting {
|
|
||||||
since: Instant::now(),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.update_status();
|
/// DMN tick — returns a Turn action with the DMN prompt, or None.
|
||||||
self.check_compaction();
|
pub fn dmn_tick(&mut self) -> Action {
|
||||||
if !self.config.no_agents {
|
|
||||||
self.start_memory_scoring();
|
|
||||||
}
|
|
||||||
self.drain_pending();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawn incremental memory scoring if not already running.
|
|
||||||
/// Non-blocking — all async work happens in the spawned task.
|
|
||||||
fn start_memory_scoring(&self) {
|
|
||||||
let agent = self.agent.clone();
|
|
||||||
let ui_tx = self.ui_tx.clone();
|
|
||||||
let cfg = crate::config::get();
|
|
||||||
let max_age = cfg.scoring_interval_secs;
|
|
||||||
let response_window = cfg.scoring_response_window;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let (context, client) = {
|
|
||||||
let mut agent = agent.lock().await;
|
|
||||||
if agent.agent_cycles.memory_scoring_in_flight {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
agent.agent_cycles.memory_scoring_in_flight = true;
|
|
||||||
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
|
|
||||||
(agent.context.clone(), agent.client_clone())
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = learn::score_memories_incremental(
|
|
||||||
&context, max_age as i64, response_window, &client, &ui_tx,
|
|
||||||
).await;
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut agent = agent.lock().await;
|
|
||||||
agent.agent_cycles.memory_scoring_in_flight = false;
|
|
||||||
if let Ok(ref scores) = result {
|
|
||||||
agent.agent_cycles.memory_scores = scores.clone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
match result {
|
|
||||||
Ok(_) => {
|
|
||||||
let agent = agent.lock().await;
|
|
||||||
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let _ = ui_tx.send(UiMessage::Debug(format!(
|
|
||||||
"[memory-scoring] failed: {:#}", e,
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if compaction is needed after a turn.
|
|
||||||
fn check_compaction(&self) {
|
|
||||||
let threshold = compaction_threshold(&self.config.app);
|
|
||||||
let agent = self.agent.clone();
|
|
||||||
let ui_tx = self.ui_tx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut agent_guard = agent.lock().await;
|
|
||||||
let tokens = agent_guard.last_prompt_tokens();
|
|
||||||
if tokens > threshold {
|
|
||||||
let _ = ui_tx.send(UiMessage::Info(format!(
|
|
||||||
"[compaction: {}K > {}K threshold]",
|
|
||||||
tokens / 1000,
|
|
||||||
threshold / 1000,
|
|
||||||
)));
|
|
||||||
agent_guard.compact();
|
|
||||||
let _ = ui_tx.send(UiMessage::Info(
|
|
||||||
"[compacted — journal + recent messages]".into(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send any consolidated pending input as a single turn.
|
|
||||||
fn drain_pending(&mut self) {
|
|
||||||
self.check_pending_input();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fire a DMN tick: check max turns, generate prompt, spawn turn.
|
|
||||||
fn dmn_tick(&mut self) {
|
|
||||||
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
|
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
|
||||||
return;
|
return Action::None;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.dmn_turns += 1;
|
self.dmn_turns += 1;
|
||||||
if self.dmn_turns > self.max_dmn_turns {
|
if self.dmn_turns > self.max_dmn_turns {
|
||||||
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
||||||
"[dmn: {} consecutive turns, resting (limit: {})]",
|
|
||||||
self.dmn_turns - 1,
|
|
||||||
self.max_dmn_turns,
|
|
||||||
)));
|
|
||||||
self.dmn = dmn::State::Resting {
|
|
||||||
since: Instant::now(),
|
|
||||||
};
|
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
self.update_status();
|
return Action::None;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let dmn_ctx = dmn::DmnContext {
|
let dmn_ctx = dmn::DmnContext {
|
||||||
|
|
@ -335,132 +148,28 @@ impl Mind {
|
||||||
last_turn_had_tools: self.last_turn_had_tools,
|
last_turn_had_tools: self.last_turn_had_tools,
|
||||||
};
|
};
|
||||||
let prompt = self.dmn.prompt(&dmn_ctx);
|
let prompt = self.dmn.prompt(&dmn_ctx);
|
||||||
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
|
Action::Turn(prompt, StreamTarget::Autonomous)
|
||||||
"[dmn: {} ({}/{})]",
|
|
||||||
self.dmn.label(),
|
|
||||||
self.dmn_turns,
|
|
||||||
self.max_dmn_turns,
|
|
||||||
)));
|
|
||||||
self.update_status();
|
|
||||||
self.spawn_turn(prompt, StreamTarget::Autonomous);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn cmd_new(&mut self) {
|
pub fn dmn_sleep(&mut self) {
|
||||||
let new_log = log::ConversationLog::new(
|
|
||||||
self.config.session_dir.join("conversation.jsonl"),
|
|
||||||
).ok();
|
|
||||||
let mut agent_guard = self.agent.lock().await;
|
|
||||||
let shared_ctx = agent_guard.shared_context.clone();
|
|
||||||
let shared_tools = agent_guard.active_tools.clone();
|
|
||||||
*agent_guard = Agent::new(
|
|
||||||
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
|
|
||||||
self.config.system_prompt.clone(),
|
|
||||||
self.config.context_parts.clone(),
|
|
||||||
self.config.app.clone(),
|
|
||||||
self.config.prompt_file.clone(),
|
|
||||||
new_log,
|
|
||||||
shared_ctx,
|
|
||||||
shared_tools,
|
|
||||||
);
|
|
||||||
drop(agent_guard);
|
|
||||||
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info("New session started.".into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cmd_score(&mut self) {
|
|
||||||
if self.scoring_in_flight {
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into()));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let agent = self.agent.clone();
|
|
||||||
let ui_tx = self.ui_tx.clone();
|
|
||||||
self.scoring_in_flight = true;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let (context, client) = {
|
|
||||||
let ag = agent.lock().await;
|
|
||||||
(ag.context.clone(), ag.client_clone())
|
|
||||||
};
|
|
||||||
let result = learn::score_memories(&context, &client, &ui_tx).await;
|
|
||||||
let ag = agent.lock().await;
|
|
||||||
match result {
|
|
||||||
Ok(scores) => ag.publish_context_state_with_scores(Some(&scores)),
|
|
||||||
Err(e) => { let _ = ui_tx.send(UiMessage::Info(format!("[scoring failed: {:#}]", e))); }
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cmd_dmn_query(&self) {
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN state: {:?}", self.dmn)));
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(format!("Next tick in: {:?}", self.dmn.interval())));
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(format!(
|
|
||||||
"Consecutive DMN turns: {}/{}", self.dmn_turns, self.max_dmn_turns,
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cmd_dmn_sleep(&mut self) {
|
|
||||||
self.dmn = dmn::State::Resting { since: Instant::now() };
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(
|
|
||||||
"DMN sleeping (heartbeat every 5 min). Type anything to wake.".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cmd_dmn_wake(&mut self) {
|
pub fn dmn_wake(&mut self) {
|
||||||
let was_paused = matches!(self.dmn, dmn::State::Paused | dmn::State::Off);
|
|
||||||
if matches!(self.dmn, dmn::State::Off) {
|
if matches!(self.dmn, dmn::State::Off) {
|
||||||
dmn::set_off(false);
|
dmn::set_off(false);
|
||||||
}
|
}
|
||||||
self.dmn = dmn::State::Foraging;
|
self.dmn = dmn::State::Foraging;
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
let msg = if was_paused { "DMN unpaused — entering foraging mode." }
|
|
||||||
else { "DMN waking — entering foraging mode." };
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(msg.into()));
|
|
||||||
self.update_status();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cmd_dmn_pause(&mut self) {
|
pub fn dmn_pause(&mut self) {
|
||||||
self.dmn = dmn::State::Paused;
|
self.dmn = dmn::State::Paused;
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(
|
|
||||||
"DMN paused — no autonomous ticks. Ctrl+P or /wake to resume.".into(),
|
|
||||||
));
|
|
||||||
self.update_status();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Interrupt: kill processes, abort current turn, clear pending queue.
|
pub fn cycle_autonomy(&mut self) -> &'static str {
|
||||||
async fn interrupt(&mut self) {
|
|
||||||
let count = {
|
|
||||||
let agent = self.agent.lock().await;
|
|
||||||
let mut tools = agent.active_tools.lock().unwrap();
|
|
||||||
let count = tools.len();
|
|
||||||
for entry in tools.drain(..) {
|
|
||||||
entry.handle.abort();
|
|
||||||
}
|
|
||||||
count
|
|
||||||
};
|
|
||||||
if count == 0 {
|
|
||||||
if let Some(handle) = self.turn_handle.take() {
|
|
||||||
handle.abort();
|
|
||||||
self.set_turn_active(false);
|
|
||||||
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
||||||
self.update_status();
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.shared.lock().unwrap().input.clear();
|
|
||||||
let killed = count;
|
|
||||||
if killed > 0 || self.turn_in_progress {
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(format!(
|
|
||||||
"(interrupted — killed {} process(es), turn aborted)", killed,
|
|
||||||
)));
|
|
||||||
} else {
|
|
||||||
let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Cycle reasoning effort: none → low → high → none.
|
|
||||||
/// Cycle DMN autonomy: foraging → resting → paused → off → foraging.
|
|
||||||
fn cycle_autonomy(&mut self) {
|
|
||||||
let (new_state, label) = match &self.dmn {
|
let (new_state, label) = match &self.dmn {
|
||||||
dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => {
|
dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => {
|
||||||
(dmn::State::Resting { since: Instant::now() }, "resting")
|
(dmn::State::Resting { since: Instant::now() }, "resting")
|
||||||
|
|
@ -477,17 +186,117 @@ impl Mind {
|
||||||
};
|
};
|
||||||
self.dmn = new_state;
|
self.dmn = new_state;
|
||||||
self.dmn_turns = 0;
|
self.dmn_turns = 0;
|
||||||
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN → {} (Ctrl+P to cycle)", label)));
|
label
|
||||||
self.update_status();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Switch to a named model from the config registry.
|
pub fn interrupt(&mut self) {
|
||||||
fn load_context_groups(&self) -> Vec<config::ContextGroup> {
|
self.input.clear();
|
||||||
config::get().context_groups.clone()
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_context_info(&self) {
|
pub fn shared_mind_state(max_dmn_turns: u32) -> SharedMindState {
|
||||||
let context_groups = self.load_context_groups();
|
Arc::new(std::sync::Mutex::new(MindState::new(max_dmn_turns)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Mind: cognitive state machine ---
|
||||||
|
|
||||||
|
pub struct Mind {
|
||||||
|
pub agent: Arc<Mutex<Agent>>,
|
||||||
|
pub shared: SharedMindState,
|
||||||
|
pub config: SessionConfig,
|
||||||
|
ui_tx: ui_channel::UiSender,
|
||||||
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||||
|
turn_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
turn_watch: tokio::sync::watch::Sender<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Mind {
|
||||||
|
pub fn new(
|
||||||
|
agent: Arc<Mutex<Agent>>,
|
||||||
|
shared: SharedMindState,
|
||||||
|
config: SessionConfig,
|
||||||
|
ui_tx: ui_channel::UiSender,
|
||||||
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||||
|
) -> Self {
|
||||||
|
let (turn_watch, _) = tokio::sync::watch::channel(false);
|
||||||
|
Self { agent, shared, config, ui_tx, turn_tx, turn_handle: None, turn_watch }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
|
||||||
|
self.turn_watch.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute an Action from a MindState method.
|
||||||
|
fn execute(&mut self, action: Action) {
|
||||||
|
if let Action::Turn(input, target) = action {
|
||||||
|
if target == StreamTarget::Conversation {
|
||||||
|
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
|
||||||
|
} else {
|
||||||
|
let s = self.shared.lock().unwrap();
|
||||||
|
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
|
||||||
|
"[dmn: {} ({}/{})]", s.dmn.label(), s.dmn_turns, s.max_dmn_turns,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
self.shared.lock().unwrap().turn_active = true;
|
||||||
|
let _ = self.turn_watch.send(true);
|
||||||
|
let agent = self.agent.clone();
|
||||||
|
let ui_tx = self.ui_tx.clone();
|
||||||
|
let result_tx = self.turn_tx.clone();
|
||||||
|
self.turn_handle = Some(tokio::spawn(async move {
|
||||||
|
let result = Agent::turn(agent, &input, &ui_tx, target).await;
|
||||||
|
let _ = result_tx.send((result, target)).await;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start_memory_scoring(&self) {
|
||||||
|
let agent = self.agent.clone();
|
||||||
|
let ui_tx = self.ui_tx.clone();
|
||||||
|
let cfg = crate::config::get();
|
||||||
|
let max_age = cfg.scoring_interval_secs;
|
||||||
|
let response_window = cfg.scoring_response_window;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (context, client) = {
|
||||||
|
let mut ag = agent.lock().await;
|
||||||
|
if ag.agent_cycles.memory_scoring_in_flight { return; }
|
||||||
|
ag.agent_cycles.memory_scoring_in_flight = true;
|
||||||
|
let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots()));
|
||||||
|
(ag.context.clone(), ag.client_clone())
|
||||||
|
};
|
||||||
|
let result = learn::score_memories_incremental(
|
||||||
|
&context, max_age as i64, response_window, &client, &ui_tx,
|
||||||
|
).await;
|
||||||
|
{
|
||||||
|
let mut ag = agent.lock().await;
|
||||||
|
ag.agent_cycles.memory_scoring_in_flight = false;
|
||||||
|
if let Ok(ref scores) = result { ag.agent_cycles.memory_scores = scores.clone(); }
|
||||||
|
}
|
||||||
|
match result {
|
||||||
|
Ok(_) => { let ag = agent.lock().await; let _ = ui_tx.send(UiMessage::AgentUpdate(ag.agent_cycles.snapshots())); }
|
||||||
|
Err(e) => { let _ = ui_tx.send(UiMessage::Debug(format!("[memory-scoring] failed: {:#}", e))); }
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_compaction(&self) {
|
||||||
|
let threshold = compaction_threshold(&self.config.app);
|
||||||
|
let agent = self.agent.clone();
|
||||||
|
let ui_tx = self.ui_tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut ag = agent.lock().await;
|
||||||
|
if ag.last_prompt_tokens() > threshold {
|
||||||
|
let _ = ui_tx.send(UiMessage::Info(format!(
|
||||||
|
"[compaction: {}K > {}K]", ag.last_prompt_tokens() / 1000, threshold / 1000,
|
||||||
|
)));
|
||||||
|
ag.compact();
|
||||||
|
let _ = ui_tx.send(UiMessage::Info("[compacted]".into()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_context_info(&self) {
|
||||||
|
let context_groups = config::get().context_groups.clone();
|
||||||
let (instruction_files, memory_files) = identity::context_file_info(
|
let (instruction_files, memory_files) = identity::context_file_info(
|
||||||
&self.config.prompt_file,
|
&self.config.prompt_file,
|
||||||
self.config.app.memory_project.as_deref(),
|
self.config.app.memory_project.as_deref(),
|
||||||
|
|
@ -505,26 +314,23 @@ impl Mind {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_status(&self) {
|
pub fn update_status(&self) {
|
||||||
|
let s = self.shared.lock().unwrap();
|
||||||
let _ = self.ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
|
let _ = self.ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
|
||||||
dmn_state: self.dmn.label().to_string(),
|
dmn_state: s.dmn.label().to_string(),
|
||||||
dmn_turns: self.dmn_turns,
|
dmn_turns: s.dmn_turns,
|
||||||
dmn_max_turns: self.max_dmn_turns,
|
dmn_max_turns: s.max_dmn_turns,
|
||||||
prompt_tokens: 0,
|
prompt_tokens: 0, completion_tokens: 0,
|
||||||
completion_tokens: 0,
|
model: String::new(), turn_tools: 0,
|
||||||
model: String::new(),
|
|
||||||
turn_tools: 0,
|
|
||||||
context_budget: String::new(),
|
context_budget: String::new(),
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(&mut self) {
|
pub async fn shutdown(&mut self) {
|
||||||
if let Some(handle) = self.turn_handle.take() {
|
if let Some(handle) = self.turn_handle.take() { handle.abort(); }
|
||||||
handle.abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mind event loop — reacts to user input, turn results, DMN ticks.
|
/// Mind event loop — locks MindState, calls state methods, executes actions.
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
&mut self,
|
&mut self,
|
||||||
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<crate::user::event_loop::MindMessage>,
|
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<crate::user::event_loop::MindMessage>,
|
||||||
|
|
@ -534,40 +340,122 @@ impl Mind {
|
||||||
use crate::user::HotkeyAction;
|
use crate::user::HotkeyAction;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let timeout = self.dmn_interval();
|
let timeout = self.shared.lock().unwrap().dmn_interval();
|
||||||
|
let turn_active = self.shared.lock().unwrap().turn_active;
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
|
|
||||||
Some(msg) = input_rx.recv() => {
|
Some(msg) = input_rx.recv() => {
|
||||||
|
{
|
||||||
|
let mut s = self.shared.lock().unwrap();
|
||||||
match msg {
|
match msg {
|
||||||
MindMessage::Hotkey(action) => {
|
MindMessage::Hotkey(HotkeyAction::CycleAutonomy) => {
|
||||||
match action {
|
let label = s.cycle_autonomy();
|
||||||
HotkeyAction::Interrupt => self.interrupt().await,
|
let _ = self.ui_tx.send(UiMessage::Info(
|
||||||
HotkeyAction::CycleAutonomy => self.cycle_autonomy(),
|
format!("DMN → {} (Ctrl+P to cycle)", label),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
MindMessage::NewSession => {
|
||||||
|
s.dmn_sleep();
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info("New session started.".into()));
|
||||||
|
}
|
||||||
|
MindMessage::Score => {
|
||||||
|
if !s.scoring_in_flight {
|
||||||
|
s.scoring_in_flight = true;
|
||||||
|
} else {
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MindMessage::DmnQuery => {
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN: {:?} ({}/{})", s.dmn, s.dmn_turns, s.max_dmn_turns)));
|
||||||
|
}
|
||||||
|
MindMessage::DmnSleep => {
|
||||||
|
s.dmn_sleep();
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info("DMN sleeping.".into()));
|
||||||
|
}
|
||||||
|
MindMessage::DmnWake => {
|
||||||
|
s.dmn_wake();
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info("DMN foraging.".into()));
|
||||||
|
}
|
||||||
|
MindMessage::DmnPause => {
|
||||||
|
s.dmn_pause();
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info("DMN paused.".into()));
|
||||||
|
}
|
||||||
|
MindMessage::Hotkey(HotkeyAction::Interrupt) => {
|
||||||
|
s.interrupt();
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MindMessage::NewSession => self.cmd_new().await,
|
// Handle interrupt — kill processes and abort turn
|
||||||
MindMessage::Score => self.cmd_score(),
|
if matches!(msg, MindMessage::Hotkey(HotkeyAction::Interrupt)) {
|
||||||
MindMessage::DmnQuery => self.cmd_dmn_query(),
|
let ag = self.agent.lock().await;
|
||||||
MindMessage::DmnSleep => self.cmd_dmn_sleep(),
|
let mut tools = ag.active_tools.lock().unwrap();
|
||||||
MindMessage::DmnWake => self.cmd_dmn_wake(),
|
for entry in tools.drain(..) { entry.handle.abort(); }
|
||||||
MindMessage::DmnPause => self.cmd_dmn_pause(),
|
drop(tools); drop(ag);
|
||||||
|
if let Some(h) = self.turn_handle.take() { h.abort(); }
|
||||||
|
self.shared.lock().unwrap().turn_active = false;
|
||||||
|
let _ = self.turn_watch.send(false);
|
||||||
|
let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into()));
|
||||||
}
|
}
|
||||||
self.check_pending_input();
|
// Handle /new — reset agent
|
||||||
|
if matches!(msg, MindMessage::NewSession) {
|
||||||
|
let new_log = log::ConversationLog::new(
|
||||||
|
self.config.session_dir.join("conversation.jsonl"),
|
||||||
|
).ok();
|
||||||
|
let mut ag = self.agent.lock().await;
|
||||||
|
let shared_ctx = ag.shared_context.clone();
|
||||||
|
let shared_tools = ag.active_tools.clone();
|
||||||
|
*ag = Agent::new(
|
||||||
|
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
|
||||||
|
self.config.system_prompt.clone(), self.config.context_parts.clone(),
|
||||||
|
self.config.app.clone(), self.config.prompt_file.clone(),
|
||||||
|
new_log, shared_ctx, shared_tools,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// Handle /score — spawn scoring
|
||||||
|
if matches!(msg, MindMessage::Score) && self.shared.lock().unwrap().scoring_in_flight {
|
||||||
|
self.start_memory_scoring();
|
||||||
|
}
|
||||||
|
// Check for pending input
|
||||||
|
let action = self.shared.lock().unwrap().take_pending_input();
|
||||||
|
self.execute(action);
|
||||||
|
self.update_status();
|
||||||
}
|
}
|
||||||
|
|
||||||
Some((result, target)) = turn_rx.recv() => {
|
Some((result, target)) = turn_rx.recv() => {
|
||||||
self.handle_turn_result(result, target).await;
|
self.turn_handle = None;
|
||||||
self.check_pending_input();
|
let model_switch = self.shared.lock().unwrap().complete_turn(&result, target);
|
||||||
|
let _ = self.turn_watch.send(false);
|
||||||
|
|
||||||
|
if let Err(ref e) = result {
|
||||||
|
let msg = match target {
|
||||||
|
StreamTarget::Autonomous => UiMessage::DmnAnnotation(format!("[error: {:#}]", e)),
|
||||||
|
StreamTarget::Conversation => UiMessage::Info(format!("Error: {:#}", e)),
|
||||||
|
};
|
||||||
|
let _ = self.ui_tx.send(msg);
|
||||||
|
}
|
||||||
|
if let Some(name) = model_switch {
|
||||||
|
crate::user::event_loop::cmd_switch_model(&self.agent, &name, &self.ui_tx).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = tokio::time::sleep(timeout), if !self.turn_in_progress => {
|
self.check_compaction();
|
||||||
self.check_pending_input();
|
if !self.config.no_agents { self.start_memory_scoring(); }
|
||||||
if !self.turn_in_progress {
|
self.update_status();
|
||||||
self.dmn_tick();
|
|
||||||
|
let action = self.shared.lock().unwrap().take_pending_input();
|
||||||
|
self.execute(action);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = tokio::time::sleep(timeout), if !turn_active => {
|
||||||
|
let action = self.shared.lock().unwrap().take_pending_input();
|
||||||
|
self.execute(action);
|
||||||
|
if !self.shared.lock().unwrap().turn_active {
|
||||||
|
let action = self.shared.lock().unwrap().dmn_tick();
|
||||||
|
self.execute(action);
|
||||||
|
}
|
||||||
|
self.update_status();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -664,7 +552,7 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
|
||||||
let (turn_tx, turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
|
let (turn_tx, turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
|
||||||
|
|
||||||
let no_agents = config.no_agents;
|
let no_agents = config.no_agents;
|
||||||
let shared_mind = shared_mind_state();
|
let shared_mind = shared_mind_state(config.app.dmn.max_turns);
|
||||||
let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx);
|
let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx);
|
||||||
mind.update_status();
|
mind.update_status();
|
||||||
if !no_agents {
|
if !no_agents {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue