Shared persistent state across all subconscious agents

Moved persistent_state from per-agent to a single shared BTreeMap on
Subconscious. All agents read/write the same state — surface's walked
keys are visible to observe and reflect, etc.

- Subconscious.state: shared BTreeMap<String, String>
- walked() derives from state["walked"] instead of separate Vec
- subconscious-state.json is now a flat key-value map
- All agent outputs merge into the shared state on completion
- Loaded on startup, saved after any agent completes

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-07 19:16:01 -04:00
parent 578be807e7
commit 27ca3c058d
8 changed files with 60 additions and 80 deletions

View file

@ -298,10 +298,8 @@ pub struct SubconsciousSnapshot {
pub forked_agent: Option<Arc<tokio::sync::Mutex<crate::agent::Agent>>>,
/// Entry index where the fork diverged.
pub fork_point: usize,
/// Persistent agent state — accumulated across runs.
/// Shared persistent state — accumulated across all agent runs.
pub state: std::collections::BTreeMap<String, String>,
/// Persistent walked keys (shared state, relevant for surface).
pub walked: Vec<String>,
/// Recent store activity for this agent: (key, timestamp), newest first.
pub history: Vec<(String, i64)>,
}
@ -317,8 +315,6 @@ struct SubconsciousAgent {
/// Entry index where the fork diverged from the conscious agent.
fork_point: usize,
handle: Option<tokio::task::JoinHandle<(AutoAgent, Result<String, String>)>>,
/// Persistent state — accumulated across runs, serialized to disk.
persistent_state: std::collections::BTreeMap<String, String>,
}
impl SubconsciousAgent {
@ -348,7 +344,6 @@ impl SubconsciousAgent {
name: name.to_string(),
auto, last_trigger_bytes: 0, last_run: None,
forked_agent: None, fork_point: 0, handle: None,
persistent_state: std::collections::BTreeMap::new(),
})
}
@ -362,14 +357,7 @@ impl SubconsciousAgent {
conversation_bytes.saturating_sub(self.last_trigger_bytes) >= interval
}
fn snapshot(&self, walked: &[String], history: Vec<(String, i64)>) -> SubconsciousSnapshot {
// Merge persistent state with any live outputs from a running agent
let mut state = self.persistent_state.clone();
if self.is_running() {
for (k, v) in &self.auto.outputs {
state.insert(k.clone(), v.clone());
}
}
fn snapshot(&self, state: &std::collections::BTreeMap<String, String>, history: Vec<(String, i64)>) -> SubconsciousSnapshot {
SubconsciousSnapshot {
name: self.name.clone(),
running: self.is_running(),
@ -378,18 +366,18 @@ impl SubconsciousAgent {
last_run_secs_ago: self.last_run.map(|t| t.elapsed().as_secs_f64()),
forked_agent: self.forked_agent.clone(),
fork_point: self.fork_point,
state,
walked: walked.to_vec(),
state: state.clone(),
history,
}
}
}
/// Background agent orchestration — owns the subconscious agents
/// and their shared state (walked keys, etc.).
/// and their shared persistent state.
pub struct Subconscious {
agents: Vec<SubconsciousAgent>,
pub walked: Vec<String>,
/// Shared state across all agents — persisted to disk.
pub state: std::collections::BTreeMap<String, String>,
state_path: Option<std::path::PathBuf>,
}
@ -398,30 +386,18 @@ impl Subconscious {
let agents = AGENTS.iter()
.filter_map(|(name, _)| SubconsciousAgent::new(name))
.collect();
Self { agents, walked: Vec::new(), state_path: None }
Self { agents, state: std::collections::BTreeMap::new(), state_path: None }
}
/// Set the state file path and load any existing state from disk.
pub fn set_state_path(&mut self, path: std::path::PathBuf) {
if let Ok(data) = std::fs::read_to_string(&path) {
if let Ok(saved) = serde_json::from_str::<
std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>
std::collections::BTreeMap<String, String>
>(&data) {
for agent in &mut self.agents {
if let Some(state) = saved.get(&agent.name) {
agent.persistent_state = state.clone();
}
}
// Restore walked from surface agent if present
if let Some(surface) = self.agents.iter().find(|a| a.name == "subconscious-surface") {
if let Some(walked_str) = surface.persistent_state.get("walked") {
self.walked = walked_str.lines()
.map(|l| l.trim().to_string())
.filter(|l| !l.is_empty())
.collect();
}
}
dbglog!("[subconscious] loaded state from {}", path.display());
self.state = saved;
dbglog!("[subconscious] loaded {} state keys from {}",
self.state.len(), path.display());
}
}
self.state_path = Some(path);
@ -429,25 +405,24 @@ impl Subconscious {
fn save_state(&self) {
let Some(path) = &self.state_path else { return };
let mut map: std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>> =
std::collections::BTreeMap::new();
for agent in &self.agents {
if !agent.persistent_state.is_empty() {
map.insert(agent.name.clone(), agent.persistent_state.clone());
}
}
if let Ok(json) = serde_json::to_string_pretty(&map) {
if let Ok(json) = serde_json::to_string_pretty(&self.state) {
let _ = std::fs::write(path, json);
}
}
pub fn walked(&self) -> Vec<String> {
self.state.get("walked")
.map(|s| s.lines().map(|l| l.trim().to_string()).filter(|l| !l.is_empty()).collect())
.unwrap_or_default()
}
pub fn snapshots(&self, store: Option<&crate::store::Store>) -> Vec<SubconsciousSnapshot> {
self.agents.iter().map(|s| {
let history = store.map(|st| {
let prov = format!("agent:{}", s.name);
st.recent_by_provenance(&prov, 30)
}).unwrap_or_default();
s.snapshot(&self.walked, history)
s.snapshot(&self.state, history)
}).collect()
}
@ -475,16 +450,9 @@ impl Subconscious {
Ok(_) => {
let name = self.agents[idx].name.clone();
let outputs = std::mem::take(&mut self.agents[idx].auto.outputs);
// Merge into persistent state
// Merge into shared persistent state
for (k, v) in &outputs {
self.agents[idx].persistent_state.insert(k.clone(), v.clone());
}
if let Some(walked_str) = outputs.get("walked") {
self.walked = walked_str.lines()
.map(|l| l.trim().to_string())
.filter(|l| !l.is_empty())
.collect();
self.state.insert(k.clone(), v.clone());
}
// Inject all outputs into the conscious agent under one lock
@ -577,7 +545,6 @@ impl Subconscious {
if to_run.is_empty() { return; }
let conscious = agent.lock().await;
let walked = self.walked.clone();
for (idx, mut auto) in to_run {
dbglog!("[subconscious] triggering {}", auto.name);
@ -590,10 +557,10 @@ impl Subconscious {
self.agents[idx].fork_point = fork_point;
let keys = memory_keys.clone();
let w = walked.clone();
let st = self.state.clone();
self.agents[idx].handle = Some(tokio::spawn(async move {
let result = auto.run_forked_shared(&shared_forked, &keys, &w).await;
let result = auto.run_forked_shared(&shared_forked, &keys, &st).await;
(auto, result)
}));
}

View file

@ -258,7 +258,7 @@ impl Mind {
}
pub async fn subconscious_walked(&self) -> Vec<String> {
self.subconscious.lock().await.walked.clone()
self.subconscious.lock().await.walked()
}
pub async fn init(&self) {