api: retry transient connection errors, misc fixes
- Retry up to 5 times with exponential backoff (2s, 4s, 8s, 16s) on transient errors: IncompleteMessage, connection closed/reset/ refused, timeouts. Non-transient errors fail immediately. - tail command: print to stdout instead of stderr - state_dir rename: output_dir → state_dir throughout knowledge.rs Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
parent
5d803441c9
commit
3e410347a2
3 changed files with 53 additions and 31 deletions
|
|
@ -33,18 +33,18 @@ pub fn cmd_tail(n: usize, full: bool) -> Result<(), String> {
|
||||||
};
|
};
|
||||||
let del = if node.deleted { " [DELETED]" } else { "" };
|
let del = if node.deleted { " [DELETED]" } else { "" };
|
||||||
if full {
|
if full {
|
||||||
eprintln!("--- {} (v{}) {} via {} w={:.3}{} ---",
|
println!("--- {} (v{}) {} via {} w={:.3}{} ---",
|
||||||
node.key, node.version, ts, node.provenance, node.weight, del);
|
node.key, node.version, ts, node.provenance, node.weight, del);
|
||||||
eprintln!("{}\n", node.content);
|
println!("{}\n", node.content);
|
||||||
} else {
|
} else {
|
||||||
let preview = crate::util::first_n_chars(&node.content, 100).replace('\n', "\\n");
|
let preview = crate::util::first_n_chars(&node.content, 100).replace('\n', "\\n");
|
||||||
eprintln!(" {} v{} w={:.2}{}",
|
println!(" {} v{} w={:.2}{}",
|
||||||
ts, node.version, node.weight, del);
|
ts, node.version, node.weight, del);
|
||||||
eprintln!(" {} via {}", node.key, node.provenance);
|
println!(" {} via {}", node.key, node.provenance);
|
||||||
if !preview.is_empty() {
|
if !preview.is_empty() {
|
||||||
eprintln!(" {}", preview);
|
println!(" {}", preview);
|
||||||
}
|
}
|
||||||
eprintln!();
|
println!();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -61,29 +61,52 @@ pub async fn call_api_with_tools(
|
||||||
for turn in 0..max_turns {
|
for turn in 0..max_turns {
|
||||||
log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len()));
|
log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len()));
|
||||||
|
|
||||||
let (msg, usage) = client.chat_completion_stream_temp(
|
let mut last_err = None;
|
||||||
|
let mut msg_opt = None;
|
||||||
|
let mut usage_opt = None;
|
||||||
|
for attempt in 0..5 {
|
||||||
|
match client.chat_completion_stream_temp(
|
||||||
&messages,
|
&messages,
|
||||||
Some(&tool_defs),
|
Some(&tool_defs),
|
||||||
&ui_tx,
|
&ui_tx,
|
||||||
StreamTarget::Autonomous,
|
StreamTarget::Autonomous,
|
||||||
&reasoning,
|
&reasoning,
|
||||||
temperature,
|
temperature,
|
||||||
).await.map_err(|e| {
|
).await {
|
||||||
|
Ok((msg, usage)) => {
|
||||||
|
msg_opt = Some(msg);
|
||||||
|
usage_opt = usage;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let err_str = e.to_string();
|
||||||
|
let is_transient = err_str.contains("IncompleteMessage")
|
||||||
|
|| err_str.contains("connection closed")
|
||||||
|
|| err_str.contains("connection reset")
|
||||||
|
|| err_str.contains("timed out")
|
||||||
|
|| err_str.contains("Connection refused");
|
||||||
|
if is_transient && attempt < 4 {
|
||||||
|
log(&format!("transient error (attempt {}): {}, retrying...",
|
||||||
|
attempt + 1, err_str));
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await;
|
||||||
|
last_err = Some(e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
let msg_bytes: usize = messages.iter()
|
let msg_bytes: usize = messages.iter()
|
||||||
.map(|m| m.content_text().len())
|
.map(|m| m.content_text().len())
|
||||||
.sum();
|
.sum();
|
||||||
let err_str = e.to_string();
|
return Err(format!(
|
||||||
let hint = if err_str.contains("IncompleteMessage") || err_str.contains("connection closed") {
|
"API error on turn {} (~{}KB payload, {} messages, {} attempts): {}",
|
||||||
format!(" — likely exceeded model context window (~{}KB ≈ {}K tokens)",
|
turn, msg_bytes / 1024, messages.len(), attempt + 1, e));
|
||||||
msg_bytes / 1024, msg_bytes / 4096)
|
}
|
||||||
} else {
|
}
|
||||||
String::new()
|
}
|
||||||
};
|
let msg = msg_opt.unwrap();
|
||||||
format!("API error on turn {} (~{}KB payload, {} messages): {}{}",
|
if let Some(ref e) = last_err {
|
||||||
turn, msg_bytes / 1024, messages.len(), e, hint)
|
log(&format!("succeeded after retry (previous error: {})", e));
|
||||||
})?;
|
}
|
||||||
|
|
||||||
if let Some(u) = &usage {
|
if let Some(u) = &usage_opt {
|
||||||
log(&format!("tokens: {} prompt + {} completion",
|
log(&format!("tokens: {} prompt + {} completion",
|
||||||
u.prompt_tokens, u.completion_tokens));
|
u.prompt_tokens, u.completion_tokens));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -264,9 +264,8 @@ pub fn spawn_agent(
|
||||||
|
|
||||||
let log_dir = store::memory_dir().join("logs");
|
let log_dir = store::memory_dir().join("logs");
|
||||||
fs::create_dir_all(&log_dir).ok();
|
fs::create_dir_all(&log_dir).ok();
|
||||||
let agent_log = fs::OpenOptions::new()
|
let agent_log = fs::File::create(
|
||||||
.create(true).append(true)
|
log_dir.join(format!("{}-{}.log", agent_name, store::compact_timestamp())))
|
||||||
.open(log_dir.join(format!("{}.log", agent_name)))
|
|
||||||
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
|
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
|
||||||
|
|
||||||
let child = std::process::Command::new("poc-memory")
|
let child = std::process::Command::new("poc-memory")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue