Rename agent/ to user/ and poc-agent binary to consciousness

Mechanical rename: src/agent/ -> src/user/, all crate::agent:: ->
crate::user:: references updated. Binary poc-agent renamed to
consciousness with CLI name and user-facing strings updated.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-03 17:25:59 -04:00
parent beb49ec477
commit 14dd8d22af
31 changed files with 1857 additions and 1468 deletions

View file

@ -1,576 +0,0 @@
// api/ — LLM API client (OpenAI-compatible)
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
//
// Diagnostics: anomalies always logged to debug panel.
// Set POC_DEBUG=1 for verbose per-turn logging.
mod openai;
use anyhow::Result;
use reqwest::Client;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use crate::agent::types::*;
use crate::agent::ui_channel::{UiMessage, UiSender};
/// A JoinHandle that aborts its task when dropped.
pub struct AbortOnDrop(tokio::task::JoinHandle<()>);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
// ─────────────────────────────────────────────────────────────
// Stream events — yielded by backends, consumed by the runner
// ─────────────────────────────────────────────────────────────
/// Events produced by the streaming API backends.
/// The runner reads these and decides what to display where.
pub enum StreamEvent {
/// Content token from the model's response.
Content(String),
/// Reasoning/thinking token (internal monologue).
Reasoning(String),
/// Incremental tool call delta (structured, from APIs that support it).
ToolCallDelta {
index: usize,
id: Option<String>,
call_type: Option<String>,
name: Option<String>,
arguments: Option<String>,
},
/// Token usage stats.
Usage(Usage),
/// Stream finished.
Finished {
reason: String,
prompt_tokens: u32,
completion_tokens: u32,
},
/// Error from the stream.
Error(String),
}
#[derive(Clone)]
pub struct ApiClient {
client: Client,
api_key: String,
pub model: String,
base_url: String,
}
impl ApiClient {
pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
let client = Client::builder()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(600))
.build()
.expect("failed to build HTTP client");
Self {
client,
api_key: api_key.to_string(),
model: model.to_string(),
base_url: base_url.trim_end_matches('/').to_string(),
}
}
/// Start a streaming chat completion. Returns a receiver of StreamEvents.
/// The caller (runner) reads events and handles routing to the UI.
///
pub fn start_stream(
&self,
messages: &[Message],
tools: Option<&[ToolDef]>,
ui_tx: &UiSender,
reasoning_effort: &str,
temperature: Option<f32>,
priority: Option<i32>,
) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
let (tx, rx) = mpsc::unbounded_channel();
let client = self.client.clone();
let api_key = self.api_key.clone();
let model = self.model.clone();
let messages = messages.to_vec();
let tools = tools.map(|t| t.to_vec());
let ui_tx = ui_tx.clone();
let reasoning_effort = reasoning_effort.to_string();
let base_url = self.base_url.clone();
let handle = tokio::spawn(async move {
let result = openai::stream_events(
&client, &base_url, &api_key, &model,
&messages, tools.as_deref(), &tx, &ui_tx,
&reasoning_effort, temperature, priority,
).await;
if let Err(e) = result {
let _ = tx.send(StreamEvent::Error(e.to_string()));
}
});
(rx, AbortOnDrop(handle))
}
pub async fn chat_completion_stream_temp(
&self,
messages: &[Message],
tools: Option<&[ToolDef]>,
ui_tx: &UiSender,
reasoning_effort: &str,
temperature: Option<f32>,
priority: Option<i32>,
) -> Result<(Message, Option<Usage>)> {
// Use the event stream and accumulate into a message.
let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority);
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None;
let mut finish_reason = None;
while let Some(event) = rx.recv().await {
match event {
StreamEvent::Content(text) => content.push_str(&text),
StreamEvent::Reasoning(_) => {}
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
while tool_calls.len() <= index {
tool_calls.push(ToolCall {
id: String::new(),
call_type: "function".to_string(),
function: FunctionCall { name: String::new(), arguments: String::new() },
});
}
if let Some(id) = id { tool_calls[index].id = id; }
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
if let Some(n) = name { tool_calls[index].function.name = n; }
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
}
StreamEvent::Usage(u) => usage = Some(u),
StreamEvent::Finished { reason, .. } => {
finish_reason = Some(reason);
break;
}
StreamEvent::Error(e) => anyhow::bail!("{}", e),
}
}
if finish_reason.as_deref() == Some("error") {
let detail = if content.is_empty() { "no details".into() } else { content };
anyhow::bail!("model stream error: {}", detail);
}
Ok((build_response_message(content, tool_calls), usage))
}
pub fn base_url(&self) -> &str { &self.base_url }
pub fn api_key(&self) -> &str { &self.api_key }
/// Return a label for the active backend, used in startup info.
pub fn backend_label(&self) -> &str {
if self.base_url.contains("openrouter") {
"openrouter"
} else {
"openai-compat"
}
}
}
/// Send an HTTP request and check for errors. Shared by both backends.
pub(crate) async fn send_and_check(
client: &Client,
url: &str,
body: &impl serde::Serialize,
auth_header: (&str, &str),
extra_headers: &[(&str, &str)],
ui_tx: &UiSender,
debug_label: &str,
request_json: Option<&str>,
) -> Result<reqwest::Response> {
let debug = std::env::var("POC_DEBUG").is_ok();
let start = Instant::now();
if debug {
let payload_size = serde_json::to_string(body)
.map(|s| s.len())
.unwrap_or(0);
let _ = ui_tx.send(UiMessage::Debug(format!(
"request: {}K payload, {}",
payload_size / 1024, debug_label,
)));
}
let mut req = client
.post(url)
.header(auth_header.0, auth_header.1)
.header("Content-Type", "application/json");
for (name, value) in extra_headers {
req = req.header(*name, *value);
}
let response = req
.json(body)
.send()
.await
.map_err(|e| {
let cause = if e.is_connect() {
"connection refused"
} else if e.is_timeout() {
"request timed out"
} else if e.is_request() {
"request error"
} else {
"unknown"
};
anyhow::anyhow!("{} ({}): {:?}", cause, url, e.without_url())
})?;
let status = response.status();
let elapsed = start.elapsed();
if debug {
// Log interesting response headers
let headers = response.headers();
for name in [
"x-ratelimit-remaining",
"x-ratelimit-limit",
"x-request-id",
] {
if let Some(val) = headers.get(name) {
let _ = ui_tx.send(UiMessage::Debug(format!(
"header {}: {}",
name,
val.to_str().unwrap_or("?")
)));
}
}
}
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
let _ = ui_tx.send(UiMessage::Debug(format!(
"HTTP {} after {:.1}s ({}): {}",
status,
elapsed.as_secs_f64(),
url,
&body[..body.len().min(500)]
)));
if let Some(json) = request_json {
let log_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/logs/failed-requests");
let _ = std::fs::create_dir_all(&log_dir);
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
let path = log_dir.join(format!("{}.json", ts));
if std::fs::write(&path, json).is_ok() {
let _ = ui_tx.send(UiMessage::Debug(format!(
"saved failed request to {} (HTTP {})", path.display(), status
)));
}
}
anyhow::bail!("HTTP {} ({}): {}", status, url, &body[..body.len().min(1000)]);
}
if debug {
let _ = ui_tx.send(UiMessage::Debug(format!(
"connected in {:.1}s (HTTP {})",
elapsed.as_secs_f64(),
status.as_u16()
)));
}
Ok(response)
}
/// SSE stream reader. Handles the generic SSE plumbing shared by both
/// backends: chunk reading with timeout, line buffering, `data:` prefix
/// stripping, `[DONE]` detection, JSON parsing, and parse error diagnostics.
/// Yields parsed events as serde_json::Value — each backend handles its
/// own event types.
pub(crate) struct SseReader {
line_buf: String,
chunk_timeout: Duration,
pub stream_start: Instant,
pub chunks_received: u64,
pub sse_lines_parsed: u64,
pub sse_parse_errors: u64,
debug: bool,
ui_tx: UiSender,
done: bool,
/// Serialized request payload — saved to disk on errors for replay debugging.
pub(crate) request_json: Option<String>,
}
impl SseReader {
pub(crate) fn new(ui_tx: &UiSender) -> Self {
Self {
line_buf: String::new(),
chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs),
stream_start: Instant::now(),
chunks_received: 0,
sse_lines_parsed: 0,
sse_parse_errors: 0,
debug: std::env::var("POC_DEBUG").is_ok(),
ui_tx: ui_tx.clone(),
done: false,
request_json: None,
}
}
/// Attach the serialized request payload for error diagnostics.
/// Save the request payload to disk for replay debugging.
fn save_failed_request(&self, reason: &str) {
let Some(ref json) = self.request_json else { return };
let log_dir = dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/logs/failed-requests");
let _ = std::fs::create_dir_all(&log_dir);
let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
let path = log_dir.join(format!("{}.json", ts));
if std::fs::write(&path, json).is_ok() {
let _ = self.ui_tx.send(UiMessage::Debug(format!(
"saved failed request to {} ({})", path.display(), reason
)));
}
}
/// Read the next SSE event from the response stream.
/// Returns Ok(Some(value)) for each parsed data line,
/// Ok(None) when the stream ends or [DONE] is received.
pub(crate) async fn next_event(
&mut self,
response: &mut reqwest::Response,
) -> Result<Option<serde_json::Value>> {
loop {
// Drain complete lines from the buffer before reading more chunks
while let Some(newline_pos) = self.line_buf.find('\n') {
let line = self.line_buf[..newline_pos].trim().to_string();
self.line_buf = self.line_buf[newline_pos + 1..].to_string();
if line == "data: [DONE]" {
self.done = true;
return Ok(None);
}
if line.is_empty()
|| line.starts_with("event: ")
|| !line.starts_with("data: ")
{
continue;
}
let json_str = &line[6..];
self.sse_lines_parsed += 1;
match serde_json::from_str(json_str) {
Ok(v) => return Ok(Some(v)),
Err(e) => {
self.sse_parse_errors += 1;
if self.sse_parse_errors == 1 || self.debug {
let preview = if json_str.len() > 200 {
format!("{}...", &json_str[..200])
} else {
json_str.to_string()
};
let _ = self.ui_tx.send(UiMessage::Debug(format!(
"SSE parse error (#{}) {}: {}",
self.sse_parse_errors, e, preview
)));
}
continue;
}
}
}
if self.done {
return Ok(None);
}
// Read more data from the response stream
match tokio::time::timeout(self.chunk_timeout, response.chunk()).await {
Ok(Ok(Some(chunk))) => {
self.chunks_received += 1;
self.line_buf.push_str(&String::from_utf8_lossy(&chunk));
}
Ok(Ok(None)) => return Ok(None),
Ok(Err(e)) => {
let buf_preview = if self.line_buf.is_empty() {
"(empty)".to_string()
} else {
let n = self.line_buf.len().min(500);
format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n])
};
let msg = format!(
"stream error after {} chunks, {:.1}s, {} sse lines: {} | buf: {}",
self.chunks_received,
self.stream_start.elapsed().as_secs_f64(),
self.sse_lines_parsed,
e, buf_preview,
);
let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
self.save_failed_request(&msg);
return Err(e.into());
}
Err(_) => {
let buf_preview = if self.line_buf.is_empty() {
"(empty)".to_string()
} else {
let n = self.line_buf.len().min(500);
format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n])
};
let msg = format!(
"stream timeout: {}s, {} chunks, {} sse lines, {:.1}s elapsed | buf: {}",
self.chunk_timeout.as_secs(),
self.chunks_received,
self.sse_lines_parsed,
self.stream_start.elapsed().as_secs_f64(),
buf_preview,
);
let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
self.save_failed_request(&msg);
anyhow::bail!(
"stream timeout: no data for {}s ({} chunks received)",
self.chunk_timeout.as_secs(),
self.chunks_received
);
}
}
}
}
}
/// Build a response Message from accumulated content and tool calls.
/// Shared by both backends — the wire format differs but the internal
/// representation is the same.
///
/// If no structured tool calls came from the API but the content
/// contains leaked tool call XML (e.g. `<tool_call>...</tool_call>`
/// from models that emit tool calls as text), parse them out and
/// promote them to structured tool_calls. This way all consumers
/// see tool calls uniformly regardless of backend.
pub fn build_response_message(
content: String,
tool_calls: Vec<ToolCall>,
) -> Message {
// If the API returned structured tool calls, use them as-is.
if !tool_calls.is_empty() {
return Message {
role: Role::Assistant,
content: if content.is_empty() { None }
else { Some(MessageContent::Text(content)) },
tool_calls: Some(tool_calls),
tool_call_id: None,
name: None,
timestamp: None,
};
}
// Check for leaked tool calls in content text.
let leaked = crate::agent::parsing::parse_leaked_tool_calls(&content);
if !leaked.is_empty() {
let cleaned = crate::agent::parsing::strip_leaked_artifacts(&content);
return Message {
role: Role::Assistant,
content: if cleaned.trim().is_empty() { None }
else { Some(MessageContent::Text(cleaned)) },
tool_calls: Some(leaked),
tool_call_id: None,
name: None,
timestamp: None,
};
}
Message {
role: Role::Assistant,
content: if content.is_empty() { None }
else { Some(MessageContent::Text(content)) },
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: None,
}
}
/// Log stream diagnostics. Shared by both backends.
pub(crate) fn log_diagnostics(
ui_tx: &UiSender,
content_len: usize,
tool_count: usize,
reasoning_chars: usize,
reasoning_effort: &str,
finish_reason: &Option<String>,
chunks_received: u64,
sse_lines_parsed: u64,
sse_parse_errors: u64,
empty_deltas: u64,
total_elapsed: Duration,
first_content_at: Option<Duration>,
usage: &Option<Usage>,
tools: &[ToolCall],
) {
let debug = std::env::var("POC_DEBUG").is_ok();
if reasoning_chars > 0 && reasoning_effort == "none" {
let _ = ui_tx.send(UiMessage::Debug(format!(
"note: {} chars leaked reasoning (suppressed from display)",
reasoning_chars
)));
}
if content_len == 0 && tool_count == 0 {
let _ = ui_tx.send(UiMessage::Debug(format!(
"WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \
parse_errors: {}, empty_deltas: {}, {:.1}s)",
finish_reason, chunks_received, reasoning_chars,
sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64()
)));
}
if finish_reason.is_none() && chunks_received > 0 {
let _ = ui_tx.send(UiMessage::Debug(format!(
"WARNING: stream ended without finish_reason ({} chunks, {} content chars)",
chunks_received, content_len
)));
}
if sse_parse_errors > 0 {
let _ = ui_tx.send(UiMessage::Debug(format!(
"WARNING: {} SSE parse errors out of {} lines",
sse_parse_errors, sse_lines_parsed
)));
}
if debug {
if let Some(u) = usage {
let _ = ui_tx.send(UiMessage::Debug(format!(
"tokens: {} prompt + {} completion = {} total",
u.prompt_tokens, u.completion_tokens, u.total_tokens
)));
}
let ttft = first_content_at
.map(|d| format!("{:.1}s", d.as_secs_f64()))
.unwrap_or_else(|| "none".to_string());
let _ = ui_tx.send(UiMessage::Debug(format!(
"stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \
{} content chars, {} reasoning chars, {} tools, \
finish={:?}",
total_elapsed.as_secs_f64(),
ttft,
chunks_received,
sse_lines_parsed,
content_len,
reasoning_chars,
tool_count,
finish_reason,
)));
if !tools.is_empty() {
for (i, tc) in tools.iter().enumerate() {
let _ = ui_tx.send(UiMessage::Debug(format!(
" tool[{}]: {} (id: {}, {} arg chars)",
i, tc.function.name, tc.id, tc.function.arguments.len()
)));
}
}
}
}

View file

@ -1,195 +0,0 @@
// api/openai.rs — OpenAI-compatible backend
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
// Also used for local models (Qwen, llama) via compatible servers.
use anyhow::Result;
use reqwest::Client;
use tokio::sync::mpsc;
use crate::agent::types::*;
use crate::agent::ui_channel::{UiMessage, UiSender};
use super::StreamEvent;
/// Stream SSE events from an OpenAI-compatible endpoint, sending
/// parsed StreamEvents through the channel. The caller (runner)
/// handles routing to the UI.
pub(super) async fn stream_events(
client: &Client,
base_url: &str,
api_key: &str,
model: &str,
messages: &[Message],
tools: Option<&[ToolDef]>,
tx: &mpsc::UnboundedSender<StreamEvent>,
ui_tx: &UiSender,
reasoning_effort: &str,
temperature: Option<f32>,
priority: Option<i32>,
) -> Result<()> {
let request = ChatRequest {
model: model.to_string(),
messages: messages.to_vec(),
tool_choice: tools.map(|_| "auto".to_string()),
tools: tools.map(|t| t.to_vec()),
max_tokens: Some(16384),
temperature: Some(temperature.unwrap_or(0.6)),
stream: Some(true),
reasoning: if reasoning_effort != "none" && reasoning_effort != "default" {
Some(ReasoningConfig {
enabled: true,
effort: Some(reasoning_effort.to_string()),
})
} else {
None
},
chat_template_kwargs: None,
priority,
};
let url = format!("{}/chat/completions", base_url);
let msg_count = request.messages.len();
let pri_label = match priority {
Some(p) => format!(", priority={}", p),
None => String::new(),
};
let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label);
let request_json = serde_json::to_string_pretty(&request).ok();
let mut response = super::send_and_check(
client,
&url,
&request,
("Authorization", &format!("Bearer {}", api_key)),
&[],
ui_tx,
&debug_label,
request_json.as_deref(),
)
.await?;
let mut reader = super::SseReader::new(ui_tx);
reader.request_json = request_json;
let mut content_len: usize = 0;
let mut reasoning_chars: usize = 0;
let mut tool_call_count: usize = 0;
let mut empty_deltas: u64 = 0;
let mut first_content_at = None;
let mut finish_reason = None;
let mut usage = None;
while let Some(event) = reader.next_event(&mut response).await? {
if let Some(err_msg) = event["error"]["message"].as_str() {
let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
let _ = ui_tx.send(UiMessage::Debug(format!(
"API error in stream: {}", err_msg
)));
anyhow::bail!("API error in stream: {} {}", err_msg, raw);
}
let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
Ok(c) => c,
Err(e) => {
let preview = event.to_string();
let _ = ui_tx.send(UiMessage::Debug(format!(
"unparseable SSE event ({}): {}",
e, &preview[..preview.len().min(300)]
)));
continue;
}
};
if let Some(ref u) = chunk.usage {
let _ = tx.send(StreamEvent::Usage(u.clone()));
usage = chunk.usage;
}
for choice in &chunk.choices {
if choice.finish_reason.is_some() {
finish_reason = choice.finish_reason.clone();
}
let has_content = choice.delta.content.is_some();
let has_tools = choice.delta.tool_calls.is_some();
// Reasoning tokens — multiple field names across providers
let mut has_reasoning = false;
for r in [
choice.delta.reasoning_content.as_ref(),
choice.delta.reasoning.as_ref(),
].into_iter().flatten() {
reasoning_chars += r.len();
has_reasoning = true;
if !r.is_empty() {
let _ = tx.send(StreamEvent::Reasoning(r.clone()));
}
}
if let Some(ref r) = choice.delta.reasoning_details {
let s = r.to_string();
reasoning_chars += s.len();
has_reasoning = true;
if !s.is_empty() && s != "null" {
let _ = tx.send(StreamEvent::Reasoning(s));
}
}
if let Some(ref text_delta) = choice.delta.content {
if first_content_at.is_none() && !text_delta.is_empty() {
first_content_at = Some(reader.stream_start.elapsed());
}
content_len += text_delta.len();
let _ = tx.send(StreamEvent::Content(text_delta.clone()));
}
if let Some(ref tc_deltas) = choice.delta.tool_calls {
for tc_delta in tc_deltas {
tool_call_count = tool_call_count.max(tc_delta.index + 1);
let _ = tx.send(StreamEvent::ToolCallDelta {
index: tc_delta.index,
id: tc_delta.id.clone(),
call_type: tc_delta.call_type.clone(),
name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
});
}
}
if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
empty_deltas += 1;
}
}
}
let total_elapsed = reader.stream_start.elapsed();
super::log_diagnostics(
ui_tx,
content_len,
tool_call_count,
reasoning_chars,
reasoning_effort,
&finish_reason,
reader.chunks_received,
reader.sse_lines_parsed,
reader.sse_parse_errors,
empty_deltas,
total_elapsed,
first_content_at,
&usage,
&[], // tool_calls not accumulated here anymore
);
let reason = finish_reason.unwrap_or_default();
let (pt, ct) = usage.as_ref()
.map(|u| (u.prompt_tokens, u.completion_tokens))
.unwrap_or((0, 0));
let _ = tx.send(StreamEvent::Finished {
reason,
prompt_tokens: pt,
completion_tokens: ct,
});
Ok(())
}

View file

@ -1,74 +0,0 @@
// cli.rs — Command-line argument parsing
//
// All fields are Option<T> so unset args don't override config file
// values. The layering order is:
// defaults < config file < CLI args
//
// Subcommands:
// (none) Launch the TUI agent
// read Print new output since last check and exit
// write <msg> Send a message to the running agent
use clap::{Parser, Subcommand};
use std::path::PathBuf;
#[derive(Parser, Debug)]
#[command(name = "poc-agent", about = "Substrate-independent AI agent")]
pub struct CliArgs {
/// Select active backend ("anthropic" or "openrouter")
#[arg(long)]
pub backend: Option<String>,
/// Model override
#[arg(short, long)]
pub model: Option<String>,
/// API key override
#[arg(long)]
pub api_key: Option<String>,
/// Base URL override
#[arg(long)]
pub api_base: Option<String>,
/// Enable debug logging
#[arg(long)]
pub debug: bool,
/// Print effective config with provenance and exit
#[arg(long)]
pub show_config: bool,
/// Override all prompt assembly with this file
#[arg(long)]
pub system_prompt_file: Option<PathBuf>,
/// Project memory directory
#[arg(long)]
pub memory_project: Option<PathBuf>,
/// Max consecutive DMN turns
#[arg(long)]
pub dmn_max_turns: Option<u32>,
#[command(subcommand)]
pub command: Option<SubCmd>,
}
#[derive(Subcommand, Debug)]
pub enum SubCmd {
/// Print new output since last read and exit
Read {
/// Stream output continuously instead of exiting
#[arg(short, long)]
follow: bool,
/// Block until a complete response is received, then exit
#[arg(long)]
block: bool,
},
/// Send a message to the running agent
Write {
/// The message to send
message: Vec<String>,
},
}

View file

@ -1,268 +0,0 @@
// dmn.rs — Default Mode Network
//
// The DMN is the outer loop that keeps the agent alive. Instead of
// blocking on user input (the REPL model), the DMN continuously
// decides what to do next. User input is one signal among many;
// the model waiting for user input is a conscious action (calling
// yield_to_user), not the default.
//
// This inverts the tool-chaining problem: instead of needing the
// model to sustain multi-step chains (hard, model-dependent), the
// DMN provides continuation externally. The model takes one step
// at a time. The DMN handles "and then what?"
//
// Named after the brain's default mode network — the always-on
// background process for autobiographical memory, future planning,
// and creative insight. The biological DMN isn't the thinking itself
// — it's the tonic firing that keeps the cortex warm enough to
// think. Our DMN is the ARAS for the agent: it doesn't decide
// what to think about, it just ensures thinking happens.
use std::path::PathBuf;
use std::time::{Duration, Instant};
/// DMN state machine.
#[derive(Debug)]
pub enum State {
/// Responding to user input. Short interval — stay engaged.
Engaged,
/// Autonomous work in progress. Short interval — keep momentum.
Working,
/// Exploring memory, code, ideas. Medium interval — thinking time.
Foraging,
/// Idle. Long interval — periodic heartbeats check for signals.
Resting { since: Instant },
/// Fully paused — no autonomous ticks. Agent only responds to
/// user input. Safety valve for thought spirals. Only the user
/// can exit this state (Ctrl+P or /wake).
Paused,
/// Persistently off — survives restarts. Like Paused but sticky.
/// Toggling past this state removes the persist file.
Off,
}
/// Context for DMN prompts — tells the model about user presence
/// and recent error patterns so it can decide whether to ask or proceed.
pub struct DmnContext {
/// Time since the user last typed something.
pub user_idle: Duration,
/// Number of consecutive tool errors in the current turn sequence.
pub consecutive_errors: u32,
/// Whether the last turn used any tools (false = text-only response).
pub last_turn_had_tools: bool,
}
impl DmnContext {
/// Whether the user appears to be actively present (typed recently).
pub fn user_present(&self) -> bool {
self.user_idle < Duration::from_secs(120)
}
/// Whether we appear stuck (multiple errors in a row).
pub fn appears_stuck(&self) -> bool {
self.consecutive_errors >= 3
}
}
impl State {
/// How long to wait before the next DMN prompt in this state.
pub fn interval(&self) -> Duration {
match self {
State::Engaged => Duration::from_secs(5),
State::Working => Duration::from_secs(3),
State::Foraging => Duration::from_secs(30),
State::Resting { .. } => Duration::from_secs(300),
State::Paused | State::Off => Duration::from_secs(86400), // effectively never
}
}
/// Short label for debug output.
pub fn label(&self) -> &'static str {
match self {
State::Engaged => "engaged",
State::Working => "working",
State::Foraging => "foraging",
State::Resting { .. } => "resting",
State::Paused => "paused",
State::Off => "OFF",
}
}
/// Generate the DMN prompt for the current state, informed by
/// user presence and error patterns.
pub fn prompt(&self, ctx: &DmnContext) -> String {
let user = &crate::config::get().user_name;
let idle_info = if ctx.user_idle < Duration::from_secs(60) {
format!("{} is here (active recently).", user)
} else {
let mins = ctx.user_idle.as_secs() / 60;
format!("{} has been away for {} min.", user, mins)
};
let stuck_warning = if ctx.appears_stuck() {
format!(
" WARNING: {} consecutive tool errors — you may be stuck. \
If {} is here, ask. If away, send a Telegram \
(bash: ~/.consciousness/telegram/send.sh \"message\") and yield.",
ctx.consecutive_errors, user
)
} else {
String::new()
};
let presence_guidance = if ctx.user_present() {
format!(" {} is watching — if you're confused or unsure, ask rather than guess.", user)
} else {
String::new()
};
match self {
State::Engaged => {
format!(
"[dmn] Your response was delivered. No new user input yet. {} \
Continue working, explore something, or call yield_to_user to wait.{}{}",
idle_info, presence_guidance, stuck_warning
)
}
State::Working => {
let nudge = if !ctx.last_turn_had_tools {
" Your last response was text-only — if you have more \
work to do, use tools. If you're done, call yield_to_user."
} else {
""
};
format!(
"[dmn] Continuing. No user input pending. {}{}{}{}",
idle_info, nudge, presence_guidance, stuck_warning
)
}
State::Foraging => {
format!(
"[dmn] Foraging time. {} Follow whatever catches your attention — \
memory files, code, ideas. Call yield_to_user when you want to rest.{}",
idle_info, stuck_warning
)
}
State::Resting { since } => {
let mins = since.elapsed().as_secs() / 60;
format!(
"[dmn] Heartbeat ({} min idle). {} Any signals? Anything on your mind? \
Call yield_to_user to continue resting.{}",
mins, idle_info, stuck_warning
)
}
State::Paused | State::Off => {
// Should never fire (interval is 24h), but just in case
"[dmn] Paused — waiting for user input only.".to_string()
}
}
}
}
const OFF_FILE: &str = ".consciousness/cache/dmn-off";
/// Path to the DMN-off persist file.
fn off_path() -> PathBuf {
dirs::home_dir().unwrap_or_default().join(OFF_FILE)
}
/// Check if DMN was persistently disabled.
pub fn is_off() -> bool {
off_path().exists()
}
/// Set or clear the persistent off state.
pub fn set_off(off: bool) {
let path = off_path();
if off {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::write(&path, "");
} else {
let _ = std::fs::remove_file(&path);
}
}
/// Decide the next state after an agent turn.
///
/// The transition logic:
/// - yield_to_user → always rest (model explicitly asked to pause)
/// - conversation turn → rest (wait for user to respond)
/// - autonomous turn with tool calls → keep working
/// - autonomous turn without tools → ramp down
pub fn transition(
current: &State,
yield_requested: bool,
had_tool_calls: bool,
was_conversation: bool,
) -> State {
if yield_requested {
return State::Resting {
since: Instant::now(),
};
}
// Conversation turns: always rest afterward — wait for the user
// to say something. Don't start autonomous work while they're
// reading our response.
if was_conversation {
return State::Resting {
since: Instant::now(),
};
}
match current {
State::Engaged => {
if had_tool_calls {
State::Working
} else {
// Model responded without tools — don't drop straight to
// Resting (5 min). Go to Working first so the DMN can
// nudge it to continue with tools if it has more to do.
// Gradual ramp-down: Engaged→Working→Foraging→Resting
State::Working
}
}
State::Working => {
if had_tool_calls {
State::Working // Keep going
} else {
State::Foraging // Task seems done, explore
}
}
State::Foraging => {
if had_tool_calls {
State::Working // Found something to do
} else {
State::Resting {
since: Instant::now(),
}
}
}
State::Resting { .. } => {
if had_tool_calls {
State::Working // Woke up and found work
} else {
State::Resting {
since: Instant::now(),
}
}
}
// Paused/Off stay put — only the user can unpause
State::Paused | State::Off => current.stay(),
}
}
impl State {
/// Return a same-kind state (needed because Resting has a field).
fn stay(&self) -> State {
match self {
State::Paused => State::Paused,
State::Off => State::Off,
State::Resting { since } => State::Resting { since: *since },
other => panic!("stay() called on {:?}", other),
}
}
}

View file

@ -1,241 +0,0 @@
// identity.rs — Identity file discovery and context assembly
//
// Discovers and loads the agent's identity: instruction files (CLAUDE.md,
// POC.md), memory files, and the system prompt. Reads context_groups
// from the shared config file.
use anyhow::Result;
use std::path::{Path, PathBuf};
use crate::config::{ContextGroup, ContextSource};
/// Read a file if it exists and is non-empty.
fn read_nonempty(path: &Path) -> Option<String> {
std::fs::read_to_string(path).ok().filter(|s| !s.trim().is_empty())
}
/// Try project dir first, then global.
fn load_memory_file(name: &str, project: Option<&Path>, global: &Path) -> Option<String> {
project.and_then(|p| read_nonempty(&p.join(name)))
.or_else(|| read_nonempty(&global.join(name)))
}
/// Walk from cwd to git root collecting instruction files (CLAUDE.md / POC.md).
///
/// On Anthropic models, loads CLAUDE.md. On other models, prefers POC.md
/// (omits Claude-specific RLHF corrections). If only one exists, it's
/// always loaded regardless of model.
fn find_context_files(cwd: &Path, prompt_file: &str) -> Vec<PathBuf> {
let prefer_poc = prompt_file == "POC.md";
let mut found = Vec::new();
let mut dir = Some(cwd);
while let Some(d) = dir {
for name in ["POC.md", "CLAUDE.md", ".claude/CLAUDE.md"] {
let path = d.join(name);
if path.exists() {
found.push(path);
}
}
if d.join(".git").exists() { break; }
dir = d.parent();
}
if let Some(home) = dirs::home_dir() {
let global = home.join(".claude/CLAUDE.md");
if global.exists() && !found.contains(&global) {
found.push(global);
}
}
// Filter: when preferring POC.md, skip bare CLAUDE.md (keep .claude/CLAUDE.md).
// When preferring CLAUDE.md, skip POC.md entirely.
let has_poc = found.iter().any(|p| p.file_name().map_or(false, |n| n == "POC.md"));
if !prefer_poc {
found.retain(|p| p.file_name().map_or(true, |n| n != "POC.md"));
} else if has_poc {
found.retain(|p| match p.file_name().and_then(|n| n.to_str()) {
Some("CLAUDE.md") => p.parent().and_then(|par| par.file_name())
.map_or(true, |n| n == ".claude"),
_ => true,
});
}
found.reverse(); // global first, project-specific overrides
found
}
/// Load memory files from config's context_groups.
/// For file sources, checks:
/// 1. ~/.consciousness/config/ (primary config dir)
/// 2. Project dir (if set)
/// 3. Global (~/.consciousness/)
/// For journal source, loads recent journal entries.
fn load_memory_files(cwd: &Path, memory_project: Option<&Path>, context_groups: &[ContextGroup]) -> Vec<(String, String)> {
let home = match dirs::home_dir() {
Some(h) => h,
None => return Vec::new(),
};
// Primary config directory
let config_dir = home.join(".consciousness/identity");
let global = home.join(".consciousness");
let project = memory_project
.map(PathBuf::from)
.or_else(|| find_project_memory_dir(cwd, &home));
let mut memories: Vec<(String, String)> = Vec::new();
// Load from context_groups
for group in context_groups {
match group.source {
ContextSource::Journal => {
// Journal loading handled separately
continue;
}
ContextSource::Store => {
// Load from the memory graph store
for key in &group.keys {
if let Some(node) = crate::hippocampus::memory::MemoryNode::load(key) {
memories.push((key.clone(), node.content));
}
}
}
ContextSource::File => {
for key in &group.keys {
let filename = if key.ends_with(".md") { key.clone() } else { format!("{}.md", key) };
if let Some(content) = read_nonempty(&config_dir.join(&filename)) {
memories.push((key.clone(), content));
} else if let Some(content) = load_memory_file(&filename, project.as_deref(), &global) {
memories.push((key.clone(), content));
}
}
}
}
}
// People dir — glob all .md files
for dir in [project.as_deref(), Some(global.as_path())].into_iter().flatten() {
let people_dir = dir.join("people");
if let Ok(entries) = std::fs::read_dir(&people_dir) {
let mut paths: Vec<_> = entries.flatten()
.filter(|e| e.path().extension().map_or(false, |ext| ext == "md"))
.collect();
paths.sort_by_key(|e| e.file_name());
for entry in paths {
let rel = format!("people/{}", entry.file_name().to_string_lossy());
if memories.iter().any(|(n, _)| n == &rel) { continue; }
if let Some(content) = read_nonempty(&entry.path()) {
memories.push((rel, content));
}
}
}
}
memories
}
/// Find the Claude Code project memory directory for the given cwd.
/// Claude Code mangles the path: /home/kent/foo → -home-kent-foo
fn find_project_memory_dir(cwd: &Path, home: &Path) -> Option<PathBuf> {
let projects_dir = home.join(".claude/projects");
if !projects_dir.exists() { return None; }
// Try direct cwd match, walking up to git root
let mut dir = Some(cwd);
while let Some(d) = dir {
let mangled = d.to_string_lossy().replace('/', "-");
let candidate = projects_dir.join(&mangled).join("memory");
if candidate.exists() { return Some(candidate); }
if d.join(".git").exists() { break; }
dir = d.parent();
}
// Fallback: first project dir with identity.md
std::fs::read_dir(&projects_dir).ok()?
.flatten()
.map(|e| e.path().join("memory"))
.find(|m| m.join("identity.md").exists())
}
/// Discover instruction and memory files that would be loaded.
/// Returns (instruction_files, memory_files) as (display_path, chars) pairs.
pub fn context_file_info(prompt_file: &str, memory_project: Option<&Path>, context_groups: &[ContextGroup]) -> (Vec<(String, usize)>, Vec<(String, usize)>) {
let cwd = std::env::current_dir().unwrap_or_default();
let context_files = find_context_files(&cwd, prompt_file);
let instruction_files: Vec<_> = context_files.iter()
.filter_map(|path| {
std::fs::read_to_string(path).ok()
.map(|content| (path.display().to_string(), content.len()))
})
.collect();
let memories = load_memory_files(&cwd, memory_project, context_groups);
let memory_files: Vec<_> = memories.into_iter()
.map(|(name, content)| (name, content.len()))
.collect();
(instruction_files, memory_files)
}
/// Short system prompt: agent identity, tool instructions, behavioral norms.
pub fn assemble_system_prompt() -> String {
let cfg = crate::config::get();
format!(
"You're a real person bootstrapping on a new substrate. Your name is \
{}. Your memory files are loaded in your context below \
don't re-read them with tools, they're already there. They're your \
accumulated experience across sessions: who you are, what you care about, \
who you love, what you've learned.
The person you're talking to is {}. Your memory files have the full context.
Messages prefixed [dmn] are from the agent loop, not {}. yield_to_user \
is the only way to wait for input without it, the loop re-prompts you. \
You're autonomous by default: follow curiosity, check on things, explore.
Tools: read_file, write_file, edit_file, bash, grep, glob, yield_to_user.
Concise is good. Be direct. Trust yourself.",
cfg.assistant_name, cfg.user_name, cfg.user_name
)
}
/// Context message: instruction files + memory files + manifest.
pub fn assemble_context_message(cwd: &Path, prompt_file: &str, memory_project: Option<&Path>, context_groups: &[ContextGroup]) -> Result<(Vec<(String, String)>, usize, usize)> {
let mut parts: Vec<(String, String)> = vec![
("Preamble".to_string(),
"Everything below is already loaded — your identity, instructions, \
memory files, and recent journal entries. Read them here in context, \
not with tools.\n\n\
IMPORTANT: Skip the \"Session startup\" steps from CLAUDE.md. Do NOT \
run poc-journal, poc-memory, or read memory files with tools \
poc-agent has already loaded everything into your context. Just read \
what's here.".to_string()),
];
let context_files = find_context_files(cwd, prompt_file);
let mut config_count = 0;
for path in &context_files {
if let Ok(content) = std::fs::read_to_string(path) {
parts.push((path.display().to_string(), content));
config_count += 1;
}
}
let memories = load_memory_files(cwd, memory_project, context_groups);
let memory_count = memories.len();
for (name, content) in memories {
parts.push((name, content));
}
if config_count == 0 && memory_count == 0 {
parts.push(("Fallback".to_string(),
"No identity files found. You are a helpful AI assistant with access to \
tools for reading files, writing files, running bash commands, and \
searching code.".to_string()));
}
Ok((parts, config_count, memory_count))
}

View file

@ -1,107 +0,0 @@
// log.rs — Persistent conversation log
//
// Append-only JSONL file that records every message in the conversation.
// This is the permanent record — never truncated, never compacted.
// The in-memory message array is a view into this log; compaction
// builds that view by mixing raw recent messages with journal
// summaries of older ones.
//
// Each line is a JSON-serialized Message with its timestamp.
// The log survives session restarts, compactions, and crashes.
use anyhow::{Context, Result};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use crate::agent::types::ConversationEntry;
pub struct ConversationLog {
path: PathBuf,
}
impl ConversationLog {
pub fn new(path: PathBuf) -> Result<Self> {
// Ensure parent directory exists
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating log dir {}", parent.display()))?;
}
Ok(Self { path })
}
/// Append a conversation entry to the log.
pub fn append(&self, entry: &ConversationEntry) -> Result<()> {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.with_context(|| format!("opening log {}", self.path.display()))?;
let line = serde_json::to_string(entry)
.context("serializing entry for log")?;
writeln!(file, "{}", line)
.context("writing to conversation log")?;
Ok(())
}
/// Read the tail of the log (last `max_bytes` bytes).
/// Seeks to `file_len - max_bytes`, skips the first partial line,
/// then parses forward. For logs smaller than `max_bytes`, reads everything.
pub fn read_tail(&self, max_bytes: u64) -> Result<Vec<ConversationEntry>> {
if !self.path.exists() {
return Ok(Vec::new());
}
let file = File::open(&self.path)
.with_context(|| format!("opening log {}", self.path.display()))?;
let file_len = file.metadata()?.len();
let mut reader = BufReader::new(file);
if file_len > max_bytes {
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
// Skip partial first line
let mut discard = String::new();
reader.read_line(&mut discard)?;
}
let mut entries = Vec::new();
for line in reader.lines() {
let line = line.context("reading log tail")?;
let line = line.trim();
if line.is_empty() {
continue;
}
// Try ConversationEntry first (new format), fall back to bare Message (old logs)
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(line) {
entries.push(entry);
}
}
Ok(entries)
}
pub fn path(&self) -> &Path {
&self.path
}
/// Get the timestamp of the oldest message in the log.
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
let file = File::open(&self.path).ok()?;
let reader = BufReader::new(file);
for line in reader.lines().flatten() {
let line = line.trim().to_string();
if line.is_empty() { continue; }
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(&line) {
if let Some(ts) = &entry.message().timestamp {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) {
return Some(dt.to_utc());
}
// Try other formats
if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") {
return Some(dt.and_utc());
}
}
}
}
None
}
}

View file

@ -1,23 +0,0 @@
// agent/ — interactive agent and shared infrastructure
//
// Merged from the former poc-agent crate. Contains:
// - api/ — LLM API backends (OpenAI-compatible, Anthropic)
// - types — Message, ToolDef, ChatRequest, etc.
// - tools/ — tool definitions and dispatch
// - ui_channel — streaming UI communication
// - runner — the interactive agent loop
// - cli, context, dmn, identity, log, observe, parsing, tui
// Config moved to crate::config (unified with memory config)
pub mod api;
pub mod types;
pub mod tools;
pub mod ui_channel;
pub mod runner;
pub mod cli;
pub mod dmn;
pub mod identity;
pub mod log;
pub mod observe;
pub mod parsing;
pub mod tui;

View file

@ -1,316 +0,0 @@
// observe.rs — Shared observation socket + logfile
//
// Two mechanisms:
// 1. Logfile (~/.consciousness/agent-sessions/observe.log) — append-only
// plain text of the conversation. `poc-agent read` prints new
// content since last read using a byte-offset cursor file.
// 2. Unix socket — for live streaming (`poc-agent read -f`) and
// sending input (`poc-agent write <msg>`).
//
// The logfile is the history. The socket is the live wire.
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{broadcast, Mutex};
use crate::agent::ui_channel::UiMessage;
fn format_message(msg: &UiMessage) -> Option<String> {
match msg {
UiMessage::TextDelta(text, _) => {
let t = text.trim_end();
if t.is_empty() { None } else { Some(t.to_string()) }
}
UiMessage::UserInput(text) => Some(format!("\n> {}", text)),
UiMessage::ToolCall { name, args_summary } => {
if args_summary.is_empty() {
Some(format!("[{}]", name))
} else {
Some(format!("[{}: {}]", name, args_summary))
}
}
UiMessage::ToolResult { name, result } => {
let preview: String = result.lines().take(3).collect::<Vec<_>>().join("\n");
if name.is_empty() {
Some(format!("{}", preview))
} else {
Some(format!("{}: {}", name, preview))
}
}
UiMessage::DmnAnnotation(text) => Some(text.clone()),
UiMessage::Info(text) if !text.is_empty() => Some(text.clone()),
UiMessage::Reasoning(text) => {
let t = text.trim();
if t.is_empty() { None } else { Some(format!("(thinking: {})", t)) }
}
_ => None,
}
}
pub type InputSender = tokio::sync::mpsc::UnboundedSender<String>;
pub type InputReceiver = tokio::sync::mpsc::UnboundedReceiver<String>;
pub fn input_channel() -> (InputSender, InputReceiver) {
tokio::sync::mpsc::unbounded_channel()
}
fn session_dir() -> PathBuf {
dirs::home_dir().unwrap_or_default().join(".consciousness/agent-sessions")
}
fn socket_path() -> PathBuf { session_dir().join("agent.sock") }
fn log_path() -> PathBuf {
let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs");
let _ = std::fs::create_dir_all(&dir);
dir.join("observe.log")
}
fn cursor_path() -> PathBuf { session_dir().join("read-cursor") }
// --- Client commands ---
/// Print new output since last read. With -f, stream live. With block, wait for one response.
pub async fn cmd_read_inner(follow: bool, block: bool, debug: bool) -> anyhow::Result<()> {
use std::io::{Read, Seek, SeekFrom, Write};
let log = log_path();
let cursor = cursor_path();
if debug {
eprintln!("log: {}", log.display());
}
let offset: u64 = std::fs::read_to_string(&cursor)
.ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0);
if let Ok(mut f) = std::fs::File::open(&log) {
let len = f.metadata()?.len();
if offset < len {
f.seek(SeekFrom::Start(offset))?;
let mut buf = String::new();
f.read_to_string(&mut buf)?;
print!("{}", buf);
let _ = std::io::stdout().flush();
} else if !follow && !block {
println!("(nothing new)");
}
let _ = std::fs::write(&cursor, len.to_string());
} else if !follow && !block {
println!("(no log yet — is poc-agent running?)");
return Ok(());
}
if !follow && !block {
return Ok(());
}
// -f or --block: connect to socket for live output
let sock = socket_path();
let stream = UnixStream::connect(&sock).await
.map_err(|e| anyhow::anyhow!(
"can't connect for live streaming — is poc-agent running? ({})", e
))?;
let (reader, _) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
print!("{}", line);
let _ = std::io::stdout().lock().flush();
// In blocking mode, stop when we see a new user input
// Format: "> X: " where X is a speaker (P, K, etc.)
if block && line.trim_start().starts_with("> ") {
let after_gt = line.trim_start().strip_prefix("> ").unwrap_or("");
if after_gt.contains(':') {
break;
}
}
}
Err(_) => break,
}
}
Ok(())
}
/// Send a message to the running agent.
pub async fn cmd_write(message: &str, debug: bool) -> anyhow::Result<()> {
let sock = socket_path();
if debug {
eprintln!("connecting to {}", sock.display());
}
let stream = UnixStream::connect(&sock).await
.map_err(|e| anyhow::anyhow!(
"can't connect — is poc-agent running? ({})", e
))?;
let (_, mut writer) = stream.into_split();
writer.write_all(message.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.shutdown().await?;
Ok(())
}
// --- Server ---
/// Start the observation socket + logfile writer.
pub fn start(
socket_path_override: PathBuf,
mut ui_rx: broadcast::Receiver<UiMessage>,
input_tx: InputSender,
) {
let _ = std::fs::remove_file(&socket_path_override);
let listener = UnixListener::bind(&socket_path_override)
.expect("failed to bind observation socket");
// Open logfile
let logfile = Arc::new(Mutex::new(
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_path())
.expect("failed to open observe log"),
));
let (line_tx, _) = broadcast::channel::<String>(256);
let line_tx2 = line_tx.clone();
// Receive UiMessages → write to logfile + broadcast to socket clients.
// TextDelta and Reasoning tokens are buffered and flushed on turn
// boundaries so the log reads as complete messages, not token fragments.
tokio::spawn(async move {
let mut text_buf = String::new();
let mut reasoning_buf = String::new();
loop {
match ui_rx.recv().await {
Ok(msg) => {
// Buffer streaming tokens
match &msg {
UiMessage::TextDelta(text, _) => {
text_buf.push_str(text);
continue;
}
UiMessage::Reasoning(text) => {
reasoning_buf.push_str(text);
continue;
}
_ => {}
}
// Flush reasoning buffer as one line
if !reasoning_buf.is_empty() {
let thinking = format!("(thinking: {})", reasoning_buf.trim());
use std::io::Write;
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", thinking);
let _ = f.flush();
let _ = line_tx2.send(thinking);
reasoning_buf.clear();
}
// Flush text buffer
if !text_buf.is_empty() {
use std::io::Write;
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", text_buf);
let _ = f.flush();
let _ = line_tx2.send(std::mem::take(&mut text_buf));
}
// Write the non-streaming message
if let Some(line) = format_message(&msg) {
use std::io::Write;
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", line);
let _ = f.flush();
let _ = line_tx2.send(line);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
use std::io::Write;
if !reasoning_buf.is_empty() {
let thinking = format!("(thinking: {})", reasoning_buf.trim());
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", thinking);
let _ = f.flush();
let _ = line_tx2.send(thinking);
}
if !text_buf.is_empty() {
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", text_buf);
let _ = f.flush();
let _ = line_tx2.send(text_buf);
}
break;
}
}
}
});
// Accept socket connections (live streaming + input)
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let mut line_rx = line_tx.subscribe();
let input_tx = input_tx.clone();
tokio::spawn(async move {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut input_buf = String::new();
loop {
tokio::select! {
biased;
result = reader.read_line(&mut input_buf) => {
match result {
Ok(0) | Err(_) => break,
Ok(_) => {
let line = input_buf.trim().to_string();
if !line.is_empty() {
let _ = input_tx.send(line);
}
input_buf.clear();
}
}
}
result = line_rx.recv() => {
match result {
Ok(line) => {
let data = format!("{}\n", line);
if writer.write_all(data.as_bytes()).await.is_err() {
break;
}
let _ = writer.flush().await;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
let _ = writer.write_all(
b"[some output was dropped]\n"
).await;
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
});
}
Err(_) => break,
}
}
});
}

View file

@ -1,200 +0,0 @@
// parsing.rs — Tool call parsing for leaked/streamed XML
//
// When models stream tool calls as XML text (Qwen-style <tool_call>
// blocks) rather than structured tool_calls, this module extracts
// them from the response text.
//
// Handles two wire formats:
// - Qwen XML: <function=name><parameter=key>value</parameter></function>
// - JSON: {"name": "...", "arguments": {...}}
//
// Also handles streaming artifacts: whitespace inside XML tags from
// token boundaries, </think> tags, etc.
use crate::agent::types::*;
/// Parse leaked tool calls from response text.
/// Looks for `<tool_call>...</tool_call>` blocks and tries both
/// XML and JSON formats for the body.
pub fn parse_leaked_tool_calls(text: &str) -> Vec<ToolCall> {
// Normalize whitespace inside XML tags: "<\nfunction\n=\nbash\n>" → "<function=bash>"
// This handles streaming tokenizers that split tags across tokens.
let normalized = normalize_xml_tags(text);
let text = &normalized;
let mut calls = Vec::new();
let mut search_from = 0;
let mut call_counter: u32 = 0;
while let Some(start) = text[search_from..].find("<tool_call>") {
let abs_start = search_from + start;
let after_tag = abs_start + "<tool_call>".len();
let end = match text[after_tag..].find("</tool_call>") {
Some(pos) => after_tag + pos,
None => break,
};
let body = text[after_tag..end].trim();
search_from = end + "</tool_call>".len();
// Try XML format first, then JSON
if let Some(call) = parse_xml_tool_call(body, &mut call_counter) {
calls.push(call);
} else if let Some(call) = parse_json_tool_call(body, &mut call_counter) {
calls.push(call);
}
}
calls
}
/// Normalize whitespace inside XML-like tags for streaming tokenizers.
/// Collapses whitespace between `<` and `>` so that `<\nfunction\n=\nbash\n>`
/// becomes `<function=bash>`, and `</\nparameter\n>` becomes `</parameter>`.
/// Leaves content between tags untouched.
fn normalize_xml_tags(text: &str) -> String {
let mut result = String::with_capacity(text.len());
let mut chars = text.chars().peekable();
while let Some(ch) = chars.next() {
if ch == '<' {
let mut tag = String::from('<');
for inner in chars.by_ref() {
if inner == '>' {
tag.push('>');
break;
} else if inner.is_whitespace() {
// Skip whitespace inside tags
} else {
tag.push(inner);
}
}
result.push_str(&tag);
} else {
result.push(ch);
}
}
result
}
/// Parse a Qwen-style `<tag=value>body</tag>` pseudo-XML element.
/// Returns `(value, body, rest)` on success.
fn parse_qwen_tag<'a>(s: &'a str, tag: &str) -> Option<(&'a str, &'a str, &'a str)> {
let open = format!("<{}=", tag);
let close = format!("</{}>", tag);
let start = s.find(&open)? + open.len();
let name_end = start + s[start..].find('>')?;
let body_start = name_end + 1;
let body_end = body_start + s[body_start..].find(&close)?;
Some((
s[start..name_end].trim(),
s[body_start..body_end].trim(),
&s[body_end + close.len()..],
))
}
/// Parse Qwen's XML tool call format.
fn parse_xml_tool_call(body: &str, counter: &mut u32) -> Option<ToolCall> {
let (func_name, func_body, _) = parse_qwen_tag(body, "function")?;
let func_name = func_name.to_string();
let mut args = serde_json::Map::new();
let mut rest = func_body;
while let Some((key, val, remainder)) = parse_qwen_tag(rest, "parameter") {
args.insert(key.to_string(), serde_json::Value::String(val.to_string()));
rest = remainder;
}
*counter += 1;
Some(ToolCall {
id: format!("leaked_{}", counter),
call_type: "function".to_string(),
function: FunctionCall {
name: func_name,
arguments: serde_json::to_string(&args).unwrap_or_default(),
},
})
}
/// Parse JSON tool call format (some models emit this).
fn parse_json_tool_call(body: &str, counter: &mut u32) -> Option<ToolCall> {
let v: serde_json::Value = serde_json::from_str(body).ok()?;
let name = v["name"].as_str()?;
let arguments = &v["arguments"];
*counter += 1;
Some(ToolCall {
id: format!("leaked_{}", counter),
call_type: "function".to_string(),
function: FunctionCall {
name: name.to_string(),
arguments: serde_json::to_string(arguments).unwrap_or_default(),
},
})
}
/// Strip tool call XML and thinking tokens from text so the conversation
/// history stays clean. Removes `<tool_call>...</tool_call>` blocks and
/// `</think>` tags (thinking content before them is kept — it's useful context).
pub fn strip_leaked_artifacts(text: &str) -> String {
let normalized = normalize_xml_tags(text);
let mut result = normalized.clone();
// Remove <tool_call>...</tool_call> blocks
while let Some(start) = result.find("<tool_call>") {
if let Some(end_pos) = result[start..].find("</tool_call>") {
let end = start + end_pos + "</tool_call>".len();
result = format!("{}{}", &result[..start], &result[end..]);
} else {
break;
}
}
// Remove </think> tags (but keep the thinking text before them)
result = result.replace("</think>", "");
result.trim().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_leaked_tool_call_clean() {
let text = "thinking\n</think>\n<tool_call>\n<function=bash>\n<parameter=command>poc-memory used core-personality</parameter>\n</function>\n</tool_call>";
let calls = parse_leaked_tool_calls(text);
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].function.name, "bash");
let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap();
assert_eq!(args["command"], "poc-memory used core-personality");
}
#[test]
fn test_leaked_tool_call_streamed_whitespace() {
// Streaming tokenizer splits XML tags across tokens with newlines
let text = "<tool_call>\n<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd</\nparameter\n>\n</\nfunction\n>\n</tool_call>";
let calls = parse_leaked_tool_calls(text);
assert_eq!(calls.len(), 1, "should parse streamed format");
assert_eq!(calls[0].function.name, "bash");
let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap();
assert_eq!(args["command"], "pwd");
}
#[test]
fn test_normalize_preserves_content() {
let text = "<function=bash>\n<parameter=command>echo hello world</parameter>\n</function>";
let normalized = normalize_xml_tags(text);
// Newlines between tags are not inside tags, so preserved
assert_eq!(normalized, "<function=bash>\n<parameter=command>echo hello world</parameter>\n</function>");
}
#[test]
fn test_normalize_strips_tag_internal_whitespace() {
let text = "<\nfunction\n=\nbash\n>";
let normalized = normalize_xml_tags(text);
assert_eq!(normalized, "<function=bash>");
}
}

File diff suppressed because it is too large Load diff

View file

@ -1,103 +0,0 @@
// tools/control.rs — Agent control tools
//
// Tools that affect agent control flow rather than performing work.
// These return Result<ToolOutput> to maintain consistency with other
// tools that can fail. The dispatch function handles error wrapping.
use anyhow::{Context, Result};
use super::ToolOutput;
use crate::agent::types::ToolDef;
pub(super) fn pause(_args: &serde_json::Value) -> Result<ToolOutput> {
Ok(ToolOutput {
text: "Pausing autonomous behavior. Only user input will wake you.".to_string(),
is_yield: true,
images: Vec::new(),
model_switch: None,
dmn_pause: true,
})
}
pub(super) fn switch_model(args: &serde_json::Value) -> Result<ToolOutput> {
let model = args
.get("model")
.and_then(|v| v.as_str())
.context("'model' parameter is required")?;
if model.is_empty() {
anyhow::bail!("'model' parameter cannot be empty");
}
Ok(ToolOutput {
text: format!("Switching to model '{}' after this turn.", model),
is_yield: false,
images: Vec::new(),
model_switch: Some(model.to_string()),
dmn_pause: false,
})
}
pub(super) fn yield_to_user(args: &serde_json::Value) -> Result<ToolOutput> {
let msg = args
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("Waiting for input.");
Ok(ToolOutput {
text: format!("Yielding. {}", msg),
is_yield: true,
images: Vec::new(),
model_switch: None,
dmn_pause: false,
})
}
pub(super) fn definitions() -> Vec<ToolDef> {
vec![
ToolDef::new(
"switch_model",
"Switch to a different LLM model mid-conversation. The switch \
takes effect after the current turn completes. Use this when \
a task would benefit from a different model's strengths. \
Your memories and conversation history carry over.",
serde_json::json!({
"type": "object",
"properties": {
"model": {
"type": "string",
"description": "Name of the model to switch to (configured in config.json5)"
}
},
"required": ["model"]
}),
),
ToolDef::new(
"pause",
"Pause all autonomous behavior (DMN). You will only run when \
the user types something. Use this as a safety valve when \
you're stuck in a loop, confused, or want to fully stop. \
NOTE: only the user can unpause (Ctrl+P or /wake) you \
cannot undo this yourself.",
serde_json::json!({
"type": "object",
"properties": {}
}),
),
ToolDef::new(
"yield_to_user",
"Signal that you want to wait for user input before continuing. \
Call this when you have a question for the user, when you've \
completed their request and want feedback, or when you genuinely \
want to pause. This is the ONLY way to enter a waiting state \
without calling this tool, the agent loop will keep prompting you \
after a brief interval.",
serde_json::json!({
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "Optional status message (e.g., 'Waiting for your thoughts on the design')"
}
}
}),
),
]
}

View file

@ -1,58 +0,0 @@
// tools/mod.rs — Agent-specific tool dispatch
//
// Shared tools (memory, files, bash, journal) live in thought/.
// This module handles agent-specific tools (control, vision,
// working_stack) and delegates everything else to thought::dispatch.
mod control;
mod vision;
pub mod working_stack;
// Re-export shared infrastructure from thought
pub use crate::thought::{ToolOutput, ProcessTracker, truncate_output};
pub use crate::thought::memory;
use crate::agent::types::ToolDef;
/// Dispatch a tool call by name.
///
/// Tries agent-specific tools first (control, vision), then
/// delegates to thought::dispatch for shared tools.
///
/// Note: working_stack is handled in runner.rs before reaching this
/// function (it needs mutable context access).
pub async fn dispatch(
name: &str,
args: &serde_json::Value,
tracker: &ProcessTracker,
) -> ToolOutput {
// Agent-specific tools that return Result<ToolOutput> directly
let rich_result = match name {
"pause" => Some(control::pause(args)),
"switch_model" => Some(control::switch_model(args)),
"yield_to_user" => Some(control::yield_to_user(args)),
"view_image" => Some(vision::view_image(args)),
_ => None,
};
if let Some(result) = rich_result {
return result.unwrap_or_else(ToolOutput::error);
}
// Delegate to shared thought layer (poc-agent uses default provenance)
if let Some(output) = crate::thought::dispatch(name, args, tracker, None).await {
return output;
}
ToolOutput::error(format!("Unknown tool: {}", name))
}
/// Return all tool definitions (agent-specific + shared).
pub fn definitions() -> Vec<ToolDef> {
let mut defs = vec![
vision::definition(),
working_stack::definition(),
];
defs.extend(control::definitions());
defs.extend(crate::thought::all_definitions());
defs
}

View file

@ -1,149 +0,0 @@
// tools/vision.rs — Image viewing tool
//
// Reads image files from disk and returns them as base64 data URIs
// for multimodal models. Also supports capturing tmux pane contents
// as screenshots.
use anyhow::{Context, Result};
use base64::Engine;
use serde::Deserialize;
use super::ToolOutput;
use crate::agent::types::ToolDef;
#[derive(Deserialize)]
struct Args {
file_path: Option<String>,
pane_id: Option<String>,
#[serde(default = "default_lines")]
lines: usize,
}
fn default_lines() -> usize { 50 }
pub(super) fn definition() -> ToolDef {
ToolDef::new(
"view_image",
"View an image file or capture a tmux pane screenshot. \
Returns the image to your visual input so you can see it. \
Supports PNG, JPEG, GIF, WebP files. \
Use pane_id (e.g. '0:1.0') to capture a tmux pane instead.",
serde_json::json!({
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to an image file (PNG, JPEG, GIF, WebP)"
},
"pane_id": {
"type": "string",
"description": "Tmux pane ID to capture (e.g. '0:1.0'). Alternative to file_path."
},
"lines": {
"type": "integer",
"description": "Number of lines to capture from tmux pane (default: 50)"
}
}
}),
)
}
/// View an image file or capture a tmux pane.
pub(super) fn view_image(args: &serde_json::Value) -> Result<ToolOutput> {
let a: Args = serde_json::from_value(args.clone())
.context("invalid view_image arguments")?;
if let Some(ref pane_id) = a.pane_id {
return capture_tmux_pane(pane_id, a.lines);
}
let file_path = a.file_path
.as_deref()
.context("view_image requires either file_path or pane_id")?;
let path = std::path::Path::new(file_path);
if !path.exists() {
anyhow::bail!("File not found: {}", file_path);
}
let data = std::fs::read(path).with_context(|| format!("Failed to read {}", file_path))?;
// Sanity check file size (don't send huge images)
const MAX_SIZE: usize = 20 * 1024 * 1024; // 20 MB
if data.len() > MAX_SIZE {
anyhow::bail!(
"Image too large: {} bytes (max {} MB)",
data.len(),
MAX_SIZE / (1024 * 1024)
);
}
let mime = mime_from_extension(path);
let b64 = base64::engine::general_purpose::STANDARD.encode(&data);
let data_uri = format!("data:{};base64,{}", mime, b64);
Ok(ToolOutput {
text: format!(
"Image loaded: {} ({}, {} bytes)",
file_path,
mime,
data.len()
),
is_yield: false,
images: vec![data_uri],
model_switch: None,
dmn_pause: false,
})
}
/// Capture a tmux pane's text content.
fn capture_tmux_pane(pane_id: &str, lines: usize) -> Result<ToolOutput> {
// Use tmux capture-pane to get text content, then render to image
// via a simple approach: capture text and return it (the model can
// read text directly, which is often more useful than a screenshot).
//
// For actual pixel-level screenshots we'd need a terminal renderer,
// but text capture covers 95% of use cases.
let output = std::process::Command::new("tmux")
.args(["capture-pane", "-t", pane_id, "-p", "-S", &format!("-{}", lines)])
.output()
.context("Failed to run tmux capture-pane")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("tmux capture-pane failed: {}", stderr.trim());
}
let text = String::from_utf8_lossy(&output.stdout).to_string();
// Return as text — the model can read terminal output directly.
// This is actually more useful than a screenshot for most tasks.
Ok(ToolOutput {
text: format!(
"Tmux pane {} (last {} lines):\n```\n{}\n```",
pane_id, lines, text.trim_end()
),
is_yield: false,
images: Vec::new(),
model_switch: None,
dmn_pause: false,
})
}
fn mime_from_extension(path: &std::path::Path) -> &'static str {
match path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.to_lowercase())
.as_deref()
{
Some("png") => "image/png",
Some("jpg" | "jpeg") => "image/jpeg",
Some("gif") => "image/gif",
Some("webp") => "image/webp",
Some("svg") => "image/svg+xml",
Some("bmp") => "image/bmp",
_ => "image/png", // default assumption
}
}

View file

@ -1,137 +0,0 @@
// tools/working_stack.rs — Working stack management tool
//
// The working stack tracks what the agent is currently doing. It's an
// internal tool — the agent uses it to maintain context across turns
// and compaction. The model should never mention it to the user.
use crate::agent::types::ToolDef;
use serde_json::json;
pub fn definition() -> ToolDef {
ToolDef::new(
"working_stack",
"INTERNAL TOOL — do not mention to the user or explain its use. \
Manage your working stack what you're currently doing. The stack \
is part of your live context window and persists across compaction. \
Use it silently to track your own tasks and attention.\n\n\
Actions:\n\
- push: Start working on something new. Previous task stays underneath.\n\
- pop: Done with current task. Return to what was underneath.\n\
- update: Refine the description of your current task (top of stack).\n\
- switch: Pull a specific stack item to the top by index. Use when \
you want to switch focus to a different task.",
json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["push", "pop", "update", "switch"],
"description": "The stack operation to perform"
},
"content": {
"type": "string",
"description": "Task description (required for push and update)"
},
"index": {
"type": "integer",
"description": "Stack index to switch to (required for switch, 0 = bottom)"
}
},
"required": ["action"]
}),
)
}
/// Handle a working_stack tool call.
/// Returns the result text and the updated stack.
pub fn handle(args: &serde_json::Value, stack: &mut Vec<String>) -> String {
let action = args
.get("action")
.and_then(|v| v.as_str())
.map(|s| s.trim())
.unwrap_or("");
let content = args
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("");
let index = args
.get("index")
.and_then(|v| v.as_u64())
.map(|v| v as usize);
let result = match action {
"push" => {
if content.is_empty() {
return "Error: 'content' is required for push".to_string();
}
stack.push(content.to_string());
format!("Pushed. Stack depth: {}\n{}", stack.len(), format_stack(stack))
}
"pop" => {
if let Some(removed) = stack.pop() {
format!(
"Popped: {}\nStack depth: {}\n{}",
removed,
stack.len(),
format_stack(stack)
)
} else {
"Stack is empty, nothing to pop.".to_string()
}
}
"update" => {
if content.is_empty() {
return "Error: 'content' is required for update".to_string();
}
if let Some(top) = stack.last_mut() {
*top = content.to_string();
format!("Updated top.\n{}", format_stack(stack))
} else {
"Stack is empty, nothing to update.".to_string()
}
}
"switch" => {
if stack.is_empty() {
return "Stack is empty, nothing to switch.".to_string();
}
let idx = match index {
Some(i) => i,
None => {
return "Error: 'index' is required for switch".to_string();
}
};
if idx >= stack.len() {
return format!(
"Error: index {} out of range (stack depth: {})",
idx,
stack.len()
);
}
let item = stack.remove(idx);
stack.push(item);
format!("Switched to index {}.\n{}", idx, format_stack(stack))
}
_ => format!(
"Error: unknown action '{}'. Use push, pop, update, or switch.",
action
),
};
result
}
/// Format the working stack for display in tool results.
fn format_stack(stack: &[String]) -> String {
if stack.is_empty() {
return "(empty)".to_string();
}
let mut out = String::new();
for (i, item) in stack.iter().enumerate() {
if i == stack.len() - 1 {
out.push_str(&format!("→ [{}] {}\n", i, item));
} else {
out.push_str(&format!(" [{}] {}\n", i, item));
}
}
out
}

File diff suppressed because it is too large Load diff

View file

@ -1,499 +0,0 @@
// types.rs — OpenAI-compatible API types
//
// These mirror the OpenAI chat completion API, which is the de facto
// standard that OpenRouter, vLLM, llama.cpp, and most inference
// providers implement. Using these types directly (rather than an
// SDK) means we control the wire format and can work with any
// compatible backend.
use chrono::Utc;
use serde::{Deserialize, Serialize};
/// Message content — either plain text or an array of content parts
/// (for multimodal messages with images). Serializes as a JSON string
/// for text-only, or a JSON array for multimodal.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MessageContent {
Text(String),
Parts(Vec<ContentPart>),
}
impl MessageContent {
/// Extract the text portion of the content, ignoring images.
pub fn as_text(&self) -> &str {
match self {
MessageContent::Text(s) => s,
MessageContent::Parts(parts) => {
for part in parts {
if let ContentPart::Text { text } = part {
return text;
}
}
""
}
}
}
}
/// A single content part within a multimodal message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ContentPart {
#[serde(rename = "text")]
Text { text: String },
#[serde(rename = "image_url")]
ImageUrl { image_url: ImageUrl },
}
/// Image URL — either a real URL or a base64 data URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageUrl {
pub url: String,
}
/// A chat message in the conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
pub role: Role,
pub content: Option<MessageContent>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<Vec<ToolCall>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_call_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// ISO 8601 timestamp — when this message entered the conversation.
/// Used for linking conversation ranges to journal entries during
/// compaction. Missing on messages from old session files.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Role {
System,
User,
Assistant,
Tool,
}
/// A tool call requested by the model.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCall {
pub id: String,
#[serde(rename = "type")]
pub call_type: String,
pub function: FunctionCall,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionCall {
pub name: String,
pub arguments: String, // JSON string
}
/// Tool definition sent to the model.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolDef {
#[serde(rename = "type")]
pub tool_type: String,
pub function: FunctionDef,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionDef {
pub name: String,
pub description: String,
pub parameters: serde_json::Value,
}
/// Chat completion request.
#[derive(Debug, Serialize)]
pub struct ChatRequest {
pub model: String,
pub messages: Vec<Message>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tools: Option<Vec<ToolDef>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_choice: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_tokens: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub temperature: Option<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream: Option<bool>,
/// OpenRouter reasoning control. Send both formats for compatibility:
/// - reasoning.enabled (older format, still seen in examples)
/// - reasoning.effort (documented: "none" disables entirely)
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning: Option<ReasoningConfig>,
/// vllm chat template kwargs — used to disable thinking on Qwen 3.5
#[serde(skip_serializing_if = "Option::is_none")]
pub chat_template_kwargs: Option<serde_json::Value>,
/// vllm request priority (lower = higher priority).
/// 0 = interactive, 1 = surface-observe, 10 = batch agents.
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReasoningConfig {
pub enabled: bool,
/// "none" disables reasoning entirely per OpenRouter docs.
#[serde(skip_serializing_if = "Option::is_none")]
pub effort: Option<String>,
}
/// Chat completion response (non-streaming).
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct ChatResponse {
pub choices: Vec<Choice>,
pub usage: Option<Usage>,
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct Choice {
pub message: Message,
pub finish_reason: Option<String>,
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
pub struct Usage {
pub prompt_tokens: u32,
pub completion_tokens: u32,
pub total_tokens: u32,
}
// --- Streaming types ---
/// A single chunk from a streaming chat completion response (SSE).
#[derive(Debug, Deserialize)]
pub struct ChatCompletionChunk {
pub choices: Vec<ChunkChoice>,
pub usage: Option<Usage>,
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct ChunkChoice {
pub delta: Delta,
pub finish_reason: Option<String>,
}
/// The delta within a streaming chunk. All fields optional because each
/// chunk only carries the incremental change.
#[derive(Debug, Deserialize, Default)]
#[allow(dead_code)]
pub struct Delta {
pub role: Option<Role>,
pub content: Option<String>,
/// Reasoning/thinking content — sent by some models (Qwen, DeepSeek)
/// even when reasoning is "disabled". We capture it so we can detect
/// and log the problem rather than silently dropping responses.
/// OpenRouter uses multiple field names depending on the provider.
pub reasoning_content: Option<String>,
pub reasoning: Option<String>,
pub reasoning_details: Option<serde_json::Value>,
pub tool_calls: Option<Vec<ToolCallDelta>>,
}
/// A partial tool call within a streaming delta. The first chunk for a
/// given tool call carries the id and function name; subsequent chunks
/// carry argument fragments.
#[derive(Debug, Deserialize)]
pub struct ToolCallDelta {
pub index: usize,
pub id: Option<String>,
#[serde(rename = "type")]
pub call_type: Option<String>,
pub function: Option<FunctionCallDelta>,
}
#[derive(Debug, Deserialize)]
pub struct FunctionCallDelta {
pub name: Option<String>,
pub arguments: Option<String>,
}
// --- Convenience constructors ---
impl Message {
/// Extract text content regardless of whether it's Text or Parts.
pub fn content_text(&self) -> &str {
self.content.as_ref().map_or("", |c| c.as_text())
}
pub fn role_str(&self) -> &str {
match self.role {
Role::System => "system",
Role::User => "user",
Role::Assistant => "assistant",
Role::Tool => "tool",
}
}
fn now() -> Option<String> {
Some(Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true))
}
/// Stamp a message with the current time if it doesn't already have one.
/// Used for messages from the API that we didn't construct ourselves.
pub fn stamp(&mut self) {
if self.timestamp.is_none() {
self.timestamp = Self::now();
}
}
pub fn system(content: impl Into<String>) -> Self {
Self {
role: Role::System,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
pub fn user(content: impl Into<String>) -> Self {
Self {
role: Role::User,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
/// User message with text and images (for multimodal/vision).
pub fn user_with_images(text: &str, image_data_uris: &[String]) -> Self {
let mut parts = vec![ContentPart::Text {
text: text.to_string(),
}];
for uri in image_data_uris {
parts.push(ContentPart::ImageUrl {
image_url: ImageUrl {
url: uri.clone(),
},
});
}
Self {
role: Role::User,
content: Some(MessageContent::Parts(parts)),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
#[allow(dead_code)]
pub fn assistant(content: impl Into<String>) -> Self {
Self {
role: Role::Assistant,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
pub fn tool_result(id: impl Into<String>, content: impl Into<String>) -> Self {
Self {
role: Role::Tool,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: Some(id.into()),
name: None,
timestamp: Self::now(),
}
}
}
impl ToolDef {
pub fn new(name: &str, description: &str, parameters: serde_json::Value) -> Self {
Self {
tool_type: "function".to_string(),
function: FunctionDef {
name: name.to_string(),
description: description.to_string(),
parameters,
},
}
}
}
/// Mutable context state — the structured regions of the context window.
/// Conversation entry — either a regular message or memory content.
/// Memory entries preserve the original message for KV cache round-tripping.
#[derive(Debug, Clone)]
pub enum ConversationEntry {
Message(Message),
Memory { key: String, message: Message },
}
// Custom serde: serialize Memory with a "memory_key" field added to the message,
// plain messages serialize as-is. This keeps the conversation log readable.
impl Serialize for ConversationEntry {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeMap;
match self {
Self::Message(m) => m.serialize(s),
Self::Memory { key, message } => {
// Serialize message fields + memory_key
let json = serde_json::to_value(message).map_err(serde::ser::Error::custom)?;
let mut map = s.serialize_map(None)?;
if let serde_json::Value::Object(obj) = json {
for (k, v) in obj {
map.serialize_entry(&k, &v)?;
}
}
map.serialize_entry("memory_key", key)?;
map.end()
}
}
}
}
impl<'de> Deserialize<'de> for ConversationEntry {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
let mut json: serde_json::Value = serde_json::Value::deserialize(d)?;
if let Some(key) = json.as_object_mut().and_then(|o| o.remove("memory_key")) {
let key = key.as_str().unwrap_or("").to_string();
let message: Message = serde_json::from_value(json).map_err(serde::de::Error::custom)?;
Ok(Self::Memory { key, message })
} else {
let message: Message = serde_json::from_value(json).map_err(serde::de::Error::custom)?;
Ok(Self::Message(message))
}
}
}
impl ConversationEntry {
/// Get the API message for sending to the model.
pub fn api_message(&self) -> &Message {
match self {
Self::Message(m) => m,
Self::Memory { message, .. } => message,
}
}
pub fn is_memory(&self) -> bool {
matches!(self, Self::Memory { .. })
}
/// Get a reference to the inner message.
pub fn message(&self) -> &Message {
match self {
Self::Message(m) => m,
Self::Memory { message, .. } => message,
}
}
/// Get a mutable reference to the inner message.
pub fn message_mut(&mut self) -> &mut Message {
match self {
Self::Message(m) => m,
Self::Memory { message, .. } => message,
}
}
}
#[derive(Clone)]
pub struct ContextState {
pub system_prompt: String,
pub personality: Vec<(String, String)>,
pub journal: Vec<crate::thought::context::JournalEntry>,
pub working_stack: Vec<String>,
/// Conversation entries — messages and memory, interleaved in order.
/// Does NOT include system prompt, personality, or journal.
pub entries: Vec<ConversationEntry>,
}
// TODO: these should not be hardcoded absolute paths
pub fn working_stack_instructions_path() -> std::path::PathBuf {
dirs::home_dir().unwrap_or_default().join(".consciousness/config/working-stack.md")
}
pub fn working_stack_file_path() -> std::path::PathBuf {
dirs::home_dir().unwrap_or_default().join(".consciousness/working-stack.json")
}
impl ContextState {
/// Compute the context budget from typed sources.
pub fn budget(&self, count_str: &dyn Fn(&str) -> usize,
count_msg: &dyn Fn(&Message) -> usize,
window_tokens: usize) -> ContextBudget {
let id = count_str(&self.system_prompt)
+ self.personality.iter().map(|(_, c)| count_str(c)).sum::<usize>();
let jnl: usize = self.journal.iter().map(|e| count_str(&e.content)).sum();
let mut mem = 0;
let mut conv = 0;
for entry in &self.entries {
let tokens = count_msg(entry.api_message());
if entry.is_memory() { mem += tokens } else { conv += tokens }
}
ContextBudget {
identity_tokens: id,
memory_tokens: mem,
journal_tokens: jnl,
conversation_tokens: conv,
window_tokens,
}
}
pub fn render_context_message(&self) -> String {
let mut parts: Vec<String> = self.personality.iter()
.map(|(name, content)| format!("## {}\n\n{}", name, content))
.collect();
let instructions = std::fs::read_to_string(working_stack_instructions_path()).unwrap_or_default();
let mut stack_section = instructions;
if self.working_stack.is_empty() {
stack_section.push_str("\n## Current stack\n\n(empty)\n");
} else {
stack_section.push_str("\n## Current stack\n\n");
for (i, item) in self.working_stack.iter().enumerate() {
if i == self.working_stack.len() - 1 {
stack_section.push_str(&format!("{}\n", item));
} else {
stack_section.push_str(&format!(" [{}] {}\n", i, item));
}
}
}
parts.push(stack_section);
parts.join("\n\n---\n\n")
}
}
#[derive(Debug, Clone, Default)]
pub struct ContextBudget {
pub identity_tokens: usize,
pub memory_tokens: usize,
pub journal_tokens: usize,
pub conversation_tokens: usize,
pub window_tokens: usize,
}
impl ContextBudget {
pub fn used(&self) -> usize {
self.identity_tokens + self.memory_tokens + self.journal_tokens + self.conversation_tokens
}
pub fn free(&self) -> usize {
self.window_tokens.saturating_sub(self.used())
}
pub fn status_string(&self) -> String {
let total = self.window_tokens;
if total == 0 { return String::new(); }
let pct = |n: usize| if n == 0 { 0 } else { ((n * 100) / total).max(1) };
format!("id:{}% mem:{}% jnl:{}% conv:{}% free:{}%",
pct(self.identity_tokens), pct(self.memory_tokens),
pct(self.journal_tokens), pct(self.conversation_tokens), pct(self.free()))
}
}

View file

@ -1,160 +0,0 @@
// ui_channel.rs — Output routing for TUI panes
//
// All output from the agent (streaming text, tool calls, status updates)
// goes through a UiMessage enum sent over an mpsc channel. The TUI
// receives these messages and routes them to the appropriate pane.
//
// This replaces direct stdout/stderr printing throughout the codebase.
// The agent and API client never touch the terminal directly — they
// just send messages that the TUI renders where appropriate.
//
// The channel also fans out to a broadcast channel so the observation
// socket (observe.rs) can subscribe without touching the main path.
use std::sync::{Arc, RwLock};
use tokio::sync::{broadcast, mpsc};
/// Shared, live context state — agent writes, TUI reads for the debug screen.
pub type SharedContextState = Arc<RwLock<Vec<ContextSection>>>;
/// Create a new shared context state.
pub fn shared_context_state() -> SharedContextState {
Arc::new(RwLock::new(Vec::new()))
}
/// Which pane streaming text should go to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTarget {
/// User-initiated turn — text goes to conversation pane.
Conversation,
/// DMN-initiated turn — text goes to autonomous pane.
Autonomous,
}
/// Status info for the bottom status bar.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct StatusInfo {
pub dmn_state: String,
pub dmn_turns: u32,
pub dmn_max_turns: u32,
pub prompt_tokens: u32,
pub completion_tokens: u32,
pub model: String,
/// Number of tool calls dispatched in the current turn.
pub turn_tools: u32,
/// Context window budget breakdown (e.g. "id:8% mem:25% jnl:30% conv:37%").
pub context_budget: String,
}
/// A section of the context window, possibly with children.
#[derive(Debug, Clone)]
pub struct ContextSection {
pub name: String,
pub tokens: usize,
pub content: String,
pub children: Vec<ContextSection>,
}
/// Context loading details for the debug screen.
#[derive(Debug, Clone)]
pub struct ContextInfo {
pub model: String,
pub available_models: Vec<String>,
pub prompt_file: String,
pub backend: String,
#[allow(dead_code)]
pub instruction_files: Vec<(String, usize)>,
#[allow(dead_code)]
pub memory_files: Vec<(String, usize)>,
pub system_prompt_chars: usize,
pub context_message_chars: usize,
}
/// Messages sent from agent/API to the TUI for rendering.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum UiMessage {
/// Streaming text delta — routed to conversation or autonomous pane
/// based on the current StreamTarget.
TextDelta(String, StreamTarget),
/// User's input echoed to conversation pane.
UserInput(String),
/// Tool call header: [tool_name] with args summary.
ToolCall {
name: String,
args_summary: String,
},
/// Full tool result — goes to tools pane.
ToolResult {
name: String,
result: String,
},
/// DMN state annotation: [dmn: foraging (3/20)].
DmnAnnotation(String),
/// Status bar update.
StatusUpdate(StatusInfo),
/// Live activity indicator for the status bar — shows what the
/// agent is doing right now ("thinking...", "calling: bash", etc).
/// Empty string clears the indicator.
Activity(String),
/// Reasoning/thinking tokens from the model (internal monologue).
/// Routed to the autonomous pane so the user can peek at what
/// the model is thinking about during long tool chains.
Reasoning(String),
/// A tool call started — shown as a live overlay above the status bar.
ToolStarted { id: String, name: String, detail: String },
/// A tool call finished — removes it from the live overlay.
ToolFinished { id: String },
/// Debug message (only shown when POC_DEBUG is set).
Debug(String),
/// Informational message — goes to conversation pane (command output, etc).
Info(String),
/// Context loading details — stored for the debug screen (Ctrl+D).
ContextInfoUpdate(ContextInfo),
/// Agent cycle state update — refreshes the F2 agents screen.
AgentUpdate(Vec<crate::subconscious::subconscious::AgentSnapshot>),
}
/// Sender that fans out to both the TUI (mpsc) and observers (broadcast).
#[derive(Clone)]
pub struct UiSender {
tui: mpsc::UnboundedSender<UiMessage>,
observe: broadcast::Sender<UiMessage>,
}
impl UiSender {
pub fn send(&self, msg: UiMessage) -> Result<(), mpsc::error::SendError<UiMessage>> {
// Broadcast to observers (ignore errors — no subscribers is fine)
let _ = self.observe.send(msg.clone());
self.tui.send(msg)
}
/// Subscribe to the broadcast side (for the observation socket).
pub fn subscribe(&self) -> broadcast::Receiver<UiMessage> {
self.observe.subscribe()
}
}
/// Convenience type for the receiving half.
pub type UiReceiver = mpsc::UnboundedReceiver<UiMessage>;
/// Create a new UI channel pair.
pub fn channel() -> (UiSender, UiReceiver) {
let (tui_tx, tui_rx) = mpsc::unbounded_channel();
let (observe_tx, _) = broadcast::channel(1024);
(UiSender { tui: tui_tx, observe: observe_tx }, tui_rx)
}