more reorg

This commit is contained in:
Kent Overstreet 2026-04-05 01:48:11 -04:00
parent fcd77fb79e
commit 390b6c6c0a
8 changed files with 117 additions and 133 deletions

View file

@ -9,13 +9,107 @@
//
// The logfile is the history. The socket is the live wire.
use std::path::PathBuf;
use anyhow::{Context, Result};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{broadcast, Mutex};
use crate::user::ui_channel::UiMessage;
use crate::agent::context::ConversationEntry;
pub struct ConversationLog {
path: PathBuf,
}
impl ConversationLog {
pub fn new(path: PathBuf) -> Result<Self> {
// Ensure parent directory exists
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating log dir {}", parent.display()))?;
}
Ok(Self { path })
}
/// Append a conversation entry to the log.
pub fn append(&self, entry: &ConversationEntry) -> Result<()> {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.with_context(|| format!("opening log {}", self.path.display()))?;
let line = serde_json::to_string(entry)
.context("serializing entry for log")?;
writeln!(file, "{}", line)
.context("writing to conversation log")?;
Ok(())
}
/// Read the tail of the log (last `max_bytes` bytes).
/// Seeks to `file_len - max_bytes`, skips the first partial line,
/// then parses forward. For logs smaller than `max_bytes`, reads everything.
pub fn read_tail(&self, max_bytes: u64) -> Result<Vec<ConversationEntry>> {
if !self.path.exists() {
return Ok(Vec::new());
}
let file = File::open(&self.path)
.with_context(|| format!("opening log {}", self.path.display()))?;
let file_len = file.metadata()?.len();
let mut reader = BufReader::new(file);
if file_len > max_bytes {
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
// Skip partial first line
let mut discard = String::new();
reader.read_line(&mut discard)?;
}
let mut entries = Vec::new();
for line in reader.lines() {
let line = line.context("reading log tail")?;
let line = line.trim();
if line.is_empty() {
continue;
}
// Try ConversationEntry first (new format), fall back to bare Message (old logs)
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(line) {
entries.push(entry);
}
}
Ok(entries)
}
pub fn path(&self) -> &Path {
&self.path
}
/// Get the timestamp of the oldest message in the log.
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
let file = File::open(&self.path).ok()?;
let reader = BufReader::new(file);
for line in reader.lines().flatten() {
let line = line.trim().to_string();
if line.is_empty() { continue; }
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(&line) {
if let Some(ts) = &entry.message().timestamp {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) {
return Some(dt.to_utc());
}
// Try other formats
if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") {
return Some(dt.and_utc());
}
}
}
}
None
}
}
fn format_message(msg: &UiMessage) -> Option<String> {
match msg {
@ -115,7 +209,7 @@ pub async fn cmd_read_inner(follow: bool, block: bool, debug: bool) -> anyhow::R
))?;
let (reader, _) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut reader = TokioBufReader::new(reader);
let mut line = String::new();
loop {
@ -268,7 +362,7 @@ pub fn start(
tokio::spawn(async move {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut reader = TokioBufReader::new(reader);
let mut input_buf = String::new();
loop {

View file

@ -6,7 +6,7 @@
pub mod dmn;
pub mod identity;
pub mod observe;
pub mod log;
// consciousness.rs — Session state machine and event loop
//
@ -30,7 +30,7 @@ use crate::agent::api::types as api_types;
use crate::config::{self, AppConfig, SessionConfig};
use crate::user::{self as tui, HotkeyAction};
use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage};
use crate::user::log;
use crate::subconscious::learn;
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
fn compaction_threshold(app: &AppConfig) -> u32 {
@ -76,7 +76,7 @@ pub struct Session {
// Subconscious orchestration
agent_cycles: crate::subconscious::subconscious::AgentCycleState,
/// Latest memory importance scores from full matrix scoring (manual /score).
memory_scores: Option<crate::agent::training::MemoryScore>,
memory_scores: Option<learn::MemoryScore>,
/// Whether a full matrix /score task is currently running.
scoring_in_flight: bool,
}
@ -233,7 +233,7 @@ impl Session {
(agent.context.clone(), agent.client_clone())
};
let result = crate::agent::training::score_memories_incremental(
let result = learn::score_memories_incremental(
&context, max_age as i64, response_window, &client, &ui_tx,
).await;
@ -407,7 +407,7 @@ impl Session {
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
tokio::spawn(async move {
let result = crate::agent::training::score_memories(
let result = learn::score_memories(
&context, &client, &ui_tx,
).await;
let agent = agent.lock().await;
@ -805,9 +805,9 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
// Start observation socket
let socket_path = session.config.session_dir.join("agent.sock");
let (observe_input_tx, mut observe_input_rx) = observe::input_channel();
let (observe_input_tx, mut observe_input_rx) = log::input_channel();
if !no_agents {
observe::start(socket_path, ui_tx.subscribe(), observe_input_tx);
log::start(socket_path, ui_tx.subscribe(), observe_input_tx);
}
let mut reader = EventStream::new();