Add cloud API support and per-agent model override

Cloud API support:
- Add chat_api config flag to BackendConfig, threaded through
  SessionConfig → ResolvedModel → Agent → Mind
- New StreamToken::TextDelta variant for chat completions streaming
- stream_chat_completion() method on ApiClient: builds messages array,
  sends to /v1/chat/completions, parses SSE stream
- ChatMessage struct and wire_messages() on ContextState: converts the
  AST (system/identity/journal/conversation nodes) into a messages
  array for the chat API, handling images as base64 data URIs
- ResponseParser handles TextDelta alongside Token variants
- TUI rendering fix: tokens() returns byte-length estimate (~4
  bytes/token) when tokenizer isn't loaded, so the change detector
  actually triggers re-renders
- Gate all vLLM-specific scoring (memory scoring, finetune scoring,
  compare scoring) behind !chat_api checks

Per-agent model override:
- Add model field to agent definition headers (.agent files)
- Thread through AutoAgent → prepare_spawn → resolve_model
- Agents fall back to default_backend when model is unset
- Enables cheaper backends (e.g. Kimi) for graph maintenance agents
  while keeping Sonnet for conversation

Tested: end-to-end with Poe API + Haiku, chat_api: true in config.
TUI starts, messages send, responses stream and render.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Waffles 2026-05-22 15:39:13 -04:00
commit 6c26cee86e
10 changed files with 353 additions and 28 deletions

View file

@ -58,12 +58,13 @@ pub(crate) struct SamplingParams {
// Stream events — yielded by backends, consumed by the runner
// ─────────────────────────────────────────────────────────────
/// One token from the streaming completions API.
/// One event from a streaming LLM response.
pub enum StreamToken {
/// A sampled token, optionally with its per-layer concept readout.
/// `readout` is `None` when the server has readout disabled or
/// returned no readout for this chunk.
/// Used by the vLLM completions backend.
Token { id: u32, readout: Option<TokenReadout> },
/// A text delta from a chat completions API.
TextDelta(String),
Done { usage: Option<Usage> },
Error(String),
}
@ -150,6 +151,117 @@ impl ApiClient {
Ok(Some(response.json().await?))
}
/// Stream a chat completion from an OpenAI-compatible chat/completions API.
pub(crate) fn stream_chat_completion(
&self,
messages: &[super::context::ChatMessage],
sampling: SamplingParams,
) -> (mpsc::UnboundedReceiver<StreamToken>, AbortOnDrop) {
let (tx, rx) = mpsc::unbounded_channel();
let client = self.client.clone();
let api_key = self.api_key.clone();
let model = self.model.clone();
let base_url = self.base_url.clone();
let messages = messages.to_vec();
let handle = tokio::spawn(async move {
let result = stream_chat(
&client, &base_url, &api_key, &model,
&messages, &tx, sampling,
).await;
if let Err(e) = result {
let _ = tx.send(StreamToken::Error(e.to_string()));
}
});
(rx, AbortOnDrop(handle))
}
}
async fn stream_chat(
client: &HttpClient,
base_url: &str,
api_key: &str,
model: &str,
messages: &[super::context::ChatMessage],
tx: &mpsc::UnboundedSender<StreamToken>,
sampling: SamplingParams,
) -> anyhow::Result<()> {
let wire_messages: Vec<serde_json::Value> = messages.iter().map(|m| {
if m.images.is_empty() {
serde_json::json!({
"role": m.role,
"content": m.content,
})
} else {
use base64::Engine;
let b64 = base64::engine::general_purpose::STANDARD;
let mut parts: Vec<serde_json::Value> = vec![
serde_json::json!({ "type": "text", "text": m.content }),
];
for img in &m.images {
parts.push(serde_json::json!({
"type": "image_url",
"image_url": {
"url": format!("data:{};base64,{}", img.mime, b64.encode(&img.bytes)),
},
}));
}
serde_json::json!({
"role": m.role,
"content": parts,
})
}
}).collect();
let request = serde_json::json!({
"model": model,
"messages": wire_messages,
"max_tokens": 16384,
"temperature": sampling.temperature,
"top_p": sampling.top_p,
"stream": true,
});
let url = format!("{}/chat/completions", base_url);
let debug_label = format!("{} messages, model={}", messages.len(), model);
let mut response = send_and_check(
client, &url, &request,
("Authorization", &format!("Bearer {}", api_key)),
&[], &debug_label, None,
).await?;
let mut reader = SseReader::new();
let mut usage = None;
while let Some(event) = reader.next_event(&mut response).await? {
if let Some(err_msg) = event["error"]["message"].as_str() {
anyhow::bail!("API error in stream: {}", err_msg);
}
if let Some(u) = event["usage"].as_object() {
if let Ok(u) = serde_json::from_value::<Usage>(serde_json::Value::Object(u.clone())) {
usage = Some(u);
}
}
let choices = match event["choices"].as_array() {
Some(c) => c,
None => continue,
};
for choice in choices {
if let Some(delta) = choice["delta"]["content"].as_str() {
if !delta.is_empty() {
let _ = tx.send(StreamToken::TextDelta(delta.to_string()));
}
}
}
}
let _ = tx.send(StreamToken::Done { usage });
Ok(())
}
async fn stream_completions(

View file

@ -215,7 +215,7 @@ impl Role {
impl NodeBody {
/// Render this leaf body to text for the prompt.
fn render_into(&self, out: &mut String) {
pub(crate) fn render_into(&self, out: &mut String) {
match self {
Self::Content(text) => out.push_str(text),
Self::Thinking(text) => {
@ -310,7 +310,14 @@ impl NodeLeaf {
pub fn body(&self) -> &NodeBody { &self.body }
pub fn token_ids(&self) -> &[u32] { &self.token_ids }
pub fn tokens(&self) -> usize { self.token_ids.len() }
pub fn tokens(&self) -> usize {
if self.token_ids.is_empty() {
// No tokenizer — estimate from byte length (~4 bytes per token)
(self.body.text().len() + 3) / 4
} else {
self.token_ids.len()
}
}
pub fn timestamp(&self) -> DateTime<Utc> { self.timestamp }
}
@ -513,9 +520,14 @@ impl Ast for AstNode {
match self {
Self::Leaf(leaf) => leaf.tokens(),
Self::Branch { role, children, .. } => {
1 + role_header_tokens(*role)
let header = role_header_tokens(*role);
let nl = newline_tokens();
// If tokenizer isn't loaded, use reasonable estimates
let header = if header == 0 { 2 } else { header };
let nl = if nl == 0 { 1 } else { nl };
1 + header
+ children.iter().map(|c| c.tokens()).sum::<usize>()
+ 1 + newline_tokens()
+ 1 + nl
}
}
}
@ -713,6 +725,23 @@ impl ResponseParser {
let _ = tx.send(call);
}
}
super::api::StreamToken::TextDelta(text) => {
full_text.push_str(&text);
let mut ctx = agent.context.lock().await;
let calls = parser.feed_token(&text, &mut ctx);
if !calls.is_empty() {
if let Some(ref mut f) = log_file {
use std::io::Write;
for c in &calls {
let end = c.arguments.floor_char_boundary(c.arguments.len().min(200));
let _ = writeln!(f, "tool_call: {} args={}", c.name, &c.arguments[..end]);
}
}
}
for call in calls {
let _ = tx.send(call);
}
}
super::api::StreamToken::Done { usage } => {
if let Some(ref mut f) = log_file {
use std::io::Write;
@ -902,6 +931,7 @@ impl Ast for ContextState {
/// accounting; the wire form collapses each Image to a single
/// `<|image_pad|>` between vision bookends and ships the bytes
/// separately as multi_modal_data.
#[derive(Debug, Clone)]
pub struct WireImage {
pub bytes: Vec<u8>,
pub mime: String,
@ -1042,6 +1072,132 @@ impl ContextState {
}
(tokens, images, assistant_ranges)
}
/// Render the context as a messages array for chat completions APIs.
/// Each message is (role, content, images). Self-wrapping leaves
/// (Memory, Dmn) are folded into system messages; ToolResults become
/// user messages.
pub fn wire_messages(
&self,
conv_range: std::ops::Range<usize>,
) -> Vec<ChatMessage> {
let mut messages: Vec<ChatMessage> = Vec::new();
// System + identity + journal all merge into one big system message
let mut system_text = String::new();
for node in self.system() {
message_text_into(node, &mut system_text);
}
for node in self.identity() {
message_text_into(node, &mut system_text);
}
for node in self.journal() {
message_text_into(node, &mut system_text);
}
if !system_text.is_empty() {
messages.push(ChatMessage {
role: "system".into(),
content: system_text,
images: Vec::new(),
});
}
// Conversation entries become individual messages
for node in &self.conversation()[conv_range] {
match node {
AstNode::Branch { role, children, .. } => {
let mut content = String::new();
let mut images = Vec::new();
for child in children {
match child {
AstNode::Leaf(leaf) => match leaf.body() {
NodeBody::Image { bytes, mime, .. } => {
images.push(WireImage {
bytes: bytes.clone(),
mime: mime.clone(),
});
}
NodeBody::Log(_) => {}
other => {
other.render_into(&mut content);
}
},
AstNode::Branch { .. } => {
message_text_into(child, &mut content);
}
}
}
if !content.is_empty() || !images.is_empty() {
messages.push(ChatMessage {
role: role.as_str().to_string(),
content,
images,
});
}
}
AstNode::Leaf(leaf) => match leaf.body() {
NodeBody::Memory { text, .. } => {
messages.push(ChatMessage {
role: "system".into(),
content: format!("[memory]\n{}", text),
images: Vec::new(),
});
}
NodeBody::Dmn(text) => {
messages.push(ChatMessage {
role: "system".into(),
content: format!("[dmn]\n{}", text),
images: Vec::new(),
});
}
NodeBody::ToolResult(text) => {
messages.push(ChatMessage {
role: "user".into(),
content: format!("<tool_response>\n{}\n</tool_response>", text),
images: Vec::new(),
});
}
NodeBody::Log(_) => {}
other => {
let mut content = String::new();
other.render_into(&mut content);
if !content.is_empty() {
messages.push(ChatMessage {
role: "system".into(),
content,
images: Vec::new(),
});
}
}
},
}
}
messages
}
}
/// A message for the chat completions API.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ChatMessage {
pub role: String,
pub content: String,
#[serde(skip)]
pub images: Vec<WireImage>,
}
/// Render an AST node to text for chat message content.
fn message_text_into(node: &AstNode, out: &mut String) {
match node {
AstNode::Leaf(leaf) => leaf.body().render_into(out),
AstNode::Branch { role, children, .. } => {
out.push_str(&format!("[{}]\n", role.as_str()));
for child in children {
message_text_into(child, out);
}
out.push('\n');
}
}
}
impl ContextState {

View file

@ -148,6 +148,8 @@ pub struct Agent {
/// token handler, read by UI screens (amygdala). Manifest is
/// `None` when the server has readout disabled.
pub readout: readout::SharedReadoutBuffer,
/// Use chat completions API instead of raw token completions.
pub chat_api: bool,
}
/// Mutable agent state — behind its own mutex.
@ -193,6 +195,7 @@ impl Agent {
conversation_log: Option<ConversationLog>,
active_tools: tools::ActiveTools,
agent_tools: Vec<tools::Tool>,
chat_api: bool,
) -> Arc<Self> {
let mut context = ContextState::new();
context.conversation_log = conversation_log;
@ -224,6 +227,7 @@ impl Agent {
session_id,
context: crate::Mutex::new(context),
readout,
chat_api,
state: crate::Mutex::new(AgentState {
tools: agent_tools,
mcp_tools: McpToolAccess::All,
@ -292,6 +296,7 @@ impl Agent {
// shouldn't bleed into the main emotional readout even
// though they hit the same vLLM server.
readout: readout::new_shared(),
chat_api: self.chat_api,
state: crate::Mutex::new(AgentState {
tools,
mcp_tools: McpToolAccess::None,
@ -347,6 +352,15 @@ impl Agent {
(tokens, images)
}
/// Assemble messages for chat completions API.
pub async fn assemble_chat_messages(&self) -> Vec<context::ChatMessage> {
let mut ctx = self.context.lock().await;
if ctx.total_tokens() > context::context_budget_tokens() {
ctx.trim_conversation();
}
ctx.wire_messages(0..ctx.conversation().len())
}
/// Rebuild the tools section of the system prompt from the current tools list.
pub async fn rebuild_tools(&self) {
let st = self.state.lock().await;
@ -404,18 +418,23 @@ impl Agent {
let _thinking = start_activity(&agent, "thinking...").await;
let (rx, _stream_guard) = {
let (prompt_tokens, images) = agent.assemble_prompt().await;
let st = agent.state.lock().await;
agent.client.stream_completion_mm(
&prompt_tokens,
&images,
api::SamplingParams {
temperature: st.temperature,
top_p: st.top_p,
top_k: st.top_k,
},
st.priority,
)
let sampling = api::SamplingParams {
temperature: st.temperature,
top_p: st.top_p,
top_k: st.top_k,
};
let priority = st.priority;
drop(st);
if agent.chat_api {
let messages = agent.assemble_chat_messages().await;
agent.client.stream_chat_completion(&messages, sampling)
} else {
let (prompt_tokens, images) = agent.assemble_prompt().await;
agent.client.stream_completion_mm(
&prompt_tokens, &images, sampling, priority,
)
}
};
let branch_idx = {
@ -427,7 +446,7 @@ impl Agent {
idx
};
let parser = ResponseParser::new(branch_idx);
let parser = ResponseParser::new(branch_idx, false);
let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());
let mut pending_calls: Vec<PendingToolCall> = Vec::new();

View file

@ -164,6 +164,7 @@ pub struct AutoAgent {
pub enabled: bool,
pub temperature: f32,
pub priority: i32,
pub model: Option<String>,
}
@ -231,6 +232,7 @@ impl AutoAgent {
steps: Vec<AutoStep>,
temperature: f32,
priority: i32,
model: Option<String>,
) -> Self {
assert!(!name.is_empty(), "AutoAgent::new called with empty name");
Self {
@ -240,6 +242,7 @@ impl AutoAgent {
enabled: true,
temperature,
priority,
model,
}
}
@ -251,7 +254,8 @@ impl AutoAgent {
let cli = crate::user::CliArgs::default();
let (app, _) = crate::config::load_app(&cli)
.map_err(|e| format!("config: {}", e))?;
let resolved = app.resolve_model(&app.default_backend)
let backend_name = self.model.as_deref().unwrap_or(&app.default_backend);
let resolved = app.resolve_model(backend_name)
.map_err(|e| format!("API not configured: {}", e))?;
let client = super::api::ApiClient::new(
&resolved.api_base, &resolved.api_key, &resolved.model_id);
@ -264,6 +268,7 @@ impl AutoAgent {
None,
super::tools::ActiveTools::new(),
super::tools::tools(),
resolved.chat_api,
).await;
{
let mut st = agent.state.lock().await;
@ -557,6 +562,7 @@ pub async fn call_api_with_tools(
steps,
temperature.unwrap_or(0.6),
priority,
None,
);
auto.run(bail_fn).await
}

View file

@ -288,6 +288,11 @@ pub struct BackendConfig {
/// Context window size in tokens.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub context_window: Option<usize>,
/// Use chat completions API (/v1/chat/completions) instead of
/// raw completions (/v1/completions). Required for cloud API
/// providers (OpenRouter, Anthropic, etc).
#[serde(default)]
pub chat_api: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -370,6 +375,8 @@ pub struct SessionConfig {
pub app: AppConfig,
/// Disable background agents (surface, observe, scoring)
pub no_agents: bool,
/// Use chat completions API instead of raw completions.
pub chat_api: bool,
}
/// A fully resolved model ready to construct an ApiClient.
@ -380,6 +387,7 @@ pub struct ResolvedModel {
pub api_key: String,
pub model_id: String,
pub context_window: Option<usize>,
pub chat_api: bool,
}
impl AppConfig {
@ -415,6 +423,7 @@ impl AppConfig {
session_dir,
app: self.clone(),
no_agents: cli.no_agents,
chat_api: resolved.chat_api,
})
}
@ -439,6 +448,7 @@ impl AppConfig {
api_key: b.api_key.clone(),
model_id: b.model_id.clone(),
context_window: b.context_window,
chat_api: b.chat_api,
})
}

View file

@ -392,6 +392,7 @@ impl Mind {
conversation_log,
crate::agent::tools::ActiveTools::new(),
crate::agent::tools::tools(),
config.chat_api,
).await;
// Migrate legacy "file exists = enabled" sentinel for the
@ -552,7 +553,9 @@ impl Mind {
// Kick off an incremental scoring pass on startup so memories due
// for re-scoring get evaluated without requiring a user message.
self.memory_scoring.trigger();
if !self.config.chat_api {
self.memory_scoring.trigger();
}
}
pub fn turn_watch(&self) -> tokio::sync::watch::Receiver<bool> {
@ -572,10 +575,14 @@ impl Mind {
}
}
MindCommand::Score => {
self.memory_scoring.trigger();
if !self.config.chat_api {
self.memory_scoring.trigger();
}
}
MindCommand::ScoreFull => {
self.memory_scoring.trigger_full();
if !self.config.chat_api {
self.memory_scoring.trigger_full();
}
}
MindCommand::Interrupt => {
self.shared.lock().unwrap().interrupt();
@ -606,10 +613,14 @@ impl Mind {
self.agent.compact().await;
}
MindCommand::ScoreFinetune => {
self.finetune_scoring.trigger();
if !self.config.chat_api {
self.finetune_scoring.trigger();
}
}
MindCommand::Compare => {
self.compare_scoring.trigger();
if !self.config.chat_api {
self.compare_scoring.trigger();
}
}
MindCommand::SetLearnThreshold(value) => {
if let Err(e) = crate::config_writer::set_learn_threshold(value) {
@ -691,7 +702,7 @@ impl Mind {
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
// Start finetune scoring at startup (scores existing conversation)
if !self.config.no_agents {
if !self.config.no_agents && !self.config.chat_api {
self.finetune_scoring.trigger();
}
@ -729,7 +740,7 @@ impl Mind {
}
cmds.push(MindCommand::Compact);
if !self.config.no_agents {
if !self.config.no_agents && !self.config.chat_api {
cmds.push(MindCommand::Score);
cmds.push(MindCommand::ScoreFinetune);
}

View file

@ -357,6 +357,7 @@ impl SubconsciousAgent {
let auto = AutoAgent::new(
name.to_string(), tools, steps,
def.temperature.unwrap_or(0.6), def.priority,
def.model.clone(),
);
Some(Self {

View file

@ -101,6 +101,7 @@ impl Unconscious {
let auto = AutoAgent::new(
def.agent.clone(), effective_tools, steps,
def.temperature.unwrap_or(0.6), def.priority,
def.model.clone(),
);
agents.push(UnconsciousAgent {
name: def.agent.clone(),
@ -285,7 +286,8 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc
return Err(auto);
}
};
let resolved = match app.resolve_model(&app.default_backend) {
let backend_name = auto.model.as_deref().unwrap_or(&app.default_backend);
let resolved = match app.resolve_model(backend_name) {
Ok(r) => r,
Err(e) => {
dbglog!("[unconscious] API not configured: {}", e);
@ -302,6 +304,7 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc
app, None,
crate::agent::tools::ActiveTools::new(),
auto.tools.clone(),
resolved.chat_api,
).await;
{
let mut st = agent.state.lock().await;

View file

@ -47,6 +47,8 @@ pub struct AgentDef {
/// Bail check command — run between steps with pid file path as $1,
/// cwd = state dir. Non-zero exit = stop the pipeline.
pub bail: Option<String>,
/// Optional backend override (falls back to app.default_backend).
pub model: Option<String>,
}
/// The JSON header portion (first line of the file).
@ -78,6 +80,9 @@ struct AgentHeader {
/// cwd = state dir. Non-zero exit = stop the pipeline.
#[serde(default)]
bail: Option<String>,
/// Backend override — use this instead of default_backend.
#[serde(default)]
model: Option<String>,
}
fn default_priority() -> i32 { 10 }
@ -149,6 +154,7 @@ fn parse_agent_file(content: &str) -> Option<AgentDef> {
temperature: header.temperature,
priority: header.priority,
bail: header.bail,
model: header.model,
})
}

View file

@ -37,6 +37,7 @@ where F: FnMut(&AstNode) -> bool,
while let Some(tok) = rx.recv().await {
match tok {
StreamToken::Token { id, .. } => tokens.push(id),
StreamToken::TextDelta(text) => tokens.extend(tokenizer::encode(&text)),
StreamToken::Done { .. } => break,
StreamToken::Error(e) => anyhow::bail!("generation error: {}", e),
}