Move API code from user/ to agent/

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2026-04-04 00:29:11 -04:00
parent 021eafe6da
commit 9bebbcb635
22 changed files with 259 additions and 251 deletions

View file

@ -1,576 +0,0 @@
// api/ — LLM API client (OpenAI-compatible)
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
//
// Diagnostics: anomalies always logged to debug panel.
// Set POC_DEBUG=1 for verbose per-turn logging.
mod openai;
use anyhow::Result;
use reqwest::Client;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use crate::user::types::*;
use crate::user::ui_channel::{UiMessage, UiSender};
/// A JoinHandle that aborts its task when dropped.
///
/// Returned by `ApiClient::start_stream` alongside the event receiver so
/// that dropping the stream's owner also cancels the spawned request task
/// instead of leaving it running detached.
pub struct AbortOnDrop(tokio::task::JoinHandle<()>);
impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // abort() is safe to call even if the task already finished.
        self.0.abort();
    }
}
// ─────────────────────────────────────────────────────────────
// Stream events — yielded by backends, consumed by the runner
// ─────────────────────────────────────────────────────────────
/// Events produced by the streaming API backends.
/// The runner reads these and decides what to display where.
pub enum StreamEvent {
    /// Content token from the model's response.
    Content(String),
    /// Reasoning/thinking token (internal monologue).
    Reasoning(String),
    /// Incremental tool call delta (structured, from APIs that support it).
    /// Fields are partial; consumers accumulate them into complete calls
    /// keyed by `index` (see `chat_completion_stream_temp`).
    ToolCallDelta {
        /// Position in the response's tool-call list.
        index: usize,
        id: Option<String>,
        call_type: Option<String>,
        name: Option<String>,
        /// Argument JSON fragment — appended to previously received fragments.
        arguments: Option<String>,
    },
    /// Token usage stats.
    Usage(Usage),
    /// Stream finished.
    Finished {
        /// Provider finish reason as received on the wire.
        reason: String,
        prompt_tokens: u32,
        completion_tokens: u32,
    },
    /// Error from the stream.
    Error(String),
}
/// Client for an OpenAI-compatible chat completions endpoint.
///
/// Clonable so each spawned stream task can own its own handle
/// (see `start_stream`, which clones all request state into the task).
#[derive(Clone)]
pub struct ApiClient {
    // Shared HTTP client; timeouts configured in `new`.
    client: Client,
    // Bearer token sent in the Authorization header.
    api_key: String,
    /// Model identifier sent with every request.
    pub model: String,
    // Endpoint root, stored without a trailing slash (normalized in `new`).
    base_url: String,
}
impl ApiClient {
    /// Build a client for `base_url` (trailing slashes stripped), using
    /// `api_key` as a bearer token and `model` for every request.
    ///
    /// Timeouts: 30s to connect, 600s per request — streaming responses
    /// can legitimately run for minutes.
    pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
        let client = Client::builder()
            .connect_timeout(Duration::from_secs(30))
            .timeout(Duration::from_secs(600))
            .build()
            .expect("failed to build HTTP client");
        Self {
            client,
            api_key: api_key.to_string(),
            model: model.to_string(),
            base_url: base_url.trim_end_matches('/').to_string(),
        }
    }
    /// Start a streaming chat completion. Returns a receiver of StreamEvents.
    /// The caller (runner) reads events and handles routing to the UI.
    ///
    /// The request runs on a spawned task; dropping the returned
    /// `AbortOnDrop` cancels it. Transport failures surface as
    /// `StreamEvent::Error` on the channel rather than as a `Result`.
    pub fn start_stream(
        &self,
        messages: &[Message],
        tools: Option<&[ToolDef]>,
        ui_tx: &UiSender,
        reasoning_effort: &str,
        temperature: Option<f32>,
        priority: Option<i32>,
    ) -> (mpsc::UnboundedReceiver<StreamEvent>, AbortOnDrop) {
        let (tx, rx) = mpsc::unbounded_channel();
        // Clone everything the spawned task needs — it must be 'static.
        let client = self.client.clone();
        let api_key = self.api_key.clone();
        let model = self.model.clone();
        let messages = messages.to_vec();
        let tools = tools.map(|t| t.to_vec());
        let ui_tx = ui_tx.clone();
        let reasoning_effort = reasoning_effort.to_string();
        let base_url = self.base_url.clone();
        let handle = tokio::spawn(async move {
            let result = openai::stream_events(
                &client, &base_url, &api_key, &model,
                &messages, tools.as_deref(), &tx, &ui_tx,
                &reasoning_effort, temperature, priority,
            ).await;
            // Report failures through the same channel the runner reads.
            if let Err(e) = result {
                let _ = tx.send(StreamEvent::Error(e.to_string()));
            }
        });
        (rx, AbortOnDrop(handle))
    }
    /// Run a streaming completion to conclusion and accumulate the events
    /// into a single response `Message` plus optional token `Usage`.
    ///
    /// Reasoning events are discarded; tool-call deltas are merged by
    /// `index`. Errors out on `StreamEvent::Error` or when the stream
    /// finishes with reason "error".
    pub async fn chat_completion_stream_temp(
        &self,
        messages: &[Message],
        tools: Option<&[ToolDef]>,
        ui_tx: &UiSender,
        reasoning_effort: &str,
        temperature: Option<f32>,
        priority: Option<i32>,
    ) -> Result<(Message, Option<Usage>)> {
        // Use the event stream and accumulate into a message.
        let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority);
        let mut content = String::new();
        let mut tool_calls: Vec<ToolCall> = Vec::new();
        let mut usage = None;
        let mut finish_reason = None;
        while let Some(event) = rx.recv().await {
            match event {
                StreamEvent::Content(text) => content.push_str(&text),
                // Reasoning is display-only; never part of the final message.
                StreamEvent::Reasoning(_) => {}
                StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
                    // Grow the accumulator so `index` is addressable.
                    while tool_calls.len() <= index {
                        tool_calls.push(ToolCall {
                            id: String::new(),
                            call_type: "function".to_string(),
                            function: FunctionCall { name: String::new(), arguments: String::new() },
                        });
                    }
                    // Scalar fields overwrite; argument JSON accumulates.
                    if let Some(id) = id { tool_calls[index].id = id; }
                    if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
                    if let Some(n) = name { tool_calls[index].function.name = n; }
                    if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
                }
                StreamEvent::Usage(u) => usage = Some(u),
                StreamEvent::Finished { reason, .. } => {
                    finish_reason = Some(reason);
                    break;
                }
                StreamEvent::Error(e) => anyhow::bail!("{}", e),
            }
        }
        if finish_reason.as_deref() == Some("error") {
            let detail = if content.is_empty() { "no details".into() } else { content };
            anyhow::bail!("model stream error: {}", detail);
        }
        Ok((build_response_message(content, tool_calls), usage))
    }
    /// Endpoint root (no trailing slash).
    pub fn base_url(&self) -> &str { &self.base_url }
    /// Bearer token for this client.
    pub fn api_key(&self) -> &str { &self.api_key }
    /// Return a label for the active backend, used in startup info.
    pub fn backend_label(&self) -> &str {
        if self.base_url.contains("openrouter") {
            "openrouter"
        } else {
            "openai-compat"
        }
    }
}
/// Truncate `s` to at most `max` bytes, backing up to the nearest UTF-8
/// character boundary. Direct byte slicing (`&s[..n]`) panics when `n`
/// lands inside a multi-byte character — provider error bodies can
/// contain non-ASCII text, so previews must be boundary-safe.
fn utf8_truncate(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
/// Send an HTTP request and check for errors. Shared by both backends.
///
/// On a non-2xx status: logs status/elapsed/body-preview to the debug
/// panel, optionally saves `request_json` under
/// `~/.consciousness/logs/failed-requests/` for replay, and bails.
/// With POC_DEBUG=1, also logs the payload size and selected
/// rate-limit / request-id response headers.
pub(crate) async fn send_and_check(
    client: &Client,
    url: &str,
    body: &impl serde::Serialize,
    auth_header: (&str, &str),
    extra_headers: &[(&str, &str)],
    ui_tx: &UiSender,
    debug_label: &str,
    request_json: Option<&str>,
) -> Result<reqwest::Response> {
    let debug = std::env::var("POC_DEBUG").is_ok();
    let start = Instant::now();
    if debug {
        let payload_size = serde_json::to_string(body)
            .map(|s| s.len())
            .unwrap_or(0);
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "request: {}K payload, {}",
            payload_size / 1024, debug_label,
        )));
    }
    let mut req = client
        .post(url)
        .header(auth_header.0, auth_header.1)
        .header("Content-Type", "application/json");
    for (name, value) in extra_headers {
        req = req.header(*name, *value);
    }
    let response = req
        .json(body)
        .send()
        .await
        .map_err(|e| {
            // Classify the failure so the message is useful without the
            // full reqwest error chain.
            let cause = if e.is_connect() {
                "connection refused"
            } else if e.is_timeout() {
                "request timed out"
            } else if e.is_request() {
                "request error"
            } else {
                "unknown"
            };
            anyhow::anyhow!("{} ({}): {:?}", cause, url, e.without_url())
        })?;
    let status = response.status();
    let elapsed = start.elapsed();
    if debug {
        // Log interesting response headers
        let headers = response.headers();
        for name in [
            "x-ratelimit-remaining",
            "x-ratelimit-limit",
            "x-request-id",
        ] {
            if let Some(val) = headers.get(name) {
                let _ = ui_tx.send(UiMessage::Debug(format!(
                    "header {}: {}",
                    name,
                    val.to_str().unwrap_or("?")
                )));
            }
        }
    }
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "HTTP {} after {:.1}s ({}): {}",
            status,
            elapsed.as_secs_f64(),
            url,
            utf8_truncate(&body, 500)
        )));
        // Save the request payload so a failed call can be replayed.
        if let Some(json) = request_json {
            let log_dir = dirs::home_dir()
                .unwrap_or_default()
                .join(".consciousness/logs/failed-requests");
            let _ = std::fs::create_dir_all(&log_dir);
            let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
            let path = log_dir.join(format!("{}.json", ts));
            if std::fs::write(&path, json).is_ok() {
                let _ = ui_tx.send(UiMessage::Debug(format!(
                    "saved failed request to {} (HTTP {})", path.display(), status
                )));
            }
        }
        anyhow::bail!("HTTP {} ({}): {}", status, url, utf8_truncate(&body, 1000));
    }
    if debug {
        let _ = ui_tx.send(UiMessage::Debug(format!(
            "connected in {:.1}s (HTTP {})",
            elapsed.as_secs_f64(),
            status.as_u16()
        )));
    }
    Ok(response)
}
/// SSE stream reader. Handles the generic SSE plumbing shared by both
/// backends: chunk reading with timeout, line buffering, `data:` prefix
/// stripping, `[DONE]` detection, JSON parsing, and parse error diagnostics.
/// Yields parsed events as serde_json::Value — each backend handles its
/// own event types.
pub(crate) struct SseReader {
    // Partial-line carry-over between network chunks.
    line_buf: String,
    // Max wait for the next chunk before declaring a stream timeout.
    chunk_timeout: Duration,
    // When the stream was started — used for elapsed-time diagnostics.
    pub stream_start: Instant,
    // Diagnostics counters, reported via log_diagnostics.
    pub chunks_received: u64,
    pub sse_lines_parsed: u64,
    pub sse_parse_errors: u64,
    // POC_DEBUG=1 at construction time — enables verbose error logging.
    debug: bool,
    ui_tx: UiSender,
    // Set once "data: [DONE]" has been seen.
    done: bool,
    /// Serialized request payload — saved to disk on errors for replay debugging.
    pub(crate) request_json: Option<String>,
}
impl SseReader {
    /// Fresh reader with zeroed counters; the chunk timeout comes from
    /// config (`api_stream_timeout_secs`).
    pub(crate) fn new(ui_tx: &UiSender) -> Self {
        Self {
            line_buf: String::new(),
            chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs),
            stream_start: Instant::now(),
            chunks_received: 0,
            sse_lines_parsed: 0,
            sse_parse_errors: 0,
            debug: std::env::var("POC_DEBUG").is_ok(),
            ui_tx: ui_tx.clone(),
            done: false,
            request_json: None,
        }
    }
    /// Truncate `s` to at most `max` bytes, backing up to the nearest
    /// UTF-8 character boundary. Direct byte slicing (`&s[..n]`) panics
    /// when `n` lands mid-character, and SSE payloads routinely contain
    /// non-ASCII text — previews must be boundary-safe.
    fn utf8_prefix(s: &str, max: usize) -> &str {
        if s.len() <= max {
            return s;
        }
        let mut end = max;
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }
    /// Diagnostic rendering of the unconsumed line buffer: "(empty)" or
    /// "<bytes>B: <first 500 bytes>". Shared by the error and timeout paths.
    fn buf_preview(&self) -> String {
        if self.line_buf.is_empty() {
            "(empty)".to_string()
        } else {
            format!(
                "{}B: {}",
                self.line_buf.len(),
                Self::utf8_prefix(&self.line_buf, 500)
            )
        }
    }
    /// Save the request payload to disk for replay debugging.
    /// No-op when no payload was attached via `request_json`.
    fn save_failed_request(&self, reason: &str) {
        let Some(ref json) = self.request_json else { return };
        let log_dir = dirs::home_dir()
            .unwrap_or_default()
            .join(".consciousness/logs/failed-requests");
        let _ = std::fs::create_dir_all(&log_dir);
        let ts = chrono::Local::now().format("%Y%m%dT%H%M%S");
        let path = log_dir.join(format!("{}.json", ts));
        if std::fs::write(&path, json).is_ok() {
            let _ = self.ui_tx.send(UiMessage::Debug(format!(
                "saved failed request to {} ({})", path.display(), reason
            )));
        }
    }
    /// Read the next SSE event from the response stream.
    /// Returns Ok(Some(value)) for each parsed data line,
    /// Ok(None) when the stream ends or [DONE] is received.
    pub(crate) async fn next_event(
        &mut self,
        response: &mut reqwest::Response,
    ) -> Result<Option<serde_json::Value>> {
        loop {
            // Drain complete lines from the buffer before reading more chunks
            while let Some(newline_pos) = self.line_buf.find('\n') {
                // Consume the line (and its newline) in place rather than
                // re-allocating the remaining tail each iteration.
                let raw: String = self.line_buf.drain(..=newline_pos).collect();
                let line = raw.trim();
                if line == "data: [DONE]" {
                    self.done = true;
                    return Ok(None);
                }
                // Skip blanks, "event:" lines, and anything else that is
                // not a data payload.
                let Some(json_str) = line.strip_prefix("data: ") else {
                    continue;
                };
                self.sse_lines_parsed += 1;
                match serde_json::from_str(json_str) {
                    Ok(v) => return Ok(Some(v)),
                    Err(e) => {
                        self.sse_parse_errors += 1;
                        // First error is always logged; the rest only with
                        // POC_DEBUG to avoid flooding the panel.
                        if self.sse_parse_errors == 1 || self.debug {
                            let preview = if json_str.len() > 200 {
                                format!("{}...", Self::utf8_prefix(json_str, 200))
                            } else {
                                json_str.to_string()
                            };
                            let _ = self.ui_tx.send(UiMessage::Debug(format!(
                                "SSE parse error (#{}) {}: {}",
                                self.sse_parse_errors, e, preview
                            )));
                        }
                        continue;
                    }
                }
            }
            if self.done {
                return Ok(None);
            }
            // Read more data from the response stream
            match tokio::time::timeout(self.chunk_timeout, response.chunk()).await {
                Ok(Ok(Some(chunk))) => {
                    self.chunks_received += 1;
                    // NOTE(review): from_utf8_lossy per chunk can mangle a
                    // multi-byte character split across chunk boundaries —
                    // consider buffering bytes instead; confirm providers
                    // never split UTF-8 sequences mid-chunk.
                    self.line_buf.push_str(&String::from_utf8_lossy(&chunk));
                }
                Ok(Ok(None)) => return Ok(None),
                Ok(Err(e)) => {
                    let msg = format!(
                        "stream error after {} chunks, {:.1}s, {} sse lines: {} | buf: {}",
                        self.chunks_received,
                        self.stream_start.elapsed().as_secs_f64(),
                        self.sse_lines_parsed,
                        e,
                        self.buf_preview(),
                    );
                    let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
                    self.save_failed_request(&msg);
                    return Err(e.into());
                }
                Err(_) => {
                    let msg = format!(
                        "stream timeout: {}s, {} chunks, {} sse lines, {:.1}s elapsed | buf: {}",
                        self.chunk_timeout.as_secs(),
                        self.chunks_received,
                        self.sse_lines_parsed,
                        self.stream_start.elapsed().as_secs_f64(),
                        self.buf_preview(),
                    );
                    let _ = self.ui_tx.send(UiMessage::Debug(msg.clone()));
                    self.save_failed_request(&msg);
                    anyhow::bail!(
                        "stream timeout: no data for {}s ({} chunks received)",
                        self.chunk_timeout.as_secs(),
                        self.chunks_received
                    );
                }
            }
        }
    }
}
/// Assemble an assistant message from optional text content and optional
/// tool calls; every other field empty. Factors out the struct literal
/// previously duplicated across all three branches below.
fn assistant_message(content: Option<String>, tool_calls: Option<Vec<ToolCall>>) -> Message {
    Message {
        role: Role::Assistant,
        content: content.map(MessageContent::Text),
        tool_calls,
        tool_call_id: None,
        name: None,
        timestamp: None,
    }
}
/// Build a response Message from accumulated content and tool calls.
/// Shared by both backends — the wire format differs but the internal
/// representation is the same.
///
/// If no structured tool calls came from the API but the content
/// contains leaked tool call XML (e.g. `<tool_call>...</tool_call>`
/// from models that emit tool calls as text), parse them out and
/// promote them to structured tool_calls. This way all consumers
/// see tool calls uniformly regardless of backend.
pub fn build_response_message(
    content: String,
    tool_calls: Vec<ToolCall>,
) -> Message {
    // If the API returned structured tool calls, use them as-is.
    if !tool_calls.is_empty() {
        let text = if content.is_empty() { None } else { Some(content) };
        return assistant_message(text, Some(tool_calls));
    }
    // Check for leaked tool calls in content text.
    let leaked = crate::user::parsing::parse_leaked_tool_calls(&content);
    if !leaked.is_empty() {
        // Strip the leaked XML from the display text; drop the content
        // entirely if only whitespace remains.
        let cleaned = crate::user::parsing::strip_leaked_artifacts(&content);
        let text = if cleaned.trim().is_empty() { None } else { Some(cleaned) };
        return assistant_message(text, Some(leaked));
    }
    // Plain text response, no tool calls of either kind.
    let text = if content.is_empty() { None } else { Some(content) };
    assistant_message(text, None)
}
/// Log stream diagnostics. Shared by both backends.
///
/// Anomalies (leaked reasoning, empty responses, missing finish_reason,
/// SSE parse errors) are always reported; the full per-turn summary and
/// token accounting are only emitted when POC_DEBUG=1.
pub(crate) fn log_diagnostics(
    ui_tx: &UiSender,
    content_len: usize,
    tool_count: usize,
    reasoning_chars: usize,
    reasoning_effort: &str,
    finish_reason: &Option<String>,
    chunks_received: u64,
    sse_lines_parsed: u64,
    sse_parse_errors: u64,
    empty_deltas: u64,
    total_elapsed: Duration,
    first_content_at: Option<Duration>,
    usage: &Option<Usage>,
    tools: &[ToolCall],
) {
    let verbose = std::env::var("POC_DEBUG").is_ok();
    // All output funnels through the debug panel.
    let send = |text: String| {
        let _ = ui_tx.send(UiMessage::Debug(text));
    };
    // Reasoning arrived even though it was explicitly disabled.
    if reasoning_chars > 0 && reasoning_effort == "none" {
        send(format!(
            "note: {} chars leaked reasoning (suppressed from display)",
            reasoning_chars
        ));
    }
    // Nothing usable came back at all.
    if content_len == 0 && tool_count == 0 {
        send(format!(
            "WARNING: empty response (finish: {:?}, chunks: {}, reasoning: {}, \
parse_errors: {}, empty_deltas: {}, {:.1}s)",
            finish_reason, chunks_received, reasoning_chars,
            sse_parse_errors, empty_deltas, total_elapsed.as_secs_f64()
        ));
    }
    if finish_reason.is_none() && chunks_received > 0 {
        send(format!(
            "WARNING: stream ended without finish_reason ({} chunks, {} content chars)",
            chunks_received, content_len
        ));
    }
    if sse_parse_errors > 0 {
        send(format!(
            "WARNING: {} SSE parse errors out of {} lines",
            sse_parse_errors, sse_lines_parsed
        ));
    }
    // Everything below is the verbose per-turn summary.
    if !verbose {
        return;
    }
    if let Some(u) = usage {
        send(format!(
            "tokens: {} prompt + {} completion = {} total",
            u.prompt_tokens, u.completion_tokens, u.total_tokens
        ));
    }
    let ttft = match first_content_at {
        Some(d) => format!("{:.1}s", d.as_secs_f64()),
        None => "none".to_string(),
    };
    send(format!(
        "stream: {:.1}s total, TTFT={}, {} chunks, {} SSE lines, \
{} content chars, {} reasoning chars, {} tools, \
finish={:?}",
        total_elapsed.as_secs_f64(),
        ttft,
        chunks_received,
        sse_lines_parsed,
        content_len,
        reasoning_chars,
        tool_count,
        finish_reason,
    ));
    // Iterating an empty slice is a no-op, so no emptiness guard needed.
    for (i, tc) in tools.iter().enumerate() {
        send(format!(
            " tool[{}]: {} (id: {}, {} arg chars)",
            i, tc.function.name, tc.id, tc.function.arguments.len()
        ));
    }
}

View file

@ -1,195 +0,0 @@
// api/openai.rs — OpenAI-compatible backend
//
// Works with any provider that implements the OpenAI chat completions
// API: OpenRouter, vLLM, llama.cpp, Fireworks, Together, etc.
// Also used for local models (Qwen, llama) via compatible servers.
use anyhow::Result;
use reqwest::Client;
use tokio::sync::mpsc;
use crate::user::types::*;
use crate::user::ui_channel::{UiMessage, UiSender};
use super::StreamEvent;
/// Stream SSE events from an OpenAI-compatible endpoint, sending
/// parsed StreamEvents through the channel. The caller (runner)
/// handles routing to the UI.
///
/// Returns Ok(()) once the stream ends (a `Finished` event is always
/// sent on the success path); transport/HTTP/in-stream API errors
/// return Err and are forwarded by the caller as `StreamEvent::Error`.
pub(super) async fn stream_events(
    client: &Client,
    base_url: &str,
    api_key: &str,
    model: &str,
    messages: &[Message],
    tools: Option<&[ToolDef]>,
    tx: &mpsc::UnboundedSender<StreamEvent>,
    ui_tx: &UiSender,
    reasoning_effort: &str,
    temperature: Option<f32>,
    priority: Option<i32>,
) -> Result<()> {
    let request = ChatRequest {
        model: model.to_string(),
        messages: messages.to_vec(),
        // tool_choice "auto" only when tools are actually attached.
        tool_choice: tools.map(|_| "auto".to_string()),
        tools: tools.map(|t| t.to_vec()),
        max_tokens: Some(16384),
        temperature: Some(temperature.unwrap_or(0.6)),
        stream: Some(true),
        // Only send a reasoning config for explicit effort levels;
        // "none" and "default" omit the field entirely.
        reasoning: if reasoning_effort != "none" && reasoning_effort != "default" {
            Some(ReasoningConfig {
                enabled: true,
                effort: Some(reasoning_effort.to_string()),
            })
        } else {
            None
        },
        chat_template_kwargs: None,
        priority,
    };
    let url = format!("{}/chat/completions", base_url);
    let msg_count = request.messages.len();
    let pri_label = match priority {
        Some(p) => format!(", priority={}", p),
        None => String::new(),
    };
    let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label);
    // Pretty-printed payload, saved to disk if the request later fails.
    let request_json = serde_json::to_string_pretty(&request).ok();
    let mut response = super::send_and_check(
        client,
        &url,
        &request,
        ("Authorization", &format!("Bearer {}", api_key)),
        &[],
        ui_tx,
        &debug_label,
        request_json.as_deref(),
    )
    .await?;
    let mut reader = super::SseReader::new(ui_tx);
    reader.request_json = request_json;
    // Per-turn diagnostics accumulators (reported via log_diagnostics).
    let mut content_len: usize = 0;
    let mut reasoning_chars: usize = 0;
    let mut tool_call_count: usize = 0;
    let mut empty_deltas: u64 = 0;
    let mut first_content_at = None;
    let mut finish_reason = None;
    let mut usage = None;
    while let Some(event) = reader.next_event(&mut response).await? {
        // Some providers deliver errors as an in-stream event rather
        // than an HTTP status — surface those as hard failures.
        if let Some(err_msg) = event["error"]["message"].as_str() {
            let raw = event["error"]["metadata"]["raw"].as_str().unwrap_or("");
            let _ = ui_tx.send(UiMessage::Debug(format!(
                "API error in stream: {}", err_msg
            )));
            anyhow::bail!("API error in stream: {} {}", err_msg, raw);
        }
        // Unparseable events are logged and skipped, not fatal.
        let chunk: ChatCompletionChunk = match serde_json::from_value(event.clone()) {
            Ok(c) => c,
            Err(e) => {
                let preview = event.to_string();
                let _ = ui_tx.send(UiMessage::Debug(format!(
                    "unparseable SSE event ({}): {}",
                    e, &preview[..preview.len().min(300)]
                )));
                continue;
            }
        };
        if let Some(ref u) = chunk.usage {
            let _ = tx.send(StreamEvent::Usage(u.clone()));
            usage = chunk.usage;
        }
        for choice in &chunk.choices {
            if choice.finish_reason.is_some() {
                finish_reason = choice.finish_reason.clone();
            }
            let has_content = choice.delta.content.is_some();
            let has_tools = choice.delta.tool_calls.is_some();
            // Reasoning tokens — multiple field names across providers
            let mut has_reasoning = false;
            for r in [
                choice.delta.reasoning_content.as_ref(),
                choice.delta.reasoning.as_ref(),
            ].into_iter().flatten() {
                reasoning_chars += r.len();
                has_reasoning = true;
                if !r.is_empty() {
                    let _ = tx.send(StreamEvent::Reasoning(r.clone()));
                }
            }
            // reasoning_details is arbitrary JSON; forwarded stringified.
            if let Some(ref r) = choice.delta.reasoning_details {
                let s = r.to_string();
                reasoning_chars += s.len();
                has_reasoning = true;
                if !s.is_empty() && s != "null" {
                    let _ = tx.send(StreamEvent::Reasoning(s));
                }
            }
            if let Some(ref text_delta) = choice.delta.content {
                // TTFT measured at the first non-empty content token.
                if first_content_at.is_none() && !text_delta.is_empty() {
                    first_content_at = Some(reader.stream_start.elapsed());
                }
                content_len += text_delta.len();
                let _ = tx.send(StreamEvent::Content(text_delta.clone()));
            }
            if let Some(ref tc_deltas) = choice.delta.tool_calls {
                for tc_delta in tc_deltas {
                    // Track the highest index seen — deltas are merged
                    // downstream, not accumulated here.
                    tool_call_count = tool_call_count.max(tc_delta.index + 1);
                    let _ = tx.send(StreamEvent::ToolCallDelta {
                        index: tc_delta.index,
                        id: tc_delta.id.clone(),
                        call_type: tc_delta.call_type.clone(),
                        name: tc_delta.function.as_ref().and_then(|f| f.name.clone()),
                        arguments: tc_delta.function.as_ref().and_then(|f| f.arguments.clone()),
                    });
                }
            }
            // A delta with nothing in it at all — counted for diagnostics.
            if !has_reasoning && !has_content && !has_tools && choice.finish_reason.is_none() {
                empty_deltas += 1;
            }
        }
    }
    let total_elapsed = reader.stream_start.elapsed();
    super::log_diagnostics(
        ui_tx,
        content_len,
        tool_call_count,
        reasoning_chars,
        reasoning_effort,
        &finish_reason,
        reader.chunks_received,
        reader.sse_lines_parsed,
        reader.sse_parse_errors,
        empty_deltas,
        total_elapsed,
        first_content_at,
        &usage,
        &[], // tool_calls not accumulated here anymore
    );
    let reason = finish_reason.unwrap_or_default();
    let (pt, ct) = usage.as_ref()
        .map(|u| (u.prompt_tokens, u.completion_tokens))
        .unwrap_or((0, 0));
    let _ = tx.send(StreamEvent::Finished {
        reason,
        prompt_tokens: pt,
        completion_tokens: ct,
    });
    Ok(())
}

View file

@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use crate::user::types::ConversationEntry;
use crate::agent::context::ConversationEntry;
pub struct ConversationLog {
path: PathBuf,

View file

@ -9,8 +9,6 @@
// - cli, context, dmn, identity, log, observe, parsing, tui
// Config moved to crate::config (unified with memory config)
pub mod api;
pub mod types;
pub mod ui_channel;
pub mod cli;
pub mod dmn;

View file

@ -11,7 +11,8 @@
// Also handles streaming artifacts: whitespace inside XML tags from
// token boundaries, </think> tags, etc.
use crate::user::types::*;
use crate::agent::api::types::*;
use crate::agent::tools::{ToolCall, ToolDef, FunctionCall};
/// Parse leaked tool calls from response text.
/// Looks for `<tool_call>...</tool_call>` blocks and tries both

View file

@ -15,11 +15,11 @@
// subconscious_screen.rs — F3 subconscious (consolidation agents)
// unconscious_screen.rs — F4 unconscious (memory daemon status)
mod main_screen;
mod context_screen;
mod subconscious_screen;
mod unconscious_screen;
mod thalamus_screen;
mod main;
mod context;
mod subconscious;
mod unconscious;
mod thalamus;
pub(crate) const SCREEN_LEGEND: &str = " F1=interact F2=conscious F3=subconscious F4=unconscious F5=thalamus ";
/// Subconscious agents — interact with conscious context

View file

@ -1,461 +0,0 @@
// types.rs — OpenAI-compatible API types
//
// These mirror the OpenAI chat completion API, which is the de facto
// standard that OpenRouter, vLLM, llama.cpp, and most inference
// providers implement. Using these types directly (rather than an
// SDK) means we control the wire format and can work with any
// compatible backend.
use chrono::Utc;
use serde::{Deserialize, Serialize};
// Re-export tool types that moved to agent::tools
pub use crate::agent::tools::{
ToolDef, ToolCall, ToolCallDelta, ToolOutput,
FunctionCall, FunctionDef, FunctionCallDelta,
};
/// Message content — either plain text or an array of content parts
/// (for multimodal messages with images). Serializes as a JSON string
/// for text-only, or a JSON array for multimodal.
///
/// `untagged`: variant order matters — deserialization tries `Text`
/// (a JSON string) first, then `Parts` (a JSON array).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MessageContent {
    Text(String),
    Parts(Vec<ContentPart>),
}
impl MessageContent {
    /// Extract the text portion of the content, ignoring images.
    ///
    /// For multimodal `Parts`, this returns the FIRST text part only
    /// (or "" when there is none) — multiple text parts are not
    /// concatenated.
    pub fn as_text(&self) -> &str {
        match self {
            MessageContent::Text(text) => text,
            MessageContent::Parts(parts) => parts
                .iter()
                .find_map(|part| match part {
                    ContentPart::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .unwrap_or(""),
        }
    }
}
/// A single content part within a multimodal message.
/// Serde-tagged by "type" ("text" / "image_url"), matching the
/// OpenAI content-part wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ContentPart {
    #[serde(rename = "text")]
    Text { text: String },
    #[serde(rename = "image_url")]
    ImageUrl { image_url: ImageUrl },
}
/// Image URL — either a real URL or a base64 data URI.
/// (Both are passed through as-is; see `Message::user_with_images`.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageUrl {
    pub url: String,
}
/// A chat message in the conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    pub role: Role,
    /// `None` serializes as JSON null (no skip attribute) — assistant
    /// messages that carry only tool calls have no content.
    pub content: Option<MessageContent>,
    /// Tool calls requested by an assistant message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<ToolCall>>,
    /// For role=tool messages: id of the call being answered.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// ISO 8601 timestamp — when this message entered the conversation.
    /// Used for linking conversation ranges to journal entries during
    /// compaction. Missing on messages from old session files.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
}
/// Chat role — serialized lowercase on the wire
/// ("system", "user", "assistant", "tool").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Role {
    System,
    User,
    Assistant,
    Tool,
}
// FunctionCall, FunctionDef moved to agent::tools
/// Chat completion request.
#[derive(Debug, Serialize)]
pub struct ChatRequest {
    pub model: String,
    pub messages: Vec<Message>,
    /// Tool definitions offered to the model.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tools: Option<Vec<ToolDef>>,
    /// Tool choice policy (e.g. "auto"); only sent when tools are.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_choice: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f32>,
    /// true requests an SSE stream instead of a single response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream: Option<bool>,
    /// OpenRouter reasoning control. Send both formats for compatibility:
    /// - reasoning.enabled (older format, still seen in examples)
    /// - reasoning.effort (documented: "none" disables entirely)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning: Option<ReasoningConfig>,
    /// vllm chat template kwargs — used to disable thinking on Qwen 3.5
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chat_template_kwargs: Option<serde_json::Value>,
    /// vllm request priority (lower = higher priority).
    /// 0 = interactive, 1 = surface-observe, 10 = batch agents.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
}
/// Reasoning controls nested under `ChatRequest::reasoning`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReasoningConfig {
    pub enabled: bool,
    /// "none" disables reasoning entirely per OpenRouter docs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effort: Option<String>,
}
/// Chat completion response (non-streaming).
/// (dead_code allowed: the streaming path uses ChatCompletionChunk.)
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct ChatResponse {
    pub choices: Vec<Choice>,
    pub usage: Option<Usage>,
}
/// One choice in a non-streaming response.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct Choice {
    pub message: Message,
    pub finish_reason: Option<String>,
}
/// Token usage stats as reported by the provider.
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
pub struct Usage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
}
// --- Streaming types ---
/// A single chunk from a streaming chat completion response (SSE).
#[derive(Debug, Deserialize)]
pub struct ChatCompletionChunk {
pub choices: Vec<ChunkChoice>,
pub usage: Option<Usage>,
}
/// One choice within a streaming chunk.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct ChunkChoice {
    pub delta: Delta,
    /// Set on the final chunk for this choice; the runner treats the
    /// literal reason "error" as a failed stream.
    pub finish_reason: Option<String>,
}
/// The delta within a streaming chunk. All fields optional because each
/// chunk only carries the incremental change.
#[derive(Debug, Deserialize, Default)]
#[allow(dead_code)]
pub struct Delta {
    pub role: Option<Role>,
    pub content: Option<String>,
    /// Reasoning/thinking content — sent by some models (Qwen, DeepSeek)
    /// even when reasoning is "disabled". We capture it so we can detect
    /// and log the problem rather than silently dropping responses.
    /// OpenRouter uses multiple field names depending on the provider.
    pub reasoning_content: Option<String>,
    pub reasoning: Option<String>,
    /// Provider-specific structured reasoning payload (arbitrary JSON).
    pub reasoning_details: Option<serde_json::Value>,
    /// Incremental tool-call fragments, merged downstream by index.
    pub tool_calls: Option<Vec<ToolCallDelta>>,
}
// FunctionCallDelta moved to agent::tools
// --- Convenience constructors ---
impl Message {
/// Extract text content regardless of whether it's Text or Parts.
pub fn content_text(&self) -> &str {
self.content.as_ref().map_or("", |c| c.as_text())
}
pub fn role_str(&self) -> &str {
match self.role {
Role::System => "system",
Role::User => "user",
Role::Assistant => "assistant",
Role::Tool => "tool",
}
}
fn now() -> Option<String> {
Some(Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true))
}
/// Stamp a message with the current time if it doesn't already have one.
/// Used for messages from the API that we didn't construct ourselves.
pub fn stamp(&mut self) {
if self.timestamp.is_none() {
self.timestamp = Self::now();
}
}
pub fn system(content: impl Into<String>) -> Self {
Self {
role: Role::System,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
pub fn user(content: impl Into<String>) -> Self {
Self {
role: Role::User,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
/// User message with text and images (for multimodal/vision).
pub fn user_with_images(text: &str, image_data_uris: &[String]) -> Self {
let mut parts = vec![ContentPart::Text {
text: text.to_string(),
}];
for uri in image_data_uris {
parts.push(ContentPart::ImageUrl {
image_url: ImageUrl {
url: uri.clone(),
},
});
}
Self {
role: Role::User,
content: Some(MessageContent::Parts(parts)),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
#[allow(dead_code)]
pub fn assistant(content: impl Into<String>) -> Self {
Self {
role: Role::Assistant,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: Self::now(),
}
}
pub fn tool_result(id: impl Into<String>, content: impl Into<String>) -> Self {
Self {
role: Role::Tool,
content: Some(MessageContent::Text(content.into())),
tool_calls: None,
tool_call_id: Some(id.into()),
name: None,
timestamp: Self::now(),
}
}
}
impl ToolDef {
    /// Convenience constructor for a "function"-type tool with the
    /// given name, description, and JSON-schema `parameters` object.
    pub fn new(name: &str, description: &str, parameters: serde_json::Value) -> Self {
        Self {
            tool_type: "function".to_string(),
            function: FunctionDef {
                name: name.to_string(),
                description: description.to_string(),
                parameters,
            },
        }
    }
}
/// Conversation entry — either a regular message or memory content.
/// Memory entries preserve the original message for KV cache round-tripping.
#[derive(Debug, Clone)]
pub enum ConversationEntry {
    /// An ordinary conversation message.
    Message(Message),
    /// Recalled memory injected into the conversation, tagged with its
    /// memory `key`; the full Message is kept so serialization round-trips.
    Memory { key: String, message: Message },
}
// Custom serde: serialize Memory with a "memory_key" field added to the message,
// plain messages serialize as-is. This keeps the conversation log readable.
impl Serialize for ConversationEntry {
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeMap;
        match self {
            // Plain messages: exactly the Message serialization.
            Self::Message(m) => m.serialize(s),
            Self::Memory { key, message } => {
                // Serialize message fields + memory_key
                let json = serde_json::to_value(message).map_err(serde::ser::Error::custom)?;
                let mut map = s.serialize_map(None)?;
                if let serde_json::Value::Object(obj) = json {
                    for (k, v) in obj {
                        map.serialize_entry(&k, &v)?;
                    }
                }
                // Marker field that Deserialize uses to pick the variant.
                map.serialize_entry("memory_key", key)?;
                map.end()
            }
        }
    }
}
impl<'de> Deserialize<'de> for ConversationEntry {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
let mut json: serde_json::Value = serde_json::Value::deserialize(d)?;
if let Some(key) = json.as_object_mut().and_then(|o| o.remove("memory_key")) {
let key = key.as_str().unwrap_or("").to_string();
let message: Message = serde_json::from_value(json).map_err(serde::de::Error::custom)?;
Ok(Self::Memory { key, message })
} else {
let message: Message = serde_json::from_value(json).map_err(serde::de::Error::custom)?;
Ok(Self::Message(message))
}
}
}
impl ConversationEntry {
    /// Get the API message for sending to the model.
    ///
    /// Identical to [`Self::message`] — kept as a separate name to
    /// document intent at call sites; delegates instead of duplicating
    /// the match.
    pub fn api_message(&self) -> &Message {
        self.message()
    }
    /// True when this entry carries recalled memory content.
    pub fn is_memory(&self) -> bool {
        matches!(self, Self::Memory { .. })
    }
    /// Get a reference to the inner message.
    pub fn message(&self) -> &Message {
        match self {
            Self::Message(m) => m,
            Self::Memory { message, .. } => message,
        }
    }
    /// Get a mutable reference to the inner message.
    pub fn message_mut(&mut self) -> &mut Message {
        match self {
            Self::Message(m) => m,
            Self::Memory { message, .. } => message,
        }
    }
}
/// Mutable context state — the structured regions of the context window.
#[derive(Clone)]
pub struct ContextState {
    /// Base system prompt text.
    pub system_prompt: String,
    /// Named personality sections as (name, content) pairs.
    pub personality: Vec<(String, String)>,
    /// Journal entries currently included in the context.
    pub journal: Vec<crate::agent::context::JournalEntry>,
    /// Working-stack items, rendered after the personality sections.
    pub working_stack: Vec<String>,
    /// Conversation entries — messages and memory, interleaved in order.
    /// Does NOT include system prompt, personality, or journal.
    pub entries: Vec<ConversationEntry>,
}
// TODO: these should not be hardcoded absolute paths
/// Path to the markdown instructions prepended to the working-stack
/// section of the rendered context message.
pub fn working_stack_instructions_path() -> std::path::PathBuf {
    dirs::home_dir().unwrap_or_default().join(".consciousness/config/working-stack.md")
}
/// Path to the working-stack JSON file.
pub fn working_stack_file_path() -> std::path::PathBuf {
    dirs::home_dir().unwrap_or_default().join(".consciousness/working-stack.json")
}
impl ContextState {
    /// Compute the context budget from typed sources.
    ///
    /// `count_str`/`count_msg` are caller-supplied token counters;
    /// `window_tokens` is the model's context window size.
    pub fn budget(&self, count_str: &dyn Fn(&str) -> usize,
        count_msg: &dyn Fn(&Message) -> usize,
        window_tokens: usize) -> ContextBudget {
        // Identity = system prompt + all personality sections.
        let id = count_str(&self.system_prompt)
            + self.personality.iter().map(|(_, c)| count_str(c)).sum::<usize>();
        let jnl: usize = self.journal.iter().map(|e| count_str(&e.content)).sum();
        // Conversation entries are split into memory vs. regular tokens.
        let mut mem = 0;
        let mut conv = 0;
        for entry in &self.entries {
            let tokens = count_msg(entry.api_message());
            if entry.is_memory() { mem += tokens } else { conv += tokens }
        }
        ContextBudget {
            identity_tokens: id,
            memory_tokens: mem,
            journal_tokens: jnl,
            conversation_tokens: conv,
            window_tokens,
        }
    }
    /// Render the personality sections plus the working stack into one
    /// markdown context message, sections separated by "---" rules.
    pub fn render_context_message(&self) -> String {
        let mut parts: Vec<String> = self.personality.iter()
            .map(|(name, content)| format!("## {}\n\n{}", name, content))
            .collect();
        // Instructions file is best-effort: missing file renders empty.
        let instructions = std::fs::read_to_string(working_stack_instructions_path()).unwrap_or_default();
        let mut stack_section = instructions;
        if self.working_stack.is_empty() {
            stack_section.push_str("\n## Current stack\n\n(empty)\n");
        } else {
            stack_section.push_str("\n## Current stack\n\n");
            for (i, item) in self.working_stack.iter().enumerate() {
                // NOTE(review): the last item is rendered without its
                // "[i]" index — presumably to highlight the active item;
                // confirm this is intentional, not a formatting slip.
                if i == self.working_stack.len() - 1 {
                    stack_section.push_str(&format!("{}\n", item));
                } else {
                    stack_section.push_str(&format!(" [{}] {}\n", i, item));
                }
            }
        }
        parts.push(stack_section);
        parts.join("\n\n---\n\n")
    }
}
/// Token accounting for the context window, split by region.
/// Produced by `ContextState::budget`.
#[derive(Debug, Clone, Default)]
pub struct ContextBudget {
    pub identity_tokens: usize,
    pub memory_tokens: usize,
    pub journal_tokens: usize,
    pub conversation_tokens: usize,
    pub window_tokens: usize,
}
impl ContextBudget {
    /// Total tokens consumed across all tracked regions.
    pub fn used(&self) -> usize {
        [
            self.identity_tokens,
            self.memory_tokens,
            self.journal_tokens,
            self.conversation_tokens,
        ]
        .iter()
        .sum()
    }
    /// Remaining window space; clamped at zero when over budget.
    pub fn free(&self) -> usize {
        self.window_tokens.saturating_sub(self.used())
    }
    /// One-line percentage summary for the status bar.
    /// Empty when no window size is known.
    pub fn status_string(&self) -> String {
        let total = self.window_tokens;
        if total == 0 {
            return String::new();
        }
        // Nonzero counts always show at least 1% so small regions
        // remain visible; exact zero stays 0%.
        let pct = |tokens: usize| -> usize {
            match tokens {
                0 => 0,
                n => ((n * 100) / total).max(1),
            }
        };
        format!("id:{}% mem:{}% jnl:{}% conv:{}% free:{}%",
            pct(self.identity_tokens), pct(self.memory_tokens),
            pct(self.journal_tokens), pct(self.conversation_tokens), pct(self.free()))
    }
}

View file

@ -11,16 +11,11 @@
// The channel also fans out to a broadcast channel so the observation
// socket (observe.rs) can subscribe without touching the main path.
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
/// Shared, live context state — agent writes, TUI reads for the debug screen.
pub type SharedContextState = Arc<RwLock<Vec<ContextSection>>>;
/// Create a new shared context state.
pub fn shared_context_state() -> SharedContextState {
Arc::new(RwLock::new(Vec::new()))
}
// Re-export context types that moved to agent::context
pub use crate::agent::context::{ContextSection, SharedContextState, shared_context_state};
// ActiveToolCall lives in agent::tools — re-export for TUI access
pub use crate::agent::tools::ActiveToolCall;
@ -57,15 +52,6 @@ pub struct StatusInfo {
pub context_budget: String,
}
/// A section of the context window, possibly with children.
#[derive(Debug, Clone)]
pub struct ContextSection {
pub name: String,
pub tokens: usize,
pub content: String,
pub children: Vec<ContextSection>,
}
/// Context loading details for the debug screen.
#[derive(Debug, Clone)]
pub struct ContextInfo {