// agents/api.rs — Direct API backend for agent execution
//
// Uses poc-agent's OpenAI-compatible API client to call models directly
// (vllm, llama.cpp, OpenRouter, etc.) instead of shelling out to claude CLI.
// Implements the tool loop: send prompt → if tool_calls, execute them →
// send results back → repeat until text response.
//
// Activated when config has api_base_url set.

use crate::agent::api::ApiClient;
use crate::agent::types::*;
use crate::thought::{self, ProcessTracker};
use crate::agent::ui_channel::StreamTarget;
use std::sync::OnceLock;

static API_CLIENT: OnceLock<ApiClient> = OnceLock::new();

fn get_client() -> Result<&'static ApiClient, String> {
    Ok(API_CLIENT.get_or_init(|| {
        let config = crate::config::get();
        let base_url = config.api_base_url.as_deref().unwrap_or("");
        let api_key = config.api_key.as_deref().unwrap_or("");
        let model = config.api_model.as_deref().unwrap_or("qwen-2.5-27b");
        ApiClient::new(base_url, api_key, model)
    }))
}

/// Run agent prompts through the direct API with tool support.
/// For multi-step agents, each prompt is injected as a new user message
/// after the previous step's tool loop completes. The conversation
/// context carries forward naturally between steps.
/// Returns the final text response after all steps complete.
pub async fn call_api_with_tools(
    _agent: &str,
    prompts: &[String],
    temperature: Option<f32>,
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    log: &dyn Fn(&str),
) -> Result<String, String> {
    let client = get_client()?;

    // Set up a UI channel — we drain reasoning tokens into the log
    let (ui_tx, mut ui_rx) = crate::agent::ui_channel::channel();

    // Build tool definitions — all shared tools (memory, files, bash, journal)
    let tool_defs = thought::all_definitions();
    let tracker = ProcessTracker::new();

    // Start with the first prompt as a user message
    let mut messages = vec![Message::user(&prompts[0])];
    let mut next_prompt_idx = 1; // index of next prompt to inject

    let reasoning = crate::config::get().api_reasoning.clone();
    let max_turns = 50 * prompts.len();

    for turn in 0..max_turns {
        log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len()));

        let mut last_err = None;
        let mut msg_opt = None;
        let mut usage_opt = None;

        // Retry transient network/streaming failures with exponential backoff.
        for attempt in 0..5 {
            match client
                .chat_completion_stream_temp(
                    &messages,
                    Some(&tool_defs),
                    &ui_tx,
                    StreamTarget::Autonomous,
                    &reasoning,
                    temperature,
                )
                .await
            {
                Ok((msg, usage)) => {
                    msg_opt = Some(msg);
                    usage_opt = usage;
                    break;
                }
                Err(e) => {
                    let err_str = e.to_string();
                    let is_transient = err_str.contains("IncompleteMessage")
                        || err_str.contains("connection closed")
                        || err_str.contains("connection reset")
                        || err_str.contains("timed out")
                        || err_str.contains("Connection refused");
                    if is_transient && attempt < 4 {
                        log(&format!(
                            "transient error (attempt {}): {}, retrying...",
                            attempt + 1,
                            err_str
                        ));
                        tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await;
                        last_err = Some(e);
                        continue;
                    }
                    let msg_bytes: usize = messages.iter().map(|m| m.content_text().len()).sum();
                    return Err(format!(
                        "API error on turn {} (~{}KB payload, {} messages, {} attempts): {}",
                        turn,
                        msg_bytes / 1024,
                        messages.len(),
                        attempt + 1,
                        e
                    ));
                }
            }
        }

        let msg = msg_opt.unwrap();
        if let Some(ref e) = last_err {
            log(&format!("succeeded after retry (previous error: {})", e));
        }
        if let Some(u) = &usage_opt {
            log(&format!(
                "tokens: {} prompt + {} completion",
                u.prompt_tokens, u.completion_tokens
            ));
        }

        // Drain reasoning tokens from the UI channel into the log
        {
            let mut reasoning_buf = String::new();
            while let Ok(ui_msg) = ui_rx.try_recv() {
                if let crate::agent::ui_channel::UiMessage::Reasoning(r) = ui_msg {
                    reasoning_buf.push_str(&r);
                }
            }
            if !reasoning_buf.is_empty() {
                log(&format!("\n{}\n", reasoning_buf.trim()));
            }
        }

        let has_content = msg.content.is_some();
        let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty());

        if has_tools {
            // Push the assistant message with tool calls.
            // Sanitize arguments: vllm re-parses them as JSON when
            // preprocessing the conversation, so invalid JSON from the
            // model crashes the next request.
            let mut sanitized = msg.clone();
            if let Some(ref mut calls) = sanitized.tool_calls {
                for call in calls {
                    if serde_json::from_str::<serde_json::Value>(&call.function.arguments).is_err() {
                        log(&format!(
                            "sanitizing malformed args for {}: {}",
                            call.function.name, &call.function.arguments
                        ));
                        call.function.arguments = "{}".to_string();
                    }
                }
            }
            messages.push(sanitized);

            // Execute each tool call
            for call in msg.tool_calls.as_ref().unwrap() {
                log(&format!(
                    "\nTOOL CALL: {}({})",
                    call.function.name, &call.function.arguments
                ));
                let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
                    Ok(v) => v,
                    Err(_) => {
                        log(&format!(
                            "malformed tool call args: {}",
                            &call.function.arguments
                        ));
                        messages.push(Message::tool_result(
                            &call.id,
                            "Error: your tool call had malformed JSON arguments. Please retry with valid JSON.",
                        ));
                        continue;
                    }
                };
                let output = match thought::dispatch(&call.function.name, &args, &tracker).await {
                    Some(out) => out,
                    None => thought::ToolOutput::error(format!(
                        "Unknown tool: {}",
                        call.function.name
                    )),
                };
                log(&format!(
                    "TOOL RESULT ({} chars):\n{}",
                    output.text.len(),
                    output.text
                ));
                messages.push(Message::tool_result(&call.id, &output.text));
            }
            continue;
        }

        // Text-only response — step complete
        let text = msg.content_text().to_string();
        if text.is_empty() && !has_content {
            log("empty response, retrying");
            messages.push(Message::user(
                "[system] Your previous response was empty. Please respond with text or use a tool.",
            ));
            continue;
        }
        log(&format!("\n=== RESPONSE ===\n\n{}", text));

        // If there are more prompts, check bail condition and inject the next one
        if next_prompt_idx < prompts.len() {
            // Run bail check before continuing to next step
            if let Some(ref check) = bail_fn {
                check(next_prompt_idx)?;
            }
            messages.push(Message::assistant(&text));
            let next = &prompts[next_prompt_idx];
            next_prompt_idx += 1;
            log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len()));
            messages.push(Message::user(next));
            continue;
        }

        return Ok(text);
    }

    Err(format!("agent exceeded {} tool turns", max_turns))
}

/// Synchronous wrapper — runs the async function on a dedicated thread
/// with its own tokio runtime. Safe to call from any context.
pub fn call_api_with_tools_sync(
    agent: &str,
    prompts: &[String],
    temperature: Option<f32>,
    bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
    log: &(dyn Fn(&str) + Sync),
) -> Result<String, String> {
    std::thread::scope(|s| {
        s.spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(|e| format!("tokio runtime: {}", e))?;
            let agent_name = format!("agent:{}", agent);
            rt.block_on(crate::store::TASK_AGENT.scope(
                agent_name,
                call_api_with_tools(agent, prompts, temperature, bail_fn, log),
            ))
        })
        .join()
        .unwrap()
    })
}
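
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the original module): one way a
// caller might drive the synchronous wrapper. The agent name, prompts, and
// logging closure below are hypothetical; only call_api_with_tools_sync's
// signature comes from this file.
//
// let prompts = vec![
//     "List the open tasks in the journal.".to_string(),
//     "Draft a plan for the highest-priority task.".to_string(),
// ];
// let log = |line: &str| eprintln!("{}", line);
// let result = call_api_with_tools_sync(
//     "planner",   // hypothetical agent name
//     &prompts,
//     Some(0.2),   // sampling temperature; None leaves it to the server default
//     None,        // no bail check between steps
//     &log,
// );
// match result {
//     Ok(text) => println!("final response:\n{}", text),
//     Err(e) => eprintln!("agent failed: {}", e),
// }
// ---------------------------------------------------------------------------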