llm: add 5-minute timeout to claude subprocess
The daemon was getting stuck when a claude subprocess hung — no completion logged, job blocked forever, pending queue growing. Use spawn() + watchdog thread instead of blocking output(). The watchdog sleeps in 1s increments checking a cancel flag, sends SIGTERM at 5 minutes, SIGKILL after 5s grace. Cancel flag ensures the watchdog exits promptly when the child finishes normally.
This commit is contained in:
parent
b62fffc326
commit
93f98a0a5d
1 changed file with 48 additions and 3 deletions
|
|
@ -38,10 +38,14 @@ fn log_usage(agent: &str, model: &str, prompt: &str, response: &str,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maximum time to wait for a claude subprocess before killing it.
|
||||||
|
const SUBPROCESS_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // 5 minutes
|
||||||
|
|
||||||
/// Call a model via claude CLI. Returns the response text.
|
/// Call a model via claude CLI. Returns the response text.
|
||||||
///
|
///
|
||||||
/// Sets PR_SET_PDEATHSIG on the child so it gets SIGTERM if the
|
/// Sets PR_SET_PDEATHSIG on the child so it gets SIGTERM if the
|
||||||
/// parent daemon exits — no more orphaned claude processes.
|
/// parent daemon exits — no more orphaned claude processes.
|
||||||
|
/// Times out after 5 minutes to prevent blocking the daemon forever.
|
||||||
fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String> {
|
fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String> {
|
||||||
// Write prompt to temp file (claude CLI needs file input for large prompts)
|
// Write prompt to temp file (claude CLI needs file input for large prompts)
|
||||||
let tmp = std::env::temp_dir().join(format!("poc-llm-{}-{:?}.txt",
|
let tmp = std::env::temp_dir().join(format!("poc-llm-{}-{:?}.txt",
|
||||||
|
|
@ -53,6 +57,8 @@ fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String>
|
||||||
cmd.args(["-p", "--model", model, "--tools", "", "--no-session-persistence",
|
cmd.args(["-p", "--model", model, "--tools", "", "--no-session-persistence",
|
||||||
"--strict-mcp-config"])
|
"--strict-mcp-config"])
|
||||||
.stdin(fs::File::open(&tmp).map_err(|e| format!("open temp: {}", e))?)
|
.stdin(fs::File::open(&tmp).map_err(|e| format!("open temp: {}", e))?)
|
||||||
|
.stdout(std::process::Stdio::piped())
|
||||||
|
.stderr(std::process::Stdio::piped())
|
||||||
.env_remove("CLAUDECODE");
|
.env_remove("CLAUDECODE");
|
||||||
|
|
||||||
// Use separate OAuth credentials for agent work if configured
|
// Use separate OAuth credentials for agent work if configured
|
||||||
|
|
@ -65,19 +71,58 @@ fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String>
|
||||||
|
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
let result = unsafe {
|
let mut child = unsafe {
|
||||||
cmd.pre_exec(|| {
|
cmd.pre_exec(|| {
|
||||||
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
|
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.output()
|
.spawn()
|
||||||
|
.map_err(|e| format!("spawn claude: {}", e))?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Spawn a watchdog thread that kills the child after the timeout.
|
||||||
|
// Uses a cancellation flag so the thread exits promptly when the child finishes.
|
||||||
|
let child_id = child.id();
|
||||||
|
let cancel = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||||
|
let cancel_flag = cancel.clone();
|
||||||
|
let watchdog = std::thread::spawn(move || {
|
||||||
|
// Sleep in 1s increments so we can check the cancel flag
|
||||||
|
let deadline = std::time::Instant::now() + SUBPROCESS_TIMEOUT;
|
||||||
|
while std::time::Instant::now() < deadline {
|
||||||
|
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||||
|
}
|
||||||
|
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Send SIGTERM, then SIGKILL after 5s grace period
|
||||||
|
unsafe { libc::kill(child_id as i32, libc::SIGTERM); }
|
||||||
|
for _ in 0..5 {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||||
|
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
unsafe { libc::kill(child_id as i32, libc::SIGKILL); }
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = child.wait_with_output();
|
||||||
|
|
||||||
|
// Cancel the watchdog thread
|
||||||
|
cancel.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
watchdog.join().ok();
|
||||||
|
|
||||||
fs::remove_file(&tmp).ok();
|
fs::remove_file(&tmp).ok();
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(output) => {
|
Ok(output) => {
|
||||||
let elapsed = start.elapsed().as_millis();
|
let elapsed = start.elapsed().as_millis();
|
||||||
|
if elapsed > SUBPROCESS_TIMEOUT.as_millis() - 1000 {
|
||||||
|
log_usage(agent, model, prompt, "TIMEOUT", elapsed, false);
|
||||||
|
return Err(format!("claude timed out after {:.0}s", elapsed as f64 / 1000.0));
|
||||||
|
}
|
||||||
if output.status.success() {
|
if output.status.success() {
|
||||||
let response = String::from_utf8_lossy(&output.stdout).trim().to_string();
|
let response = String::from_utf8_lossy(&output.stdout).trim().to_string();
|
||||||
log_usage(agent, model, prompt, &response, elapsed, true);
|
log_usage(agent, model, prompt, &response, elapsed, true);
|
||||||
|
|
@ -89,7 +134,7 @@ fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String>
|
||||||
Err(format!("claude exited {}: {}", output.status, preview.trim()))
|
Err(format!("claude exited {}: {}", output.status, preview.trim()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => Err(format!("spawn claude: {}", e)),
|
Err(e) => Err(format!("wait claude: {}", e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue