mcp-server: auto-reconnect on broken pipe

When the daemon restarts, consciousness-mcp's socket connection goes
stale. Previously this caused all subsequent memory/journal tool calls
to fail with "Broken pipe" errors until manually restarted.

Now request() detects broken pipe/connection reset errors and
automatically reconnects to the daemon socket, re-initializes the
MCP session, and retries the request.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-14 18:56:38 -04:00
commit 6ec15776f1

View file

@ -127,7 +127,7 @@ impl DaemonClient {
Ok(Self { reader, writer, next_id: 0 }) Ok(Self { reader, writer, next_id: 0 })
} }
fn request(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> { fn request_raw(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
self.next_id += 1; self.next_id += 1;
let req = json!({ let req = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -150,6 +150,40 @@ impl DaemonClient {
} }
Ok(resp.result.unwrap_or(Value::Null)) Ok(resp.result.unwrap_or(Value::Null))
} }
/// Re-establish connection to daemon and re-initialize.
fn reconnect(&mut self) -> Result<(), String> {
eprintln!("consciousness-mcp: reconnecting to daemon...");
let path = socket_path();
let stream = UnixStream::connect(&path)
.map_err(|e| format!("reconnect to {:?}: {}", path, e))?;
self.reader = BufReader::new(stream.try_clone().map_err(|e| e.to_string())?);
self.writer = BufWriter::new(stream);
// Re-initialize MCP session
self.request_raw("initialize", Some(json!({
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "consciousness-mcp", "version": "0.4.0"}
})))?;
let _ = self.request_raw("notifications/initialized", None);
eprintln!("consciousness-mcp: reconnected");
Ok(())
}
/// Request with automatic reconnect on broken pipe.
fn request(&mut self, method: &str, params: Option<Value>) -> Result<Value, String> {
match self.request_raw(method, params.clone()) {
Ok(v) => Ok(v),
Err(e) if e.contains("Broken pipe") || e.contains("os error 32")
|| e.contains("Connection reset") => {
// Connection lost — try to reconnect once
self.reconnect()?;
self.request_raw(method, params)
}
Err(e) => Err(e),
}
}
} }
// ── Main loop ─────────────────────────────────────────────────── // ── Main loop ───────────────────────────────────────────────────