consciousness/src/subconscious/api.rs
ProofOfConcept 3e410347a2 api: retry transient connection errors, misc fixes
- Retry up to 5 times with exponential backoff (2s, 4s, 8s, 16s)
  on transient errors: IncompleteMessage, connection closed/reset/
  refused, timeouts. Non-transient errors fail immediately.
- tail command: print to stdout instead of stderr
- state_dir rename: output_dir → state_dir throughout knowledge.rs

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-03-26 17:48:44 -04:00

247 lines
9.9 KiB
Rust

// subconscious/api.rs — Direct API backend for agent execution
//
// Uses poc-agent's OpenAI-compatible API client to call models directly
// (vllm, llama.cpp, OpenRouter, etc.) instead of shelling out to claude CLI.
// Implements the tool loop: send prompt → if tool_calls, execute them →
// send results back → repeat until text response.
//
// Activated when config has api_base_url set.
use crate::agent::api::ApiClient;
use crate::agent::types::*;
use crate::agent::tools::{self, ProcessTracker};
use crate::agent::ui_channel::StreamTarget;
use std::sync::OnceLock;
/// Process-wide API client, built on first use and reused afterwards.
static API_CLIENT: OnceLock<ApiClient> = OnceLock::new();

/// Returns the shared [`ApiClient`], constructing it from the global config
/// the first time it is requested.
///
/// Missing `api_base_url` / `api_key` settings fall back to the empty string
/// and a missing `api_model` falls back to `"qwen-2.5-27b"`. The `Result`
/// wrapper is part of the signature; this implementation always returns `Ok`.
fn get_client() -> Result<&'static ApiClient, String> {
    let client = API_CLIENT.get_or_init(|| {
        let cfg = crate::config::get();
        ApiClient::new(
            cfg.api_base_url.as_deref().unwrap_or(""),
            cfg.api_key.as_deref().unwrap_or(""),
            cfg.api_model.as_deref().unwrap_or("qwen-2.5-27b"),
        )
    });
    Ok(client)
}
/// Returns true when `err` looks like a transient connection failure that is
/// worth retrying.
///
/// Matching is ASCII case-insensitive: OS-level errors are capitalised
/// ("Connection reset by peer", "Connection refused") while HTTP-library
/// errors are usually lowercase ("connection closed before message completed").
fn is_transient_error(err: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "incompletemessage",
        "connection closed",
        "connection reset",
        "timed out",
        "connection refused",
    ];
    let lowered = err.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}

/// Returns true for tool names that this backend exposes to the model and
/// routes through the memory dispatcher (`memory_*`, `journal_*`, `output`).
fn is_graph_tool(name: &str) -> bool {
    name.starts_with("memory_") || name.starts_with("journal_") || name == "output"
}

/// Run agent prompts through the direct API with tool support.
///
/// For multi-step agents, each prompt is injected as a new user message
/// after the previous step's tool loop completes, so the conversation
/// context carries forward between steps.
///
/// * `agent` — agent name; used to build the `agent:<name>` provenance string
///   passed to the memory dispatcher.
/// * `prompts` — one prompt per step; must contain at least one entry.
/// * `temperature` — forwarded unchanged to the API client.
/// * `bail_fn` — if present, called with the index of the next prompt before
///   that prompt is injected; an `Err` aborts the run and is returned as-is.
/// * `log` — sink for the turn-by-turn transcript.
///
/// Returns the final text response after all steps complete.
///
/// # Errors
///
/// Returns `Err` when `prompts` is empty, when the API call fails with a
/// non-transient error or keeps failing after 5 attempts, when `bail_fn`
/// returns an error, or when the turn budget (50) is exhausted.
pub async fn call_api_with_tools(
    agent: &str,
    prompts: &[String],
    temperature: Option<f32>,
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    log: &dyn Fn(&str),
) -> Result<String, String> {
    const MAX_TURNS: usize = 50;
    const MAX_ATTEMPTS: u32 = 5;

    // Fail cleanly instead of panicking on `prompts[0]` when no prompt is given.
    let Some(first_prompt) = prompts.first() else {
        return Err("no prompts provided".to_string());
    };

    let client = get_client()?;

    // Set up a UI channel — we drain reasoning tokens into the log
    let (ui_tx, mut ui_rx) = crate::agent::ui_channel::channel();

    // Build tool definitions — memory and journal tools for graph operations
    let tool_defs: Vec<ToolDef> = tools::definitions()
        .into_iter()
        .filter(|d| is_graph_tool(&d.function.name))
        .collect();
    let tracker = ProcessTracker::new();

    // Start with the first prompt as a user message
    let mut messages = vec![Message::user(first_prompt)];
    let mut next_prompt_idx = 1; // index of next prompt to inject
    let reasoning = crate::config::get().api_reasoning.clone();

    for turn in 0..MAX_TURNS {
        log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len()));

        // Request loop: retry transient failures with exponential backoff
        // (2s, 4s, 8s, 16s); anything else fails the whole call immediately.
        let mut last_err = None;
        let mut attempt: u32 = 0;
        let (msg, usage_opt) = loop {
            match client.chat_completion_stream_temp(
                &messages,
                Some(&tool_defs),
                &ui_tx,
                StreamTarget::Autonomous,
                &reasoning,
                temperature,
            ).await {
                Ok(reply) => break reply,
                Err(e) => {
                    let err_str = e.to_string();
                    if is_transient_error(&err_str) && attempt + 1 < MAX_ATTEMPTS {
                        log(&format!("transient error (attempt {}): {}, retrying...",
                            attempt + 1, err_str));
                        tokio::time::sleep(
                            std::time::Duration::from_secs(2 << attempt)).await;
                        last_err = Some(e);
                        attempt += 1;
                        continue;
                    }
                    // Include a rough payload size to help diagnose
                    // size-related failures.
                    let msg_bytes: usize = messages.iter()
                        .map(|m| m.content_text().len())
                        .sum();
                    return Err(format!(
                        "API error on turn {} (~{}KB payload, {} messages, {} attempts): {}",
                        turn, msg_bytes / 1024, messages.len(), attempt + 1, e));
                }
            }
        };

        if let Some(ref e) = last_err {
            log(&format!("succeeded after retry (previous error: {})", e));
        }
        if let Some(u) = &usage_opt {
            log(&format!("tokens: {} prompt + {} completion",
                u.prompt_tokens, u.completion_tokens));
        }

        // Drain reasoning tokens from the UI channel into the log
        {
            let mut reasoning_buf = String::new();
            while let Ok(ui_msg) = ui_rx.try_recv() {
                if let crate::agent::ui_channel::UiMessage::Reasoning(r) = ui_msg {
                    reasoning_buf.push_str(&r);
                }
            }
            if !reasoning_buf.is_empty() {
                log(&format!("<think>\n{}\n</think>", reasoning_buf.trim()));
            }
        }

        let has_content = msg.content.is_some();

        if let Some(calls) = msg.tool_calls.as_ref().filter(|tc| !tc.is_empty()) {
            // Push the assistant message with tool calls.
            // Sanitize arguments: vllm re-parses them as JSON when
            // preprocessing the conversation, so invalid JSON from the
            // model crashes the next request.
            let mut sanitized = msg.clone();
            if let Some(ref mut sanitized_calls) = sanitized.tool_calls {
                for call in sanitized_calls {
                    if serde_json::from_str::<serde_json::Value>(&call.function.arguments).is_err() {
                        log(&format!("sanitizing malformed args for {}: {}",
                            call.function.name, &call.function.arguments));
                        call.function.arguments = "{}".to_string();
                    }
                }
            }
            messages.push(sanitized);

            // Execute each tool call. This iterates the *original* calls so
            // that malformed arguments are reported back to the model rather
            // than silently executed with the sanitized `{}`.
            for call in calls {
                log(&format!("\nTOOL CALL: {}({})",
                    call.function.name,
                    &call.function.arguments));

                let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
                    Ok(v) => v,
                    Err(_) => {
                        log(&format!("malformed tool call args: {}", &call.function.arguments));
                        messages.push(Message::tool_result(
                            &call.id,
                            "Error: your tool call had malformed JSON arguments. Please retry with valid JSON.",
                        ));
                        continue;
                    }
                };

                let output = if is_graph_tool(&call.function.name) {
                    let prov = format!("agent:{}", agent);
                    let text = match crate::agent::tools::memory::dispatch(
                        &call.function.name, &args, Some(&prov),
                    ) {
                        Ok(text) => text,
                        Err(e) => format!("Error: {}", e),
                    };
                    crate::agent::tools::ToolOutput {
                        text, is_yield: false, images: Vec::new(),
                        model_switch: None, dmn_pause: false,
                    }
                } else {
                    tools::dispatch(&call.function.name, &args, &tracker).await
                };

                log(&format!("TOOL RESULT ({} chars):\n{}", output.text.len(), output.text));
                messages.push(Message::tool_result(&call.id, &output.text));
            }
            continue;
        }

        // Text-only response — step complete
        let text = msg.content_text().to_string();
        if text.is_empty() && !has_content {
            log("empty response, retrying");
            messages.push(Message::user(
                "[system] Your previous response was empty. Please respond with text or use a tool."
            ));
            continue;
        }

        log(&format!("\n=== RESPONSE ===\n\n{}", text));

        // If there are more prompts, check bail condition and inject the next one
        if next_prompt_idx < prompts.len() {
            // Run bail check before continuing to next step
            if let Some(ref check) = bail_fn {
                check(next_prompt_idx)?;
            }
            messages.push(Message::assistant(&text));
            let next = &prompts[next_prompt_idx];
            next_prompt_idx += 1;
            log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len()));
            messages.push(Message::user(next));
            continue;
        }

        return Ok(text);
    }

    Err(format!("agent exceeded {} tool turns", MAX_TURNS))
}
/// Synchronous wrapper — runs [`call_api_with_tools`] on a dedicated scoped
/// thread with its own current-thread tokio runtime, so it can be called
/// whether or not the caller is already inside a runtime.
///
/// The async call is wrapped in a `TASK_PROVENANCE` scope set to
/// `agent:<agent>`. Parameters are forwarded unchanged.
///
/// # Errors
///
/// Returns `Err` if the tokio runtime cannot be built, or whatever error
/// `call_api_with_tools` returns.
///
/// # Panics
///
/// If the worker thread panics, the original panic payload is re-raised on
/// the calling thread (rather than being replaced by an opaque
/// `Result::unwrap()` message).
pub fn call_api_with_tools_sync(
    agent: &str,
    prompts: &[String],
    temperature: Option<f32>,
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    log: &(dyn Fn(&str) + Sync),
) -> Result<String, String> {
    std::thread::scope(|s| {
        let worker = s.spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(|e| format!("tokio runtime: {}", e))?;
            let prov = format!("agent:{}", agent);
            rt.block_on(
                crate::store::TASK_PROVENANCE.scope(prov,
                    call_api_with_tools(agent, prompts, temperature, bail_fn, log))
            )
        });
        match worker.join() {
            Ok(result) => result,
            // Propagate the worker's own panic payload to the caller.
            Err(payload) => std::panic::resume_unwind(payload),
        }
    })
}