forked from kent/consciousness
store: wire up redb updates on mutations
Mutations (upsert_node, upsert_provenance, delete_node, rename_node) now update redb indices atomically with capnp log appends, under the same StoreLock. Also removes dead cmd_import command and the parse.rs module it depended on. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
a1accc7cd4
commit
9309de68fc
5 changed files with 23 additions and 214 deletions
|
|
@ -315,45 +315,6 @@ pub async fn cmd_daily_check() -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_import(files: &[String]) -> Result<()> {
|
||||
if files.is_empty() {
|
||||
anyhow::bail!("import requires at least one file path");
|
||||
}
|
||||
|
||||
let arc = memory::access_local()?;
|
||||
let mut store = arc.lock().await;
|
||||
let mut count = 0;
|
||||
|
||||
for arg in files {
|
||||
let path = std::path::PathBuf::from(arg);
|
||||
let resolved = if path.exists() {
|
||||
path
|
||||
} else {
|
||||
let mem_path = store::memory_dir().join(arg);
|
||||
if !mem_path.exists() {
|
||||
eprintln!("File not found: {}", arg);
|
||||
continue;
|
||||
}
|
||||
mem_path
|
||||
};
|
||||
|
||||
let filename = resolved.file_name().unwrap().to_string_lossy().to_string();
|
||||
let content = std::fs::read_to_string(&resolved)?;
|
||||
let units = store::parse_units(&filename, &content);
|
||||
|
||||
for unit in units {
|
||||
store.upsert(&unit.key, &unit.content)?;
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
store.save()?;
|
||||
}
|
||||
println!("Imported {} memory units", count);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_status() -> Result<()> {
|
||||
let result = memory::graph_topology(None).await
|
||||
?;
|
||||
|
|
|
|||
|
|
@ -8,14 +8,12 @@
|
|||
//
|
||||
// Module layout:
|
||||
// types.rs — Node, Relation, enums, capnp macros, path helpers
|
||||
// parse.rs — markdown → MemoryUnit parsing
|
||||
// view.rs — StoreView trait for read-only access
|
||||
// persist.rs — load, replay, append (capnp IO)
|
||||
// ops.rs — mutations (upsert, delete, decay, cap_degree, etc.)
|
||||
// mod.rs — re-exports, key resolution, ingestion, rendering
|
||||
|
||||
mod types;
|
||||
mod parse;
|
||||
mod view;
|
||||
mod persist;
|
||||
mod ops;
|
||||
|
|
@ -28,7 +26,6 @@ pub use types::{
|
|||
Node, Relation, NodeType, RelationType, Store,
|
||||
new_node, new_relation,
|
||||
};
|
||||
pub use parse::{MemoryUnit, parse_units};
|
||||
pub use view::StoreView;
|
||||
pub use persist::fsck;
|
||||
pub use ops::current_provenance;
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
//
|
||||
// CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics.
|
||||
|
||||
use super::types::*;
|
||||
use super::{db, types::*};
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
|
@ -15,7 +15,7 @@ pub fn current_provenance() -> String {
|
|||
}
|
||||
|
||||
impl Store {
|
||||
/// Add or update a node (appends to log + updates cache).
|
||||
/// Add or update a node (appends to log + updates cache + redb).
|
||||
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
|
||||
pub fn upsert_node(&mut self, mut node: Node) -> Result<()> {
|
||||
let _lock = StoreLock::acquire()?;
|
||||
|
|
@ -26,6 +26,9 @@ impl Store {
|
|||
node.version = existing.version + 1;
|
||||
}
|
||||
self.append_nodes_unlocked(&[node.clone()])?;
|
||||
if let Some(ref database) = self.db {
|
||||
db::upsert_node(database, &node)?;
|
||||
}
|
||||
self.uuid_to_key.insert(node.uuid, node.key.clone());
|
||||
self.nodes.insert(node.key.clone(), node);
|
||||
Ok(())
|
||||
|
|
@ -75,19 +78,25 @@ impl Store {
|
|||
node.timestamp = now_epoch();
|
||||
node.version += 1;
|
||||
self.append_nodes_unlocked(std::slice::from_ref(&node))?;
|
||||
if let Some(ref database) = self.db {
|
||||
db::upsert_node(database, &node)?;
|
||||
}
|
||||
self.nodes.insert(key.to_string(), node);
|
||||
Ok("updated")
|
||||
} else {
|
||||
let mut node = new_node(key, content);
|
||||
node.provenance = provenance.to_string();
|
||||
self.append_nodes_unlocked(std::slice::from_ref(&node))?;
|
||||
if let Some(ref database) = self.db {
|
||||
db::upsert_node(database, &node)?;
|
||||
}
|
||||
self.uuid_to_key.insert(node.uuid, node.key.clone());
|
||||
self.nodes.insert(key.to_string(), node);
|
||||
Ok("created")
|
||||
}
|
||||
}
|
||||
|
||||
/// Soft-delete a node (appends deleted version, removes from cache).
|
||||
/// Soft-delete a node (appends deleted version, removes from cache + redb).
|
||||
/// Holds StoreLock across refresh + write to see concurrent creates.
|
||||
pub fn delete_node(&mut self, key: &str) -> Result<()> {
|
||||
let _lock = StoreLock::acquire()?;
|
||||
|
|
@ -97,12 +106,16 @@ impl Store {
|
|||
|
||||
let node = self.nodes.get(key)
|
||||
.ok_or_else(|| anyhow!("No node '{}'", key))?;
|
||||
let uuid = node.uuid;
|
||||
let mut deleted = node.clone();
|
||||
deleted.deleted = true;
|
||||
deleted.version += 1;
|
||||
deleted.provenance = prov;
|
||||
deleted.timestamp = now_epoch();
|
||||
self.append_nodes_unlocked(std::slice::from_ref(&deleted))?;
|
||||
if let Some(ref database) = self.db {
|
||||
db::delete_node(database, key, &uuid)?;
|
||||
}
|
||||
self.nodes.remove(key);
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -159,11 +172,17 @@ impl Store {
|
|||
.collect();
|
||||
|
||||
// Persist under single lock
|
||||
self.append_nodes_unlocked(&[renamed.clone(), tombstone])?;
|
||||
self.append_nodes_unlocked(&[renamed.clone(), tombstone.clone()])?;
|
||||
if !updated_rels.is_empty() {
|
||||
self.append_relations_unlocked(&updated_rels)?;
|
||||
}
|
||||
|
||||
// Update redb: delete old key, insert renamed
|
||||
if let Some(ref database) = self.db {
|
||||
db::delete_node(database, old_key, &tombstone.uuid)?;
|
||||
db::upsert_node(database, &renamed)?;
|
||||
}
|
||||
|
||||
// Update in-memory cache
|
||||
self.nodes.remove(old_key);
|
||||
self.uuid_to_key.insert(renamed.uuid, new_key.to_string());
|
||||
|
|
|
|||
|
|
@ -1,162 +0,0 @@
|
|||
// Markdown parsing for memory files
|
||||
//
|
||||
// Splits markdown files into MemoryUnit structs based on `<!-- mem: ... -->`
|
||||
// markers. Each marker starts a new section; content before the first marker
|
||||
// becomes the file-level unit. Links and causal edges are extracted from
|
||||
// both marker attributes and inline markdown links.
|
||||
|
||||
use regex::Regex;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
pub struct MemoryUnit {
|
||||
pub key: String,
|
||||
pub content: String,
|
||||
pub marker_links: Vec<String>,
|
||||
pub md_links: Vec<String>,
|
||||
pub causes: Vec<String>,
|
||||
pub state: Option<String>,
|
||||
pub source_ref: Option<String>,
|
||||
}
|
||||
|
||||
pub fn parse_units(raw_filename: &str, content: &str) -> Vec<MemoryUnit> {
|
||||
let filename = raw_filename.strip_suffix(".md").unwrap_or(raw_filename);
|
||||
static MARKER_RE: OnceLock<Regex> = OnceLock::new();
|
||||
static SOURCE_RE: OnceLock<Regex> = OnceLock::new();
|
||||
static MD_LINK_RE: OnceLock<Regex> = OnceLock::new();
|
||||
|
||||
let marker_re = MARKER_RE.get_or_init(||
|
||||
Regex::new(r"<!--\s*mem:\s*((?:id|links|tags|causes|state)\s*=\s*[^\s].*?)-->").unwrap());
|
||||
let source_re = SOURCE_RE.get_or_init(||
|
||||
Regex::new(r"<!--\s*source:\s*(.+?)\s*-->").unwrap());
|
||||
let md_link_re = MD_LINK_RE.get_or_init(||
|
||||
Regex::new(r"\[[^\]]*\]\(([^):]+(?:#[^)]*)?)\)").unwrap());
|
||||
|
||||
let markers: Vec<_> = marker_re.captures_iter(content)
|
||||
.map(|cap| {
|
||||
let full_match = cap.get(0).unwrap();
|
||||
let attrs_str = &cap[1];
|
||||
(full_match.start(), full_match.end(), parse_marker_attrs(attrs_str))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let find_source = |text: &str| -> Option<String> {
|
||||
source_re.captures(text).map(|c| c[1].trim().to_string())
|
||||
};
|
||||
|
||||
if markers.is_empty() {
|
||||
let source_ref = find_source(content);
|
||||
let md_links = extract_md_links(content, md_link_re, filename);
|
||||
return vec![MemoryUnit {
|
||||
key: filename.to_string(),
|
||||
content: content.to_string(),
|
||||
marker_links: Vec::new(),
|
||||
md_links,
|
||||
causes: Vec::new(),
|
||||
state: None,
|
||||
source_ref,
|
||||
}];
|
||||
}
|
||||
|
||||
let mut units = Vec::new();
|
||||
|
||||
let first_start = markers[0].0;
|
||||
let pre_content = content[..first_start].trim();
|
||||
if !pre_content.is_empty() {
|
||||
let source_ref = find_source(pre_content);
|
||||
let md_links = extract_md_links(pre_content, md_link_re, filename);
|
||||
units.push(MemoryUnit {
|
||||
key: filename.to_string(),
|
||||
content: pre_content.to_string(),
|
||||
marker_links: Vec::new(),
|
||||
md_links,
|
||||
causes: Vec::new(),
|
||||
state: None,
|
||||
source_ref,
|
||||
});
|
||||
}
|
||||
|
||||
for (i, (_, end, attrs)) in markers.iter().enumerate() {
|
||||
let unit_end = if i + 1 < markers.len() {
|
||||
markers[i + 1].0
|
||||
} else {
|
||||
content.len()
|
||||
};
|
||||
let unit_content = content[*end..unit_end].trim();
|
||||
|
||||
let id = attrs.get("id").cloned().unwrap_or_default();
|
||||
let key = if id.is_empty() {
|
||||
format!("{}#unnamed-{}", filename, i)
|
||||
} else {
|
||||
format!("{}#{}", filename, id)
|
||||
};
|
||||
|
||||
let marker_links = attrs.get("links")
|
||||
.map(|l| l.split(',').map(|s| normalize_link(s.trim(), filename)).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
let causes = attrs.get("causes")
|
||||
.map(|l| l.split(',').map(|s| normalize_link(s.trim(), filename)).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
let state = attrs.get("state").cloned();
|
||||
let source_ref = find_source(unit_content);
|
||||
let md_links = extract_md_links(unit_content, md_link_re, filename);
|
||||
|
||||
units.push(MemoryUnit {
|
||||
key,
|
||||
content: unit_content.to_string(),
|
||||
marker_links,
|
||||
md_links,
|
||||
causes,
|
||||
state,
|
||||
source_ref,
|
||||
});
|
||||
}
|
||||
|
||||
units
|
||||
}
|
||||
|
||||
fn parse_marker_attrs(attrs_str: &str) -> HashMap<String, String> {
|
||||
static ATTR_RE: OnceLock<Regex> = OnceLock::new();
|
||||
let attr_re = ATTR_RE.get_or_init(|| Regex::new(r"(\w+)\s*=\s*(\S+)").unwrap());
|
||||
let mut attrs = HashMap::new();
|
||||
for cap in attr_re.captures_iter(attrs_str) {
|
||||
attrs.insert(cap[1].to_string(), cap[2].to_string());
|
||||
}
|
||||
attrs
|
||||
}
|
||||
|
||||
fn extract_md_links(content: &str, re: &Regex, source_file: &str) -> Vec<String> {
|
||||
re.captures_iter(content)
|
||||
.map(|cap| normalize_link(&cap[1], source_file))
|
||||
.filter(|link| !link.starts_with(source_file) || link.contains('#'))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn normalize_link(target: &str, source_file: &str) -> String {
|
||||
let source_bare = source_file.strip_suffix(".md").unwrap_or(source_file);
|
||||
|
||||
if target.starts_with('#') {
|
||||
return format!("{}{}", source_bare, target);
|
||||
}
|
||||
|
||||
let (path_part, fragment) = if let Some(hash_pos) = target.find('#') {
|
||||
(&target[..hash_pos], Some(&target[hash_pos..]))
|
||||
} else {
|
||||
(target, None)
|
||||
};
|
||||
|
||||
let basename = Path::new(path_part)
|
||||
.file_name()
|
||||
.map(|f| f.to_string_lossy().to_string())
|
||||
.unwrap_or_else(|| path_part.to_string());
|
||||
let bare = basename.strip_suffix(".md").unwrap_or(&basename);
|
||||
|
||||
match fragment {
|
||||
Some(frag) => format!("{}{}", bare, frag),
|
||||
None => bare.to_string(),
|
||||
}
|
||||
}
|
||||
|
|
@ -318,11 +318,6 @@ enum AdminCmd {
|
|||
/// Brief metrics check (for cron/notifications)
|
||||
#[command(name = "daily-check")]
|
||||
DailyCheck,
|
||||
/// Import markdown file(s) into the store
|
||||
Import {
|
||||
/// File paths
|
||||
files: Vec<String>,
|
||||
},
|
||||
/// Output session-start context from the store
|
||||
#[command(name = "load-context")]
|
||||
LoadContext {
|
||||
|
|
@ -452,7 +447,6 @@ impl Run for AdminCmd {
|
|||
Self::Fsck => cli::admin::cmd_fsck().await,
|
||||
Self::Dedup { apply } => cli::admin::cmd_dedup(apply).await,
|
||||
Self::DailyCheck => cli::admin::cmd_daily_check().await,
|
||||
Self::Import { files } => cli::admin::cmd_import(&files).await,
|
||||
Self::LoadContext { stats } => cli::node::cmd_load_context(stats).await,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue