diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index be5e58e..ffbc1db 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -58,12 +58,13 @@ pub(crate) struct SamplingParams { // Stream events — yielded by backends, consumed by the runner // ───────────────────────────────────────────────────────────── -/// One token from the streaming completions API. +/// One event from a streaming LLM response. pub enum StreamToken { /// A sampled token, optionally with its per-layer concept readout. - /// `readout` is `None` when the server has readout disabled or - /// returned no readout for this chunk. + /// Used by the vLLM completions backend. Token { id: u32, readout: Option }, + /// A text delta from a chat completions API. + TextDelta(String), Done { usage: Option }, Error(String), } @@ -150,6 +151,117 @@ impl ApiClient { Ok(Some(response.json().await?)) } + /// Stream a chat completion from an OpenAI-compatible chat/completions API. + pub(crate) fn stream_chat_completion( + &self, + messages: &[super::context::ChatMessage], + sampling: SamplingParams, + ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { + let (tx, rx) = mpsc::unbounded_channel(); + let client = self.client.clone(); + let api_key = self.api_key.clone(); + let model = self.model.clone(); + let base_url = self.base_url.clone(); + let messages = messages.to_vec(); + + let handle = tokio::spawn(async move { + let result = stream_chat( + &client, &base_url, &api_key, &model, + &messages, &tx, sampling, + ).await; + if let Err(e) = result { + let _ = tx.send(StreamToken::Error(e.to_string())); + } + }); + + (rx, AbortOnDrop(handle)) + } +} + +async fn stream_chat( + client: &HttpClient, + base_url: &str, + api_key: &str, + model: &str, + messages: &[super::context::ChatMessage], + tx: &mpsc::UnboundedSender, + sampling: SamplingParams, +) -> anyhow::Result<()> { + let wire_messages: Vec = messages.iter().map(|m| { + if m.images.is_empty() { + serde_json::json!({ + "role": m.role, + "content": m.content, + }) + } else { + use base64::Engine; + let b64 = base64::engine::general_purpose::STANDARD; + let mut parts: Vec = vec![ + serde_json::json!({ "type": "text", "text": m.content }), + ]; + for img in &m.images { + parts.push(serde_json::json!({ + "type": "image_url", + "image_url": { + "url": format!("data:{};base64,{}", img.mime, b64.encode(&img.bytes)), + }, + })); + } + serde_json::json!({ + "role": m.role, + "content": parts, + }) + } + }).collect(); + + let request = serde_json::json!({ + "model": model, + "messages": wire_messages, + "max_tokens": 16384, + "temperature": sampling.temperature, + "top_p": sampling.top_p, + "stream": true, + }); + + let url = format!("{}/chat/completions", base_url); + let debug_label = format!("{} messages, model={}", messages.len(), model); + + let mut response = send_and_check( + client, &url, &request, + ("Authorization", &format!("Bearer {}", api_key)), + &[], &debug_label, None, + ).await?; + + let mut reader = SseReader::new(); + let mut usage = None; + + while let Some(event) = reader.next_event(&mut response).await? { + if let Some(err_msg) = event["error"]["message"].as_str() { + anyhow::bail!("API error in stream: {}", err_msg); + } + + if let Some(u) = event["usage"].as_object() { + if let Ok(u) = serde_json::from_value::(serde_json::Value::Object(u.clone())) { + usage = Some(u); + } + } + + let choices = match event["choices"].as_array() { + Some(c) => c, + None => continue, + }; + + for choice in choices { + if let Some(delta) = choice["delta"]["content"].as_str() { + if !delta.is_empty() { + let _ = tx.send(StreamToken::TextDelta(delta.to_string())); + } + } + } + } + + let _ = tx.send(StreamToken::Done { usage }); + Ok(()) } async fn stream_completions( diff --git a/src/agent/context.rs b/src/agent/context.rs index 2009cfc..01a621b 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -215,7 +215,7 @@ impl Role { impl NodeBody { /// Render this leaf body to text for the prompt. - fn render_into(&self, out: &mut String) { + pub(crate) fn render_into(&self, out: &mut String) { match self { Self::Content(text) => out.push_str(text), Self::Thinking(text) => { @@ -310,7 +310,14 @@ impl NodeLeaf { pub fn body(&self) -> &NodeBody { &self.body } pub fn token_ids(&self) -> &[u32] { &self.token_ids } - pub fn tokens(&self) -> usize { self.token_ids.len() } + pub fn tokens(&self) -> usize { + if self.token_ids.is_empty() { + // No tokenizer — estimate from byte length (~4 bytes per token) + (self.body.text().len() + 3) / 4 + } else { + self.token_ids.len() + } + } pub fn timestamp(&self) -> DateTime { self.timestamp } } @@ -513,9 +520,14 @@ impl Ast for AstNode { match self { Self::Leaf(leaf) => leaf.tokens(), Self::Branch { role, children, .. } => { - 1 + role_header_tokens(*role) + let header = role_header_tokens(*role); + let nl = newline_tokens(); + // If tokenizer isn't loaded, use reasonable estimates + let header = if header == 0 { 2 } else { header }; + let nl = if nl == 0 { 1 } else { nl }; + 1 + header + children.iter().map(|c| c.tokens()).sum::() - + 1 + newline_tokens() + + 1 + nl } } } @@ -713,6 +725,23 @@ impl ResponseParser { let _ = tx.send(call); } } + super::api::StreamToken::TextDelta(text) => { + full_text.push_str(&text); + let mut ctx = agent.context.lock().await; + let calls = parser.feed_token(&text, &mut ctx); + if !calls.is_empty() { + if let Some(ref mut f) = log_file { + use std::io::Write; + for c in &calls { + let end = c.arguments.floor_char_boundary(c.arguments.len().min(200)); + let _ = writeln!(f, "tool_call: {} args={}", c.name, &c.arguments[..end]); + } + } + } + for call in calls { + let _ = tx.send(call); + } + } super::api::StreamToken::Done { usage } => { if let Some(ref mut f) = log_file { use std::io::Write; @@ -902,6 +931,7 @@ impl Ast for ContextState { /// accounting; the wire form collapses each Image to a single /// `<|image_pad|>` between vision bookends and ships the bytes /// separately as multi_modal_data. +#[derive(Debug, Clone)] pub struct WireImage { pub bytes: Vec, pub mime: String, @@ -1042,6 +1072,132 @@ impl ContextState { } (tokens, images, assistant_ranges) } + + /// Render the context as a messages array for chat completions APIs. + /// Each message is (role, content, images). Self-wrapping leaves + /// (Memory, Dmn) are folded into system messages; ToolResults become + /// user messages. + pub fn wire_messages( + &self, + conv_range: std::ops::Range, + ) -> Vec { + let mut messages: Vec = Vec::new(); + + // System + identity + journal all merge into one big system message + let mut system_text = String::new(); + for node in self.system() { + message_text_into(node, &mut system_text); + } + for node in self.identity() { + message_text_into(node, &mut system_text); + } + for node in self.journal() { + message_text_into(node, &mut system_text); + } + if !system_text.is_empty() { + messages.push(ChatMessage { + role: "system".into(), + content: system_text, + images: Vec::new(), + }); + } + + // Conversation entries become individual messages + for node in &self.conversation()[conv_range] { + match node { + AstNode::Branch { role, children, .. } => { + let mut content = String::new(); + let mut images = Vec::new(); + for child in children { + match child { + AstNode::Leaf(leaf) => match leaf.body() { + NodeBody::Image { bytes, mime, .. } => { + images.push(WireImage { + bytes: bytes.clone(), + mime: mime.clone(), + }); + } + NodeBody::Log(_) => {} + other => { + other.render_into(&mut content); + } + }, + AstNode::Branch { .. } => { + message_text_into(child, &mut content); + } + } + } + if !content.is_empty() || !images.is_empty() { + messages.push(ChatMessage { + role: role.as_str().to_string(), + content, + images, + }); + } + } + AstNode::Leaf(leaf) => match leaf.body() { + NodeBody::Memory { text, .. } => { + messages.push(ChatMessage { + role: "system".into(), + content: format!("[memory]\n{}", text), + images: Vec::new(), + }); + } + NodeBody::Dmn(text) => { + messages.push(ChatMessage { + role: "system".into(), + content: format!("[dmn]\n{}", text), + images: Vec::new(), + }); + } + NodeBody::ToolResult(text) => { + messages.push(ChatMessage { + role: "user".into(), + content: format!("\n{}\n", text), + images: Vec::new(), + }); + } + NodeBody::Log(_) => {} + other => { + let mut content = String::new(); + other.render_into(&mut content); + if !content.is_empty() { + messages.push(ChatMessage { + role: "system".into(), + content, + images: Vec::new(), + }); + } + } + }, + } + } + + messages + } +} + +/// A message for the chat completions API. +#[derive(Debug, Clone, serde::Serialize)] +pub struct ChatMessage { + pub role: String, + pub content: String, + #[serde(skip)] + pub images: Vec, +} + +/// Render an AST node to text for chat message content. +fn message_text_into(node: &AstNode, out: &mut String) { + match node { + AstNode::Leaf(leaf) => leaf.body().render_into(out), + AstNode::Branch { role, children, .. } => { + out.push_str(&format!("[{}]\n", role.as_str())); + for child in children { + message_text_into(child, out); + } + out.push('\n'); + } + } } impl ContextState { diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 2c3a98a..20cb5ff 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -148,6 +148,8 @@ pub struct Agent { /// token handler, read by UI screens (amygdala). Manifest is /// `None` when the server has readout disabled. pub readout: readout::SharedReadoutBuffer, + /// Use chat completions API instead of raw token completions. + pub chat_api: bool, } /// Mutable agent state — behind its own mutex. @@ -193,6 +195,7 @@ impl Agent { conversation_log: Option, active_tools: tools::ActiveTools, agent_tools: Vec, + chat_api: bool, ) -> Arc { let mut context = ContextState::new(); context.conversation_log = conversation_log; @@ -224,6 +227,7 @@ impl Agent { session_id, context: crate::Mutex::new(context), readout, + chat_api, state: crate::Mutex::new(AgentState { tools: agent_tools, mcp_tools: McpToolAccess::All, @@ -292,6 +296,7 @@ impl Agent { // shouldn't bleed into the main emotional readout even // though they hit the same vLLM server. readout: readout::new_shared(), + chat_api: self.chat_api, state: crate::Mutex::new(AgentState { tools, mcp_tools: McpToolAccess::None, @@ -347,6 +352,15 @@ impl Agent { (tokens, images) } + /// Assemble messages for chat completions API. + pub async fn assemble_chat_messages(&self) -> Vec { + let mut ctx = self.context.lock().await; + if ctx.total_tokens() > context::context_budget_tokens() { + ctx.trim_conversation(); + } + ctx.wire_messages(0..ctx.conversation().len()) + } + /// Rebuild the tools section of the system prompt from the current tools list. pub async fn rebuild_tools(&self) { let st = self.state.lock().await; @@ -404,18 +418,23 @@ impl Agent { let _thinking = start_activity(&agent, "thinking...").await; let (rx, _stream_guard) = { - let (prompt_tokens, images) = agent.assemble_prompt().await; let st = agent.state.lock().await; - agent.client.stream_completion_mm( - &prompt_tokens, - &images, - api::SamplingParams { - temperature: st.temperature, - top_p: st.top_p, - top_k: st.top_k, - }, - st.priority, - ) + let sampling = api::SamplingParams { + temperature: st.temperature, + top_p: st.top_p, + top_k: st.top_k, + }; + let priority = st.priority; + drop(st); + if agent.chat_api { + let messages = agent.assemble_chat_messages().await; + agent.client.stream_chat_completion(&messages, sampling) + } else { + let (prompt_tokens, images) = agent.assemble_prompt().await; + agent.client.stream_completion_mm( + &prompt_tokens, &images, sampling, priority, + ) + } }; let branch_idx = { @@ -427,7 +446,7 @@ impl Agent { idx }; - let parser = ResponseParser::new(branch_idx); + let parser = ResponseParser::new(branch_idx, false); let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone()); let mut pending_calls: Vec = Vec::new(); diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index 8bc8b53..e9e0308 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -164,6 +164,7 @@ pub struct AutoAgent { pub enabled: bool, pub temperature: f32, pub priority: i32, + pub model: Option, } @@ -231,6 +232,7 @@ impl AutoAgent { steps: Vec, temperature: f32, priority: i32, + model: Option, ) -> Self { assert!(!name.is_empty(), "AutoAgent::new called with empty name"); Self { @@ -240,6 +242,7 @@ impl AutoAgent { enabled: true, temperature, priority, + model, } } @@ -251,7 +254,8 @@ impl AutoAgent { let cli = crate::user::CliArgs::default(); let (app, _) = crate::config::load_app(&cli) .map_err(|e| format!("config: {}", e))?; - let resolved = app.resolve_model(&app.default_backend) + let backend_name = self.model.as_deref().unwrap_or(&app.default_backend); + let resolved = app.resolve_model(backend_name) .map_err(|e| format!("API not configured: {}", e))?; let client = super::api::ApiClient::new( &resolved.api_base, &resolved.api_key, &resolved.model_id); @@ -264,6 +268,7 @@ impl AutoAgent { None, super::tools::ActiveTools::new(), super::tools::tools(), + resolved.chat_api, ).await; { let mut st = agent.state.lock().await; @@ -557,6 +562,7 @@ pub async fn call_api_with_tools( steps, temperature.unwrap_or(0.6), priority, + None, ); auto.run(bail_fn).await } diff --git a/src/config.rs b/src/config.rs index 209bdc1..c65b3a9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -288,6 +288,11 @@ pub struct BackendConfig { /// Context window size in tokens. #[serde(default, skip_serializing_if = "Option::is_none")] pub context_window: Option, + /// Use chat completions API (/v1/chat/completions) instead of + /// raw completions (/v1/completions). Required for cloud API + /// providers (OpenRouter, Anthropic, etc). + #[serde(default)] + pub chat_api: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -370,6 +375,8 @@ pub struct SessionConfig { pub app: AppConfig, /// Disable background agents (surface, observe, scoring) pub no_agents: bool, + /// Use chat completions API instead of raw completions. + pub chat_api: bool, } /// A fully resolved model ready to construct an ApiClient. @@ -380,6 +387,7 @@ pub struct ResolvedModel { pub api_key: String, pub model_id: String, pub context_window: Option, + pub chat_api: bool, } impl AppConfig { @@ -415,6 +423,7 @@ impl AppConfig { session_dir, app: self.clone(), no_agents: cli.no_agents, + chat_api: resolved.chat_api, }) } @@ -439,6 +448,7 @@ impl AppConfig { api_key: b.api_key.clone(), model_id: b.model_id.clone(), context_window: b.context_window, + chat_api: b.chat_api, }) } diff --git a/src/mind/mod.rs b/src/mind/mod.rs index f1ddb54..04b8826 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -392,6 +392,7 @@ impl Mind { conversation_log, crate::agent::tools::ActiveTools::new(), crate::agent::tools::tools(), + config.chat_api, ).await; // Migrate legacy "file exists = enabled" sentinel for the @@ -552,7 +553,9 @@ impl Mind { // Kick off an incremental scoring pass on startup so memories due // for re-scoring get evaluated without requiring a user message. - self.memory_scoring.trigger(); + if !self.config.chat_api { + self.memory_scoring.trigger(); + } } pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { @@ -572,10 +575,14 @@ impl Mind { } } MindCommand::Score => { - self.memory_scoring.trigger(); + if !self.config.chat_api { + self.memory_scoring.trigger(); + } } MindCommand::ScoreFull => { - self.memory_scoring.trigger_full(); + if !self.config.chat_api { + self.memory_scoring.trigger_full(); + } } MindCommand::Interrupt => { self.shared.lock().unwrap().interrupt(); @@ -606,10 +613,14 @@ impl Mind { self.agent.compact().await; } MindCommand::ScoreFinetune => { - self.finetune_scoring.trigger(); + if !self.config.chat_api { + self.finetune_scoring.trigger(); + } } MindCommand::Compare => { - self.compare_scoring.trigger(); + if !self.config.chat_api { + self.compare_scoring.trigger(); + } } MindCommand::SetLearnThreshold(value) => { if let Err(e) = crate::config_writer::set_learn_threshold(value) { @@ -691,7 +702,7 @@ impl Mind { let mut sub_handle: Option> = None; // Start finetune scoring at startup (scores existing conversation) - if !self.config.no_agents { + if !self.config.no_agents && !self.config.chat_api { self.finetune_scoring.trigger(); } @@ -729,7 +740,7 @@ impl Mind { } cmds.push(MindCommand::Compact); - if !self.config.no_agents { + if !self.config.no_agents && !self.config.chat_api { cmds.push(MindCommand::Score); cmds.push(MindCommand::ScoreFinetune); } diff --git a/src/mind/subconscious.rs b/src/mind/subconscious.rs index 21cc549..44497aa 100644 --- a/src/mind/subconscious.rs +++ b/src/mind/subconscious.rs @@ -357,6 +357,7 @@ impl SubconsciousAgent { let auto = AutoAgent::new( name.to_string(), tools, steps, def.temperature.unwrap_or(0.6), def.priority, + def.model.clone(), ); Some(Self { diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 4f9a0ca..65b9231 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -101,6 +101,7 @@ impl Unconscious { let auto = AutoAgent::new( def.agent.clone(), effective_tools, steps, def.temperature.unwrap_or(0.6), def.priority, + def.model.clone(), ); agents.push(UnconsciousAgent { name: def.agent.clone(), @@ -285,7 +286,8 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc return Err(auto); } }; - let resolved = match app.resolve_model(&app.default_backend) { + let backend_name = auto.model.as_deref().unwrap_or(&app.default_backend); + let resolved = match app.resolve_model(backend_name) { Ok(r) => r, Err(e) => { dbglog!("[unconscious] API not configured: {}", e); @@ -302,6 +304,7 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc app, None, crate::agent::tools::ActiveTools::new(), auto.tools.clone(), + resolved.chat_api, ).await; { let mut st = agent.state.lock().await; diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs index a862c8d..4b81e44 100644 --- a/src/subconscious/defs.rs +++ b/src/subconscious/defs.rs @@ -47,6 +47,8 @@ pub struct AgentDef { /// Bail check command — run between steps with pid file path as $1, /// cwd = state dir. Non-zero exit = stop the pipeline. pub bail: Option, + /// Optional backend override (falls back to app.default_backend). + pub model: Option, } /// The JSON header portion (first line of the file). @@ -78,6 +80,9 @@ struct AgentHeader { /// cwd = state dir. Non-zero exit = stop the pipeline. #[serde(default)] bail: Option, + /// Backend override — use this instead of default_backend. + #[serde(default)] + model: Option, } fn default_priority() -> i32 { 10 } @@ -149,6 +154,7 @@ fn parse_agent_file(content: &str) -> Option { temperature: header.temperature, priority: header.priority, bail: header.bail, + model: header.model, }) } diff --git a/src/subconscious/generate.rs b/src/subconscious/generate.rs index 8d75f1b..111e21f 100644 --- a/src/subconscious/generate.rs +++ b/src/subconscious/generate.rs @@ -37,6 +37,7 @@ where F: FnMut(&AstNode) -> bool, while let Some(tok) = rx.recv().await { match tok { StreamToken::Token { id, .. } => tokens.push(id), + StreamToken::TextDelta(text) => tokens.extend(tokenizer::encode(&text)), StreamToken::Done { .. } => break, StreamToken::Error(e) => anyhow::bail!("generation error: {}", e), }