Compare commits
2 commits
a966dd9d5d
...
4d22a28794
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d22a28794 | ||
|
|
19789b7e74 |
5 changed files with 112 additions and 53 deletions
|
|
@ -598,8 +598,8 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
||||||
return Ok(database);
|
return Ok(database);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track latest (offset, uuid, version, deleted, node_type, timestamp) per key
|
// Track latest (offset, uuid, version, deleted, node_type, timestamp, provenance) per key
|
||||||
let mut latest: HashMap<String, (u64, [u8; 16], u32, bool, u8, i64)> = HashMap::new();
|
let mut latest: HashMap<String, (u64, [u8; 16], u32, bool, u8, i64, String)> = HashMap::new();
|
||||||
|
|
||||||
let file = fs::File::open(capnp_path)
|
let file = fs::File::open(capnp_path)
|
||||||
.with_context(|| format!("open {}", capnp_path.display()))?;
|
.with_context(|| format!("open {}", capnp_path.display()))?;
|
||||||
|
|
@ -634,6 +634,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
||||||
.map(|t| t as u8)
|
.map(|t| t as u8)
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
let timestamp = node_reader.get_timestamp();
|
let timestamp = node_reader.get_timestamp();
|
||||||
|
let provenance = node_reader.get_provenance().ok()
|
||||||
|
.and_then(|t| t.to_str().ok())
|
||||||
|
.unwrap_or("manual")
|
||||||
|
.to_string();
|
||||||
|
|
||||||
let mut uuid = [0u8; 16];
|
let mut uuid = [0u8; 16];
|
||||||
if let Ok(data) = node_reader.get_uuid() {
|
if let Ok(data) = node_reader.get_uuid() {
|
||||||
|
|
@ -644,10 +648,10 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
||||||
|
|
||||||
// Keep if newer version
|
// Keep if newer version
|
||||||
let dominated = latest.get(&key)
|
let dominated = latest.get(&key)
|
||||||
.map(|(_, _, v, _, _, _)| version >= *v)
|
.map(|(_, _, v, _, _, _, _)| version >= *v)
|
||||||
.unwrap_or(true);
|
.unwrap_or(true);
|
||||||
if dominated {
|
if dominated {
|
||||||
latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp));
|
latest.insert(key, (offset, uuid, version, deleted, node_type, timestamp, provenance));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -659,8 +663,9 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
||||||
let mut nodes_table = txn.open_table(index::NODES)?;
|
let mut nodes_table = txn.open_table(index::NODES)?;
|
||||||
let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?;
|
let mut key_uuid_table = txn.open_table(index::KEY_TO_UUID)?;
|
||||||
let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?;
|
let mut uuid_offsets = txn.open_multimap_table(index::UUID_OFFSETS)?;
|
||||||
|
let mut by_provenance = txn.open_multimap_table(index::NODES_BY_PROVENANCE)?;
|
||||||
|
|
||||||
for (key, (offset, uuid, _, deleted, node_type, timestamp)) in latest {
|
for (key, (offset, uuid, _, deleted, node_type, timestamp, provenance)) in latest {
|
||||||
if !deleted {
|
if !deleted {
|
||||||
nodes_table.insert(key.as_str(), offset)?;
|
nodes_table.insert(key.as_str(), offset)?;
|
||||||
// Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes
|
// Pack: [uuid:16][node_type:1][timestamp:8] = 25 bytes
|
||||||
|
|
@ -669,6 +674,12 @@ fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
|
||||||
packed[16] = node_type;
|
packed[16] = node_type;
|
||||||
packed[17..25].copy_from_slice(×tamp.to_be_bytes());
|
packed[17..25].copy_from_slice(×tamp.to_be_bytes());
|
||||||
key_uuid_table.insert(key.as_str(), packed.as_slice())?;
|
key_uuid_table.insert(key.as_str(), packed.as_slice())?;
|
||||||
|
// Pack: [negated_timestamp:8][key] for descending sort
|
||||||
|
let neg_ts = (!timestamp).to_be_bytes();
|
||||||
|
let mut prov_val = Vec::with_capacity(8 + key.len());
|
||||||
|
prov_val.extend_from_slice(&neg_ts);
|
||||||
|
prov_val.extend_from_slice(key.as_bytes());
|
||||||
|
by_provenance.insert(provenance.as_str(), prov_val.as_slice())?;
|
||||||
}
|
}
|
||||||
// Always record offset in UUID history (even for deleted)
|
// Always record offset in UUID history (even for deleted)
|
||||||
uuid_offsets.insert(uuid.as_slice(), offset)?;
|
uuid_offsets.insert(uuid.as_slice(), offset)?;
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,8 @@ pub const NODES: TableDefinition<&str, u64> = TableDefinition::new("nodes");
|
||||||
// KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8] = 25 bytes
|
// KEY_TO_UUID: key → [uuid:16][node_type:1][timestamp:8] = 25 bytes
|
||||||
pub const KEY_TO_UUID: TableDefinition<&str, &[u8]> = TableDefinition::new("key_to_uuid");
|
pub const KEY_TO_UUID: TableDefinition<&str, &[u8]> = TableDefinition::new("key_to_uuid");
|
||||||
pub const UUID_OFFSETS: MultimapTableDefinition<&[u8], u64> = MultimapTableDefinition::new("uuid_offsets");
|
pub const UUID_OFFSETS: MultimapTableDefinition<&[u8], u64> = MultimapTableDefinition::new("uuid_offsets");
|
||||||
pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &str> = MultimapTableDefinition::new("nodes_by_provenance");
|
// NODES_BY_PROVENANCE: provenance → [timestamp:8 BE][key] (sorted by timestamp desc via negated ts)
|
||||||
|
pub const NODES_BY_PROVENANCE: MultimapTableDefinition<&str, &[u8]> = MultimapTableDefinition::new("nodes_by_provenance");
|
||||||
// Composite key: [node_type: u8][timestamp: i64 BE] for range queries
|
// Composite key: [node_type: u8][timestamp: i64 BE] for range queries
|
||||||
pub const NODES_BY_TYPE: TableDefinition<&[u8], &str> = TableDefinition::new("nodes_by_type");
|
pub const NODES_BY_TYPE: TableDefinition<&[u8], &str> = TableDefinition::new("nodes_by_type");
|
||||||
|
|
||||||
|
|
@ -81,19 +82,62 @@ pub fn unpack_node_meta(data: &[u8]) -> ([u8; 16], u8, i64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pack provenance value: [negated_timestamp:8][key] for descending sort
|
||||||
|
fn pack_provenance_value(timestamp: i64, key: &str) -> Vec<u8> {
|
||||||
|
let neg_ts = (!timestamp).to_be_bytes(); // negate for descending order
|
||||||
|
let mut buf = Vec::with_capacity(8 + key.len());
|
||||||
|
buf.extend_from_slice(&neg_ts);
|
||||||
|
buf.extend_from_slice(key.as_bytes());
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unpack provenance value: returns (timestamp, key)
|
||||||
|
fn unpack_provenance_value(data: &[u8]) -> (i64, String) {
|
||||||
|
let neg_ts = i64::from_be_bytes([data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]]);
|
||||||
|
let timestamp = !neg_ts;
|
||||||
|
let key = String::from_utf8_lossy(&data[8..]).to_string();
|
||||||
|
(timestamp, key)
|
||||||
|
}
|
||||||
|
|
||||||
/// Record a node's location in the index.
|
/// Record a node's location in the index.
|
||||||
pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64) -> Result<()> {
|
pub fn index_node(txn: &WriteTransaction, key: &str, offset: u64, uuid: &[u8; 16], node_type: u8, timestamp: i64, provenance: &str) -> Result<()> {
|
||||||
let mut nodes_table = txn.open_table(NODES)?;
|
let mut nodes_table = txn.open_table(NODES)?;
|
||||||
let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
|
let mut key_uuid_table = txn.open_table(KEY_TO_UUID)?;
|
||||||
let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?;
|
let mut uuid_offsets = txn.open_multimap_table(UUID_OFFSETS)?;
|
||||||
|
let mut by_provenance = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
|
||||||
|
|
||||||
nodes_table.insert(key, offset)?;
|
nodes_table.insert(key, offset)?;
|
||||||
let packed = pack_node_meta(uuid, node_type, timestamp);
|
let packed = pack_node_meta(uuid, node_type, timestamp);
|
||||||
key_uuid_table.insert(key, packed.as_slice())?;
|
key_uuid_table.insert(key, packed.as_slice())?;
|
||||||
uuid_offsets.insert(uuid.as_slice(), offset)?;
|
uuid_offsets.insert(uuid.as_slice(), offset)?;
|
||||||
|
let prov_val = pack_provenance_value(timestamp, key);
|
||||||
|
by_provenance.insert(provenance, prov_val.as_slice())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get recent keys for a given provenance, sorted by timestamp descending.
|
||||||
|
pub fn recent_by_provenance(db: &Database, provenance: &str, limit: usize) -> Result<Vec<(String, i64)>> {
|
||||||
|
let txn = db.begin_read()?;
|
||||||
|
let table = txn.open_multimap_table(NODES_BY_PROVENANCE)?;
|
||||||
|
let mut results = Vec::new();
|
||||||
|
for entry in table.get(provenance)? {
|
||||||
|
if results.len() >= limit { break; }
|
||||||
|
let (timestamp, key) = unpack_provenance_value(entry?.value());
|
||||||
|
results.push((key, timestamp));
|
||||||
|
}
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get node metadata (uuid, node_type, timestamp) from KEY_TO_UUID.
|
||||||
|
pub fn get_node_meta(db: &Database, key: &str) -> Result<Option<([u8; 16], u8, i64)>> {
|
||||||
|
let txn = db.begin_read()?;
|
||||||
|
let table = txn.open_table(KEY_TO_UUID)?;
|
||||||
|
match table.get(key)? {
|
||||||
|
Some(data) => Ok(Some(unpack_node_meta(data.value()))),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get offset for a node by key.
|
/// Get offset for a node by key.
|
||||||
pub fn get_offset(db: &Database, key: &str) -> Result<Option<u64>> {
|
pub fn get_offset(db: &Database, key: &str) -> Result<Option<u64>> {
|
||||||
let txn = db.begin_read()?;
|
let txn = db.begin_read()?;
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
//
|
//
|
||||||
// CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics.
|
// CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics.
|
||||||
|
|
||||||
use super::{capnp, index, types::*, Store};
|
use super::{index, types::*, Store};
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
@ -24,7 +24,7 @@ impl Store {
|
||||||
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
|
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
|
||||||
let txn = db.begin_write()?;
|
let txn = db.begin_write()?;
|
||||||
let offset = self.append_nodes(&[node.clone()])?;
|
let offset = self.append_nodes(&[node.clone()])?;
|
||||||
index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?;
|
index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?;
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -45,24 +45,8 @@ impl Store {
|
||||||
Some(db) => db,
|
Some(db) => db,
|
||||||
None => return Vec::new(),
|
None => return Vec::new(),
|
||||||
};
|
};
|
||||||
let keys = match index::all_keys(db) {
|
// Index stores entries sorted by timestamp descending, so just take first N
|
||||||
Ok(keys) => keys,
|
index::recent_by_provenance(db, provenance, limit).unwrap_or_default()
|
||||||
Err(_) => return Vec::new(),
|
|
||||||
};
|
|
||||||
let mut nodes: Vec<_> = keys.iter()
|
|
||||||
.filter_map(|key| {
|
|
||||||
let offset = index::get_offset(db, key).ok()??;
|
|
||||||
let node = capnp::read_node_at_offset(offset).ok()?;
|
|
||||||
if !node.deleted && node.provenance == provenance {
|
|
||||||
Some((key.clone(), node.timestamp))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
nodes.sort_by(|a, b| b.1.cmp(&a.1));
|
|
||||||
nodes.truncate(limit);
|
|
||||||
nodes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Upsert a node: update if exists (and content changed), create if not.
|
/// Upsert a node: update if exists (and content changed), create if not.
|
||||||
|
|
@ -90,7 +74,7 @@ impl Store {
|
||||||
node.version += 1;
|
node.version += 1;
|
||||||
let txn = db.begin_write()?;
|
let txn = db.begin_write()?;
|
||||||
let offset = self.append_nodes(std::slice::from_ref(&node))?;
|
let offset = self.append_nodes(std::slice::from_ref(&node))?;
|
||||||
index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?;
|
index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?;
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
Ok("updated")
|
Ok("updated")
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -98,7 +82,7 @@ impl Store {
|
||||||
node.provenance = provenance.to_string();
|
node.provenance = provenance.to_string();
|
||||||
let txn = db.begin_write()?;
|
let txn = db.begin_write()?;
|
||||||
let offset = self.append_nodes(std::slice::from_ref(&node))?;
|
let offset = self.append_nodes(std::slice::from_ref(&node))?;
|
||||||
index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp)?;
|
index::index_node(&txn, &node.key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?;
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
Ok("created")
|
Ok("created")
|
||||||
}
|
}
|
||||||
|
|
@ -189,7 +173,7 @@ impl Store {
|
||||||
let txn = db.begin_write()?;
|
let txn = db.begin_write()?;
|
||||||
let offset = self.append_nodes(&[renamed.clone(), tombstone])?;
|
let offset = self.append_nodes(&[renamed.clone(), tombstone])?;
|
||||||
index::remove_node(&txn, old_key)?;
|
index::remove_node(&txn, old_key)?;
|
||||||
index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp)?;
|
index::index_node(&txn, new_key, offset, &renamed.uuid, renamed.node_type as u8, renamed.timestamp, &renamed.provenance)?;
|
||||||
if !updated_rels.is_empty() {
|
if !updated_rels.is_empty() {
|
||||||
self.append_relations(&updated_rels)?;
|
self.append_relations(&updated_rels)?;
|
||||||
}
|
}
|
||||||
|
|
@ -320,7 +304,7 @@ impl Store {
|
||||||
node.timestamp = now_epoch();
|
node.timestamp = now_epoch();
|
||||||
let txn = db.begin_write()?;
|
let txn = db.begin_write()?;
|
||||||
let offset = self.append_nodes(std::slice::from_ref(&node))?;
|
let offset = self.append_nodes(std::slice::from_ref(&node))?;
|
||||||
index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp)?;
|
index::index_node(&txn, key, offset, &node.uuid, node.node_type as u8, node.timestamp, &node.provenance)?;
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
Ok((old, weight))
|
Ok((old, weight))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -346,32 +346,44 @@ impl Mind {
|
||||||
let mut s = shared_for_unc.lock().unwrap();
|
let mut s = shared_for_unc.lock().unwrap();
|
||||||
s.unc_idle = true;
|
s.unc_idle = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get wake notify for event-driven loop
|
||||||
|
let wake = unc.lock().await.wake.clone();
|
||||||
|
let mut health_interval = tokio::time::interval(std::time::Duration::from_secs(600));
|
||||||
|
health_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Phase 0: health check outside lock (slow I/O)
|
// Do work: reap finished agents, spawn new ones
|
||||||
let needs_health = unc.lock().await.needs_health_refresh();
|
let (to_spawn, needs_health) = {
|
||||||
|
let mut guard = unc.lock().await;
|
||||||
|
guard.reap_finished();
|
||||||
|
(guard.select_to_spawn(), guard.needs_health_refresh())
|
||||||
|
};
|
||||||
|
|
||||||
|
// Spawn agents outside lock
|
||||||
|
for (idx, name, auto) in to_spawn {
|
||||||
|
match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await {
|
||||||
|
Ok(result) => unc.lock().await.complete_spawn(idx, result),
|
||||||
|
Err(auto) => unc.lock().await.abort_spawn(idx, auto),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Health check outside lock (slow I/O)
|
||||||
if needs_health {
|
if needs_health {
|
||||||
if let Ok(store_arc) = access_local() {
|
if let Ok(store_arc) = access_local() {
|
||||||
let health = crate::subconscious::daemon::compute_graph_health(&store_arc);
|
let health = crate::subconscious::daemon::compute_graph_health(&store_arc);
|
||||||
unc.lock().await.set_health(health);
|
unc.lock().await.set_health(health);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Phase 1: quick work under lock
|
|
||||||
let to_spawn = {
|
// Wait for: conscious active, agent finished, or health timer
|
||||||
let mut guard = unc.lock().await;
|
tokio::select! {
|
||||||
guard.reap_finished();
|
_ = unc_rx.changed() => {
|
||||||
guard.select_to_spawn()
|
if *unc_rx.borrow() { break; }
|
||||||
};
|
|
||||||
// Phase 2: slow work outside lock
|
|
||||||
for (idx, name, auto) in to_spawn {
|
|
||||||
match crate::mind::unconscious::prepare_spawn(&name, auto).await {
|
|
||||||
Ok(result) => unc.lock().await.complete_spawn(idx, result),
|
|
||||||
Err(auto) => unc.lock().await.abort_spawn(idx, auto),
|
|
||||||
}
|
}
|
||||||
|
_ = wake.notified() => {}
|
||||||
|
_ = health_interval.tick() => {}
|
||||||
}
|
}
|
||||||
// Check if conscious became active
|
|
||||||
if *unc_rx.borrow() { break; }
|
|
||||||
// Brief yield to not starve other tasks
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -637,7 +649,8 @@ impl Mind {
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut cmds = Vec::new();
|
let mut cmds = Vec::new();
|
||||||
let mut dmn_expired = false;
|
#[allow(unused_assignments)]
|
||||||
|
let mut _dmn_expired = false;
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
|
|
@ -676,7 +689,7 @@ impl Mind {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true,
|
_ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.config.no_agents {
|
if !self.config.no_agents {
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,8 @@ pub struct Unconscious {
|
||||||
max_concurrent: usize,
|
max_concurrent: usize,
|
||||||
pub graph_health: Option<crate::subconscious::daemon::GraphHealth>,
|
pub graph_health: Option<crate::subconscious::daemon::GraphHealth>,
|
||||||
last_health_check: Option<Instant>,
|
last_health_check: Option<Instant>,
|
||||||
|
/// Notified when agent state changes (finished, toggled)
|
||||||
|
pub wake: std::sync::Arc<tokio::sync::Notify>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Unconscious {
|
impl Unconscious {
|
||||||
|
|
@ -117,6 +119,7 @@ impl Unconscious {
|
||||||
agents, max_concurrent,
|
agents, max_concurrent,
|
||||||
graph_health: None,
|
graph_health: None,
|
||||||
last_health_check: None,
|
last_health_check: None,
|
||||||
|
wake: std::sync::Arc::new(tokio::sync::Notify::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -130,11 +133,13 @@ impl Unconscious {
|
||||||
if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() {
|
if new_state && !self.agents[idx].is_running() && self.agents[idx].auto.is_some() {
|
||||||
let agent_name = self.agents[idx].name.clone();
|
let agent_name = self.agents[idx].name.clone();
|
||||||
let auto = self.agents[idx].auto.take().unwrap();
|
let auto = self.agents[idx].auto.take().unwrap();
|
||||||
match prepare_spawn(&agent_name, auto).await {
|
let wake = self.wake.clone();
|
||||||
|
match prepare_spawn(&agent_name, auto, wake).await {
|
||||||
Ok(result) => self.complete_spawn(idx, result),
|
Ok(result) => self.complete_spawn(idx, result),
|
||||||
Err(auto) => self.abort_spawn(idx, auto),
|
Err(auto) => self.abort_spawn(idx, auto),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.wake.notify_one(); // wake loop to consider new state
|
||||||
Some(new_state)
|
Some(new_state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -245,7 +250,7 @@ pub struct SpawnResult {
|
||||||
/// Called outside the Unconscious lock.
|
/// Called outside the Unconscious lock.
|
||||||
/// On success, auto is consumed (moved into spawned task).
|
/// On success, auto is consumed (moved into spawned task).
|
||||||
/// On failure, auto is returned so it can be restored.
|
/// On failure, auto is returned so it can be restored.
|
||||||
pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResult, AutoAgent> {
|
pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc<tokio::sync::Notify>) -> Result<SpawnResult, AutoAgent> {
|
||||||
dbglog!("[unconscious] spawning {}", name);
|
dbglog!("[unconscious] spawning {}", name);
|
||||||
|
|
||||||
let def = match defs::get_def(name) {
|
let def = match defs::get_def(name) {
|
||||||
|
|
@ -312,6 +317,7 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResul
|
||||||
let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await;
|
let stats = crate::agent::oneshot::save_agent_log(&auto.name, &agent_clone).await;
|
||||||
auto.update_stats(stats);
|
auto.update_stats(stats);
|
||||||
auto.steps = orig_steps;
|
auto.steps = orig_steps;
|
||||||
|
wake.notify_one(); // wake the loop to reap and maybe spawn more
|
||||||
(auto, result)
|
(auto, result)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -323,8 +329,9 @@ impl Unconscious {
|
||||||
pub async fn trigger(&mut self) {
|
pub async fn trigger(&mut self) {
|
||||||
self.reap_finished();
|
self.reap_finished();
|
||||||
let to_spawn = self.select_to_spawn();
|
let to_spawn = self.select_to_spawn();
|
||||||
|
let wake = self.wake.clone();
|
||||||
for (idx, name, auto) in to_spawn {
|
for (idx, name, auto) in to_spawn {
|
||||||
match prepare_spawn(&name, auto).await {
|
match prepare_spawn(&name, auto, wake.clone()).await {
|
||||||
Ok(result) => self.complete_spawn(idx, result),
|
Ok(result) => self.complete_spawn(idx, result),
|
||||||
Err(auto) => self.abort_spawn(idx, auto),
|
Err(auto) => self.abort_spawn(idx, auto),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue