Compare commits

..

10 commits

Author SHA1 Message Date
ProofOfConcept
a596e007b2 Mouse selection, copy/paste, yield_to_user fixes
- Mouse text selection with highlight rendering in panes
- OSC 52 clipboard copy on selection, middle-click paste via tmux buffer
- Bracketed paste support (Event::Paste)
- yield_to_user: no tool result appended, ends turn immediately
- yield_to_user: no parameters, just a control signal
- Drop arboard dependency, use crossterm OSC 52 + tmux for clipboard

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 18:10:54 -04:00
Kent Overstreet
7dd9daa2b9 Improved response logging 2026-04-09 17:05:24 -04:00
Kent Overstreet
8a2f488d22 yield_to_user ends turn 2026-04-09 16:47:49 -04:00
Kent Overstreet
0af97774f4 Parsing fixes
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
2026-04-09 16:42:16 -04:00
Kent Overstreet
b55230ce3f fix normalize_xml_tags() 2026-04-09 15:34:37 -04:00
Kent Overstreet
8d14c59d56 Fix: read lsp_servers/mcp_servers from top-level config
Config struct deserializes from the "memory" subsection of config.json5,
but lsp_servers and mcp_servers are top-level keys. Now explicitly
extracted from the root after initial deserialization.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 13:25:33 -04:00
Kent Overstreet
949dacd861 Fast startup: mmap backward scan instead of reading full log
Uses JsonlBackwardIter (SIMD memrchr3) to scan the conversation log
newest-first without reading/parsing the whole file. Stops as soon
as the conversation budget is full. Only the kept nodes get
retokenized and pushed into context.

18MB log → only tokenize the ~50 nodes that fit in the budget.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 13:09:26 -04:00
Kent Overstreet
7da3efc5df Fast startup: only retokenize tail of conversation log
restore_from_log reads the full log but walks backwards from the tail,
retokenizing each node as it goes. Stops when conversation budget is
full. Only the nodes that fit get pushed into context.

Added AstNode::retokenize() — recomputes token_ids on all leaves
after deserialization (serde skip means they're empty).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 13:06:19 -04:00
Kent Overstreet
6ec0e1c766 LSP client: spawn language servers, expose code intelligence tools
New lsp.rs: LspRegistry manages persistent LSP server connections.
Spawns child processes, speaks LSP protocol (Content-Length framed
JSON-RPC over stdio). Server indexes the project once; queries are
cheap.

Tools: lsp_definition, lsp_references, lsp_hover, lsp_symbols,
lsp_callers. Each takes file/line/character, queries the running
language server.

LspRegistry lives on Agent as Option<Arc>, shared across forks.
Still needs: config-driven server startup (like MCP).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 12:59:25 -04:00
Kent Overstreet
8b5614ba99 MCP client: spawn external tool servers, dispatch via JSON-RPC
New mcp_client.rs: McpRegistry manages MCP server connections.
Spawns child processes, speaks JSON-RPC 2.0 over stdio. Discovers
tools via tools/list, dispatches calls via tools/call.

dispatch_with_agent falls through to MCP after checking internal
tools. McpRegistry lives on Agent (shared across forks).

Still needs: config-driven server startup, system prompt integration.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 12:59:25 -04:00
15 changed files with 1160 additions and 200 deletions

1
Cargo.lock generated
View file

@ -671,6 +671,7 @@ version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b"
dependencies = [
"base64 0.22.1",
"bitflags 2.11.0",
"crossterm_winapi",
"derive_more",

View file

@ -20,6 +20,7 @@ edition.workspace = true
[dependencies]
anyhow = "1"
crossterm = { version = "0.29", features = ["event-stream", "bracketed-paste", "osc52"] }
clap = { version = "4", features = ["derive"] }
figment = { version = "0.10", features = ["env"] }
dirs = "6"
@ -30,7 +31,6 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
json5 = "1.3"
crossterm = { version = "0.29", features = ["event-stream"] }
ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] }
tui-markdown = { git = "https://github.com/koverstreet/tui-markdown", subdirectory = "tui-markdown" }
tui-textarea = { version = "0.10.2", package = "tui-textarea-2" }

View file

@ -45,7 +45,7 @@ pub(crate) struct SamplingParams {
/// One token from the streaming completions API.
pub enum StreamToken {
Token { text: String, id: u32 },
Token(u32),
Done { usage: Option<Usage> },
Error(String),
}
@ -159,20 +159,19 @@ async fn stream_completions(
};
for choice in choices {
let text = choice["text"].as_str().unwrap_or("");
let token_ids = choice["token_ids"].as_array();
if let Some(ids) = token_ids {
for (i, id_val) in ids.iter().enumerate() {
if let Some(ids) = choice["token_ids"].as_array() {
for id_val in ids {
if let Some(id) = id_val.as_u64() {
let _ = tx.send(StreamToken::Token {
text: if i == 0 { text.to_string() } else { String::new() },
id: id as u32,
});
let _ = tx.send(StreamToken::Token(id as u32));
}
}
} else if let Some(text) = choice["text"].as_str() {
// Fallback: provider didn't return token_ids, encode locally
if !text.is_empty() {
for id in super::tokenizer::encode(text) {
let _ = tx.send(StreamToken::Token(id));
}
}
} else if !text.is_empty() {
let _ = tx.send(StreamToken::Token { text: text.to_string(), id: 0 });
}
}
}

View file

@ -296,6 +296,23 @@ impl AstNode {
// -- Builder --------------------------------------------------------------
/// Recompute `token_ids` on every leaf of this subtree.
///
/// Needed after deserialization: `token_ids` is serde-skipped, so restored
/// nodes come back with empty token lists. Prompt-invisible leaves get an
/// empty id list since they never reach the model.
pub fn retokenize(self) -> Self {
    match self {
        Self::Branch { role, children } => Self::Branch {
            role,
            children: children.into_iter().map(Self::retokenize).collect(),
        },
        Self::Leaf(leaf) => {
            let token_ids = if leaf.body.is_prompt_visible() {
                tokenizer::encode(&leaf.body.render())
            } else {
                Vec::new()
            };
            Self::Leaf(NodeLeaf { token_ids, ..leaf })
        }
    }
}
pub fn with_timestamp(mut self, ts: DateTime<Utc>) -> Self {
match &mut self {
Self::Leaf(leaf) => leaf.timestamp = Some(ts),
@ -427,49 +444,57 @@ fn format_tool_call_xml(name: &str, args_json: &str) -> String {
xml
}
/// Search for a sequence of literal parts separated by optional ASCII whitespace.
/// Returns (start, end) byte positions of the overall match.
///
/// Handles the case where streaming tokenization inserts whitespace inside
/// XML tag structure, e.g. `< function = bash >` instead of `<function=bash>`.
///
/// (This span previously interleaved the new implementation with stale lines
/// of the deleted `normalize_xml_tags`; the residue is removed here.)
fn find_ws_seq(s: &str, parts: &[&str]) -> Option<(usize, usize)> {
    let bytes = s.as_bytes();
    let mut search_from = 0;
    'outer: loop {
        // Anchor on the first literal part; `?` gives up when it no longer occurs.
        let start = s[search_from..].find(parts[0])? + search_from;
        let mut pos = start + parts[0].len();
        for &part in &parts[1..] {
            // Skip any whitespace the tokenizer may have injected between parts.
            while pos < bytes.len() && bytes[pos].is_ascii_whitespace() {
                pos += 1;
            }
            if !s[pos..].starts_with(part) {
                // This anchor didn't pan out — retry from the next byte.
                search_from = start + 1;
                continue 'outer;
            }
            pos += part.len();
        }
        return Some((start, pos));
    }
}
/// Parse a Qwen-style XML tag: `<tag=name>body</tag>`.
/// Tolerates whitespace inside tag delimiters (streaming artifact).
/// Body content is returned verbatim except for a single leading/trailing
/// newline (XML formatting convention).
fn parse_qwen_tag<'a>(s: &'a str, tag: &str) -> Option<(&'a str, &'a str, &'a str)> {
let open = format!("<{}=", tag);
// Open tag: tolerate whitespace from streaming tokenization
let (_, after_eq) = find_ws_seq(s, &["<", tag, "="])?;
let gt_offset = s[after_eq..].find('>')?;
let name = s[after_eq..after_eq + gt_offset].trim();
let body_start = after_eq + gt_offset + 1;
// Close tag: exact match — model doesn't insert whitespace in close tags
let close = format!("</{}>", tag);
let close_offset = s[body_start..].find(&close)?;
let body = &s[body_start..body_start + close_offset];
// Strip the single leading/trailing newline from XML formatting,
// but preserve all other whitespace (indentation matters for code).
let body = body.strip_prefix('\n').unwrap_or(body);
let body = body.strip_suffix('\n').unwrap_or(body);
let rest = &s[body_start + close_offset + close.len()..];
let start = s.find(&open)? + open.len();
let name_end = start + s[start..].find('>')?;
let body_start = name_end + 1;
let body_end = body_start + s[body_start..].find(&close)?;
Some((
s[start..name_end].trim(),
s[body_start..body_end].trim(),
&s[body_end + close.len()..],
))
Some((name, body, rest))
}
/// Parse a `<tool_call>` body into `(tool_name, args_json)`.
/// Tries the XML form first, then falls back to the JSON form.
/// (Residue lines calling the deleted `normalize_xml_tags` are removed —
/// whitespace tolerance now lives in the XML parser itself.)
fn parse_tool_call_body(body: &str) -> Option<(String, String)> {
    let body = body.trim();
    parse_xml_tool_call(body)
        .or_else(|| parse_json_tool_call(body))
}
@ -494,6 +519,38 @@ fn parse_json_tool_call(body: &str) -> Option<(String, String)> {
Some((name.to_string(), serde_json::to_string(arguments).unwrap_or_default()))
}
/// Search `buf` for `close_tag`. On a hit, everything before the tag is
/// appended to `accum`, `buf` is advanced past the tag, and the accumulated
/// content is returned. On a miss, the safe prefix of `buf` (everything
/// except a possible partial tag at the end) is moved into `accum` and
/// `None` is returned so the caller waits for more input.
fn scan_close_tag(buf: &mut String, close_tag: &str, accum: &mut String) -> Option<String> {
    match buf.find(close_tag) {
        Some(pos) => {
            accum.push_str(&buf[..pos]);
            *buf = buf[pos + close_tag.len()..].to_string();
            Some(std::mem::take(accum))
        }
        None => {
            let prefix = drain_safe(buf, close_tag.len());
            if !prefix.is_empty() {
                accum.push_str(&prefix);
            }
            None
        }
    }
}
/// Remove everything from `buf` except the last `tag_len` bytes, which might
/// be a partial tag. Returns the removed prefix.
fn drain_safe(buf: &mut String, tag_len: usize) -> String {
    // Keep a tag_len-byte tail: a tag may be split across streamed tokens,
    // so the end of buf could be the start of a tag we haven't seen close.
    let safe = buf.len().saturating_sub(tag_len);
    if safe > 0 {
        // Don't split a multi-byte UTF-8 char at the cut point.
        // NOTE(review): str::floor_char_boundary is an unstable std API —
        // presumably this crate builds on nightly; confirm.
        let safe = buf.floor_char_boundary(safe);
        let drained = buf[..safe].to_string();
        *buf = buf[safe..].to_string();
        drained
    } else {
        String::new()
    }
}
impl ResponseParser {
pub fn new(branch_idx: usize) -> Self {
Self {
@ -529,10 +586,11 @@ impl ResponseParser {
let mut full_text = String::new();
while let Some(event) = stream.recv().await {
match event {
super::api::StreamToken::Token { text, id } => {
super::api::StreamToken::Token(id) => {
let text = super::tokenizer::decode(&[id]);
full_text.push_str(&text);
let mut ctx = agent.context.lock().await;
let calls = parser.feed_token(&text, id, &mut ctx);
let calls = parser.feed_token(&text, &mut ctx);
if !calls.is_empty() {
if let Some(ref mut f) = log_file {
use std::io::Write;
@ -549,19 +607,18 @@ impl ResponseParser {
super::api::StreamToken::Done { usage } => {
if let Some(ref mut f) = log_file {
use std::io::Write;
let tc_count = full_text.matches("<tool_call>").count();
let ctx_tokens = agent.context.lock().await.tokens();
let _ = writeln!(f, "done: {} chars, {} <tool_call> tags, ctx: {} tokens",
full_text.len(), tc_count, ctx_tokens);
if tc_count == 0 && full_text.len() > 0 {
let ctx = agent.context.lock().await;
let children = ctx.conversation().get(parser.branch_idx)
.map(|n| n.children()).unwrap_or(&[]);
let n_think = children.iter().filter(|c| matches!(c.leaf().map(|l| l.body()), Some(NodeBody::Thinking(_)))).count();
let n_content = children.iter().filter(|c| matches!(c.leaf().map(|l| l.body()), Some(NodeBody::Content(_)))).count();
let n_tool = children.iter().filter(|c| matches!(c.leaf().map(|l| l.body()), Some(NodeBody::ToolCall { .. }))).count();
let _ = writeln!(f, "done: {} chars, {} content + {} think + {} tool_call, ctx: {} tokens",
full_text.len(), n_content, n_think, n_tool, ctx.tokens());
drop(ctx);
if full_text.len() > 0 && n_content == 0 && n_tool == 0 {
let end = full_text.floor_char_boundary(full_text.len().min(2000));
let _ = writeln!(f, "full text:\n{}", &full_text[..end]);
}
for (i, part) in full_text.split("<tool_call>").enumerate() {
if i > 0 {
let end = part.floor_char_boundary(part.len().min(200));
let _ = writeln!(f, "tool_call body: {}...", &part[..end]);
}
let _ = writeln!(f, " unparsed text: {}", &full_text[..end]);
}
}
if let Some(u) = usage {
@ -581,42 +638,33 @@ impl ResponseParser {
(rx, handle)
}
pub fn feed_token(&mut self, text: &str, _token_id: u32, ctx: &mut ContextState) -> Vec<PendingToolCall> {
pub fn feed_token(&mut self, text: &str, ctx: &mut ContextState) -> Vec<PendingToolCall> {
const THINK_OPEN: &str = "<think>";
const THINK_CLOSE: &str = "</think>";
const TOOL_CALL_OPEN: &str = "<tool_call>";
const TOOL_CALL_CLOSE: &str = "</tool_call>";
const OPEN_TAGS: &[&str] = &[THINK_OPEN, TOOL_CALL_OPEN];
let mut pending = Vec::new();
self.buf.push_str(text);
loop {
if self.in_think {
match self.buf.find("</think>") {
Some(end) => {
self.think_buf.push_str(&self.buf[..end]);
self.buf = self.buf[end + 8..].to_string();
if let Some(content) = scan_close_tag(&mut self.buf, THINK_CLOSE, &mut self.think_buf) {
self.in_think = false;
let text = std::mem::take(&mut self.think_buf).trim().to_string();
let text = content.trim().to_string();
if !text.is_empty() {
self.push_child(ctx, AstNode::thinking(text));
}
continue;
}
None => {
let safe = self.buf.len().saturating_sub(8);
if safe > 0 {
let safe = self.buf.floor_char_boundary(safe);
self.think_buf.push_str(&self.buf[..safe]);
self.buf = self.buf[safe..].to_string();
}
break;
}
}
}
if self.in_tool_call {
match self.buf.find("</tool_call>") {
Some(end) => {
self.tool_call_buf.push_str(&self.buf[..end]);
self.buf = self.buf[end + 12..].to_string();
if let Some(content) = scan_close_tag(&mut self.buf, TOOL_CALL_CLOSE, &mut self.tool_call_buf) {
self.in_tool_call = false;
if let Some((name, args)) = parse_tool_call_body(&self.tool_call_buf) {
if let Some((name, args)) = parse_tool_call_body(&content) {
self.flush_content(ctx);
self.push_child(ctx, AstNode::tool_call(&name, &args));
self.call_counter += 1;
@ -626,52 +674,36 @@ impl ResponseParser {
id: format!("call_{}", self.call_counter),
});
}
self.tool_call_buf.clear();
continue;
}
None => {
let safe = self.buf.len().saturating_sub(12);
if safe > 0 {
let safe = self.buf.floor_char_boundary(safe);
self.tool_call_buf.push_str(&self.buf[..safe]);
self.buf = self.buf[safe..].to_string();
}
break;
}
}
}
let think_pos = self.buf.find("<think>");
let tool_pos = self.buf.find("<tool_call>");
let next_tag = match (think_pos, tool_pos) {
(Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
};
// Not inside a tag — find the earliest opening tag
let next = OPEN_TAGS.iter()
.filter_map(|tag| self.buf.find(tag).map(|pos| (pos, *tag)))
.min_by_key(|(pos, _)| *pos);
match next_tag {
Some(pos) => {
match next {
Some((pos, tag)) => {
if pos > 0 {
self.content_parts.push(self.buf[..pos].to_string());
}
if self.buf[pos..].starts_with("<think>") {
self.buf = self.buf[pos + 7..].to_string();
self.buf = self.buf[pos + tag.len()..].to_string();
self.flush_content(ctx);
self.in_think = true;
} else {
self.buf = self.buf[pos + 11..].to_string();
self.flush_content(ctx);
self.in_tool_call = true;
match tag {
THINK_OPEN => self.in_think = true,
TOOL_CALL_OPEN => self.in_tool_call = true,
_ => unreachable!(),
}
continue;
}
None => {
let safe = self.buf.len().saturating_sub(11);
if safe > 0 {
let safe = self.buf.floor_char_boundary(safe);
self.content_parts.push(self.buf[..safe].to_string());
self.buf = self.buf[safe..].to_string();
// Keep a tail that might be a partial opening tag
let max_tag = OPEN_TAGS.iter().map(|t| t.len()).max().unwrap();
let drained = drain_safe(&mut self.buf, max_tag);
if !drained.is_empty() {
self.content_parts.push(drained);
}
break;
}
@ -993,7 +1025,9 @@ mod tests {
#[test]
fn test_tool_call_xml_parse_streamed_whitespace() {
let body = "<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd</\nparameter\n>\n</\nfunction\n>";
// Streaming tokenization can insert whitespace in opening tags,
// but close tags are always emitted verbatim.
let body = "<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd</parameter>\n</function>";
let (name, args) = parse_tool_call_body(body).unwrap();
assert_eq!(name, "bash");
let args: serde_json::Value = serde_json::from_str(&args).unwrap();
@ -1010,15 +1044,12 @@ mod tests {
}
#[test]
fn test_normalize_preserves_content() {
let text = "<function=bash>\n<parameter=command>echo hello world</parameter>\n</function>";
let normalized = normalize_xml_tags(text);
assert_eq!(normalized, text);
}
#[test]
fn test_normalize_strips_tag_internal_whitespace() {
assert_eq!(normalize_xml_tags("<\nfunction\n=\nbash\n>"), "<function=bash>");
fn test_tool_call_preserves_code_with_angle_brackets() {
let body = "<function=edit>\n<parameter=code>if x < y {\n std::mem::swap(&mut a, &mut b);\n}</parameter>\n</function>";
let (name, args) = parse_tool_call_body(body).unwrap();
assert_eq!(name, "edit");
let args: serde_json::Value = serde_json::from_str(&args).unwrap();
assert_eq!(args["code"], "if x < y {\n std::mem::swap(&mut a, &mut b);\n}");
}
// -- ResponseParser tests -------------------------------------------------
@ -1032,7 +1063,7 @@ mod tests {
let mut calls = Vec::new();
for chunk in chunks {
// Feed each chunk as a single token (id=0 for tests)
calls.extend(p.feed_token(chunk, 0, &mut ctx));
calls.extend(p.feed_token(chunk, &mut ctx));
}
p.finish(&mut ctx);
(ctx, calls)
@ -1094,7 +1125,7 @@ mod tests {
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
let mut p = ResponseParser::new(0);
for ch in text.chars() {
p.feed_token(&ch.to_string(), 0, &mut ctx);
p.feed_token(&ch.to_string(), &mut ctx);
}
p.finish(&mut ctx);
let b = bodies(assistant_children(&ctx));
@ -1111,7 +1142,7 @@ mod tests {
let mut p = ResponseParser::new(0);
let mut tool_calls = 0;
for ch in text.chars() {
tool_calls += p.feed_token(&ch.to_string(), 0, &mut ctx).len();
tool_calls += p.feed_token(&ch.to_string(), &mut ctx).len();
}
p.finish(&mut ctx);
assert_eq!(tool_calls, 1);

View file

@ -138,8 +138,17 @@ pub struct Agent {
}
/// Mutable agent state — behind its own mutex.
/// Which external MCP tools an agent can access.
#[derive(Clone)]
pub enum McpToolAccess {
None,
All,
Some(Vec<String>),
}
pub struct AgentState {
pub tools: Vec<tools::Tool>,
pub mcp_tools: McpToolAccess,
pub last_prompt_tokens: u32,
pub reasoning_effort: String,
pub temperature: f32,
@ -174,8 +183,7 @@ impl Agent {
context.conversation_log = conversation_log;
context.push_no_log(Section::System, AstNode::system_msg(&system_prompt));
let tool_defs: Vec<String> = tools::tools().iter()
.map(|t| t.to_json()).collect();
let tool_defs = tools::all_tool_definitions().await;
if !tool_defs.is_empty() {
let tools_text = format!(
"# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
@ -202,6 +210,7 @@ impl Agent {
context: tokio::sync::Mutex::new(context),
state: tokio::sync::Mutex::new(AgentState {
tools: tools::tools(),
mcp_tools: McpToolAccess::All,
last_prompt_tokens: 0,
reasoning_effort: "none".to_string(),
temperature: 0.6,
@ -237,6 +246,7 @@ impl Agent {
context: tokio::sync::Mutex::new(ctx),
state: tokio::sync::Mutex::new(AgentState {
tools,
mcp_tools: McpToolAccess::None,
last_prompt_tokens: 0,
reasoning_effort: "none".to_string(),
temperature: st.temperature,
@ -408,8 +418,10 @@ impl Agent {
}
}
Agent::apply_tool_results(&agent, results, &mut ds).await;
if !agent.state.lock().await.pending_yield {
continue;
}
}
// Text-only response — extract text and return
let text = {
@ -457,6 +469,7 @@ impl Agent {
) {
let mut nodes = Vec::new();
for (call, output) in &results {
if call.name == "yield_to_user" { continue; }
ds.had_tool_calls = true;
if output.starts_with("Error:") { ds.tool_errors += 1; }
nodes.push(Self::make_tool_result_node(call, output));
@ -558,28 +571,46 @@ impl Agent {
}
pub async fn restore_from_log(&self) -> bool {
let nodes = {
let tail = {
let ctx = self.context.lock().await;
match &ctx.conversation_log {
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
Ok(nodes) if !nodes.is_empty() => nodes,
_ => return false,
Some(log) => match log.read_tail() {
Ok(t) => t,
Err(_) => return false,
},
None => return false,
}
};
let budget = context::context_budget_tokens();
let fixed = {
let ctx = self.context.lock().await;
ctx.system().iter().chain(ctx.identity().iter())
.map(|n| n.tokens()).sum::<usize>()
};
let conv_budget = budget.saturating_sub(fixed);
// Walk backwards (newest first), retokenize, stop at budget
let mut kept = Vec::new();
let mut total = 0;
for node in tail.iter() {
let node = node.retokenize();
let tok = node.tokens();
if total + tok > conv_budget && !kept.is_empty() { break; }
total += tok;
kept.push(node);
}
kept.reverse();
{
let mut ctx = self.context.lock().await;
ctx.clear(Section::Conversation);
// Push without logging — these are already in the log
for node in nodes {
for node in kept {
ctx.push_no_log(Section::Conversation, node);
}
}
self.compact().await;
let mut st = self.state.lock().await;
st.last_prompt_tokens = self.context.lock().await.tokens() as u32;
self.state.lock().await.last_prompt_tokens = self.context.lock().await.tokens() as u32;
true
}

View file

@ -33,14 +33,12 @@ pub(super) fn tools() -> [super::Tool; 3] {
})) },
Tool { name: "yield_to_user",
description: "Wait for user input before continuing. The only way to enter a waiting state.",
parameters_json: r#"{"type":"object","properties":{"message":{"type":"string","description":"Optional status message"}}}"#,
handler: Arc::new(|agent, v| Box::pin(async move {
let msg = v.get("message").and_then(|v| v.as_str()).unwrap_or("Waiting for input.");
parameters_json: r#"{"type":"object","properties":{}}"#,
handler: Arc::new(|agent, _| Box::pin(async move {
if let Some(agent) = agent {
let mut a = agent.state.lock().await;
a.pending_yield = true;
agent.state.lock().await.pending_yield = true;
}
Ok(format!("Yielding. {}", msg))
Ok(String::new())
})) },
]
}

419
src/agent/tools/lsp.rs Normal file
View file

@ -0,0 +1,419 @@
// tools/lsp.rs — LSP client for code intelligence
//
// Spawns language servers on demand when a file is first queried.
// Finds the project root (git/cargo/etc.) automatically. Maintains
// persistent connections — the server indexes once, queries are cheap.
use anyhow::{Context, Result};
use serde_json::json;
use std::collections::HashSet;
use std::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
/// One live language-server connection, rooted at a single project directory.
struct LspServer {
    root_path: String,             // project root this server was spawned for
    stdin: BufWriter<ChildStdin>,  // framed JSON-RPC requests are written here
    stdout: BufReader<ChildStdout>, // framed JSON-RPC responses are read here
    _child: Child,                 // keeps the child process handle alive with the connection
    next_id: i64,                  // monotonically increasing JSON-RPC request id
    opened_files: HashSet<String>, // URIs already announced via textDocument/didOpen
    last_access: u64,              // unix seconds of last use; drives idle reaping
}
impl LspServer {
    /// Send a request and block until the matching response arrives.
    /// Request ids come from a per-connection counter.
    async fn request(&mut self, method: &str, params: serde_json::Value) -> Result<serde_json::Value> {
        self.next_id += 1;
        let id = self.next_id;
        let msg = json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params });
        self.send_message(&msg).await?;
        self.read_response(id).await
    }

    /// Fire-and-forget notification: no id, no response expected.
    async fn notify(&mut self, method: &str, params: serde_json::Value) -> Result<()> {
        let msg = json!({ "jsonrpc": "2.0", "method": method, "params": params });
        self.send_message(&msg).await
    }

    /// Write one LSP-framed message: `Content-Length: N\r\n\r\n` + JSON body.
    async fn send_message(&mut self, msg: &serde_json::Value) -> Result<()> {
        let body = serde_json::to_string(msg)?;
        let header = format!("Content-Length: {}\r\n\r\n", body.len());
        self.stdin.write_all(header.as_bytes()).await?;
        self.stdin.write_all(body.as_bytes()).await?;
        self.stdin.flush().await?;
        Ok(())
    }

    /// Read framed messages until one carries `expected_id`, then return its
    /// `result`. Messages with other ids — and id-less server notifications —
    /// are silently discarded.
    /// NOTE(review): server-to-client *requests* (which carry an id and expect
    /// a reply) are also discarded here; confirm no configured server needs them.
    async fn read_response(&mut self, expected_id: i64) -> Result<serde_json::Value> {
        loop {
            // Header section: lines up to the first blank line; only
            // Content-Length matters for framing.
            // NOTE(review): the prefix match is case-sensitive — LSP servers
            // conventionally emit exactly this spelling, but confirm.
            let mut content_length: usize = 0;
            loop {
                let mut line = String::new();
                self.stdout.read_line(&mut line).await?;
                let line = line.trim();
                if line.is_empty() { break; }
                if let Some(len) = line.strip_prefix("Content-Length: ") {
                    content_length = len.parse()?;
                }
            }
            if content_length == 0 {
                anyhow::bail!("LSP: no Content-Length header");
            }
            let mut body = vec![0u8; content_length];
            self.stdout.read_exact(&mut body).await?;
            let msg: serde_json::Value = serde_json::from_slice(&body)?;
            if let Some(id) = msg.get("id").and_then(|v| v.as_i64()) {
                if id == expected_id {
                    if let Some(err) = msg.get("error") {
                        anyhow::bail!("LSP error: {}", err);
                    }
                    return Ok(msg.get("result").cloned().unwrap_or(serde_json::Value::Null));
                }
            }
        }
    }

    /// Ensure the file has been opened on the server (didOpen is sent at most
    /// once per URI) and return its URI.
    /// NOTE(review): builds "file://{path}" directly — assumes an absolute
    /// path with no characters needing percent-encoding; verify callers.
    /// NOTE(review): std::fs blocking read inside an async fn — consider
    /// tokio::fs if large files show up here.
    async fn ensure_open(&mut self, path: &str) -> Result<String> {
        let uri = format!("file://{}", path);
        if !self.opened_files.contains(&uri) {
            let text = std::fs::read_to_string(path)
                .with_context(|| format!("reading {}", path))?;
            self.notify("textDocument/didOpen", json!({
                "textDocument": {
                    "uri": uri,
                    "languageId": detect_language(path),
                    "version": 1,
                    "text": text,
                }
            })).await?;
            self.opened_files.insert(uri.clone());
        }
        Ok(uri)
    }
}
/// Map a file path to an LSP `languageId` by its extension.
/// Unknown or missing extensions fall back to "plaintext".
fn detect_language(path: &str) -> &'static str {
    let ext = Path::new(path).extension().and_then(|e| e.to_str());
    match ext {
        Some("rs") => "rust",
        Some("c") | Some("h") => "c",
        Some("cpp") | Some("cc") | Some("cxx") | Some("hpp") => "cpp",
        Some("py") => "python",
        Some("js") => "javascript",
        Some("ts") => "typescript",
        Some("go") => "go",
        Some("java") => "java",
        _ => "plaintext",
    }
}
/// Walk up from `file_path`'s directory looking for a project-root marker
/// (VCS dir or build manifest). Returns the first directory containing one,
/// or None when the filesystem root is reached without a match.
fn find_project_root(file_path: &str) -> Option<String> {
    const MARKERS: [&str; 6] =
        [".git", "Cargo.toml", "package.json", "go.mod", "pyproject.toml", "Makefile"];
    let mut dir = Path::new(file_path).parent()?;
    loop {
        if MARKERS.iter().any(|m| dir.join(m).exists()) {
            return Some(dir.to_string_lossy().to_string());
        }
        // `?` ends the search once we run out of parents.
        dir = dir.parent()?;
    }
}
const IDLE_TIMEOUT_SECS: u64 = 600;
use std::sync::OnceLock;
use tokio::sync::Mutex as TokioMutex;
/// Global LSP registry state: static server configs plus live connections.
struct Registry {
    configs: Vec<crate::config::LspServerConfig>, // snapshot of configured servers
    servers: Vec<LspServer>,                      // one live connection per project root
}
static REGISTRY: OnceLock<TokioMutex<Registry>> = OnceLock::new();
/// Lazily-initialized process-wide registry, guarded by a tokio Mutex.
/// Server configs are snapshotted from the global config on first use,
/// so later config changes are not picked up.
fn registry() -> &'static TokioMutex<Registry> {
    REGISTRY.get_or_init(|| {
        let configs = crate::config::get().lsp_servers.clone();
        TokioMutex::new(Registry { configs, servers: Vec::new() })
    })
}
/// Seconds since the Unix epoch, used for idle-timeout bookkeeping.
/// Returns 0 instead of panicking if the system clock reads before the
/// epoch — for a coarse idle timer that failure mode is harmless, and the
/// previous `unwrap()` would have crashed the process on a bad clock.
fn now() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}
impl LspServer {
    /// Spawn a language-server process and run the LSP `initialize` /
    /// `initialized` handshake. stdio carries the JSON-RPC channel; stderr
    /// is discarded.
    async fn spawn(command: &str, args: &[String], root_path: &str) -> Result<LspServer> {
        let mut child = Command::new(command)
            .args(args)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::null())
            .spawn()
            .with_context(|| format!("spawning LSP: {} {}", command, args.join(" ")))?;
        let mut server = LspServer {
            root_path: root_path.to_string(),
            // unwrap() is safe: both handles were requested as piped above.
            stdin: BufWriter::new(child.stdin.take().unwrap()),
            stdout: BufReader::new(child.stdout.take().unwrap()),
            _child: child,
            next_id: 0,
            opened_files: HashSet::new(),
            last_access: now(),
        };
        // Advertise only the capabilities backing the exposed tools.
        server.request("initialize", json!({
            "processId": std::process::id(),
            "rootUri": format!("file://{}", root_path),
            "capabilities": {
                "textDocument": {
                    "definition": { "dynamicRegistration": false },
                    "references": { "dynamicRegistration": false },
                    "hover": { "dynamicRegistration": false },
                    "documentSymbol": { "dynamicRegistration": false },
                    "callHierarchy": { "dynamicRegistration": false },
                }
            },
        })).await.with_context(|| format!("initializing LSP for {}", root_path))?;
        server.notify("initialized", json!({})).await?;
        dbglog!("[lsp] server started for {}", root_path);
        Ok(server)
    }
}
impl Registry {
    /// Drop connections idle for IDLE_TIMEOUT_SECS or longer.
    /// NOTE(review): retain() drops the LspServer and its tokio Child handle;
    /// spawn() does not set kill_on_drop, so the server process may be left
    /// running after the connection is reaped — confirm intended.
    fn reap_idle(&mut self) {
        let n = now();
        self.servers.retain(|s| n.saturating_sub(s.last_access) < IDLE_TIMEOUT_SECS);
    }

    /// Pick the configured server for a language id. An explicit `languages`
    /// list wins; an empty list falls back to a name/command heuristic.
    fn find_config(&self, lang: &str) -> Option<&crate::config::LspServerConfig> {
        self.configs.iter().find(|c| {
            if c.languages.is_empty() {
                // Auto: rust-analyzer for rust, etc.
                c.command.contains(lang) || c.name == lang
            } else {
                c.languages.iter().any(|l| l == lang)
            }
        })
    }

    /// Find or spawn the server for `file_path`'s project root; returns an
    /// index into `self.servers`. Servers are keyed by root, so one server
    /// handles every file under the same project.
    async fn ensure_server(&mut self, file_path: &str) -> Result<usize> {
        let root = find_project_root(file_path)
            .ok_or_else(|| anyhow::anyhow!("no project root found for {}", file_path))?;
        let lang = detect_language(file_path);
        // Reap before reusing so a stale-but-matching server is replaced.
        self.reap_idle();
        if let Some(idx) = self.servers.iter().position(|s| s.root_path == root) {
            self.servers[idx].last_access = now();
            return Ok(idx);
        }
        let config = self.find_config(lang)
            .ok_or_else(|| anyhow::anyhow!("no LSP server configured for {}", lang))?
            .clone();
        let server = LspServer::spawn(&config.command, &config.args, &root).await?;
        self.servers.push(server);
        Ok(self.servers.len() - 1)
    }

    /// Resolve a file path to (live server connection, didOpen'd URI).
    async fn conn_for(&mut self, path: &str) -> Result<(&mut LspServer, String)> {
        let idx = self.ensure_server(path).await?;
        let server = &mut self.servers[idx];
        let uri = server.ensure_open(path).await?;
        Ok((server, uri))
    }
}
// -- Operation table ----------------------------------------------------------
use std::sync::Arc;
/// One row of the LSP tool table: maps an agent-facing tool to the JSON-RPC
/// method it performs and the formatter for its raw result.
struct LspOp {
    tool_name: &'static str,   // name the model invokes
    description: &'static str, // shown in the tool definition
    method: &'static str,      // JSON-RPC method for the (first) request
    needs_position: bool,      // whether line/character params are required
    extra_params: fn() -> serde_json::Value,  // method-specific extra request params
    format: fn(&serde_json::Value) -> String, // render raw result for the model
    // Two-step RPCs (e.g. incoming_calls) use a second method on the first result
    followup: Option<&'static str>,
}
/// Extra request params for ops that need none.
fn no_extra() -> serde_json::Value { json!({}) }
/// Extra params for textDocument/references: include the declaration site.
fn ref_extra() -> serde_json::Value { json!({"context": {"includeDeclaration": true}}) }
fn fmt_locations(result: &serde_json::Value) -> String {
let locations = if result.is_array() {
result.as_array().unwrap().clone()
} else if result.is_object() {
vec![result.clone()]
} else {
return "No results.".into();
};
let mut out = String::new();
for loc in &locations {
let uri = loc["uri"].as_str().or_else(|| loc["targetUri"].as_str()).unwrap_or("");
let range = if loc.get("range").is_some() { &loc["range"] } else { &loc["targetRange"] };
let line = range["start"]["line"].as_u64().unwrap_or(0) + 1;
let file = uri.strip_prefix("file://").unwrap_or(uri);
out.push_str(&format!("{}:{}\n", file, line));
}
if out.is_empty() { "No results.".into() } else { out }
}
/// Render a textDocument/hover result: plain string contents, or the `value`
/// field of MarkupContent; anything else is pretty-printed JSON. Null means
/// the server had nothing to say.
fn fmt_hover(result: &serde_json::Value) -> String {
    if result.is_null() {
        return "No hover information.".into();
    }
    let contents = &result["contents"];
    match (contents.as_str(), contents.as_object()) {
        (Some(text), _) => text.to_string(),
        (None, Some(markup)) => {
            markup.get("value").and_then(|v| v.as_str()).unwrap_or("").to_string()
        }
        _ => serde_json::to_string_pretty(result).unwrap_or_default(),
    }
}
/// Render a textDocument/documentSymbol result as an indented outline.
/// Non-array results (and empty outlines) read as "No symbols found.".
fn fmt_symbols(result: &serde_json::Value) -> String {
    match result.as_array() {
        Some(symbols) => {
            let mut listing = String::new();
            fmt_symbols_recursive(symbols, &mut listing, 0);
            if listing.is_empty() { "No symbols found.".into() } else { listing }
        }
        None => "No symbols found.".into(),
    }
}
/// Append one outline line per symbol, recursing into `children` with one
/// extra level of indentation. Handles both DocumentSymbol (`range`) and
/// SymbolInformation (`location.range`) shapes; lines are reported 1-based.
fn fmt_symbols_recursive(symbols: &[serde_json::Value], out: &mut String, depth: usize) {
    let pad = " ".repeat(depth);
    for entry in symbols {
        let name = entry["name"].as_str().unwrap_or("?");
        // Subset of LSP SymbolKind codes we care to label.
        let kind = match entry["kind"].as_u64().unwrap_or(0) {
            2 => "Module",
            5 => "Class",
            6 => "Method",
            8 => "Field",
            10 => "Enum",
            11 => "Interface",
            12 => "Function",
            13 => "Variable",
            14 => "Constant",
            22 => "EnumMember",
            23 => "Struct",
            26 => "TypeParameter",
            _ => "Symbol",
        };
        let line = entry["range"]["start"]["line"].as_u64()
            .or_else(|| entry["location"]["range"]["start"]["line"].as_u64())
            .unwrap_or(0) + 1;
        out.push_str(&format!("{}{} ({}) - Line {}\n", pad, name, kind, line));
        if let Some(kids) = entry.get("children").and_then(|c| c.as_array()) {
            fmt_symbols_recursive(kids, out, depth + 1);
        }
    }
}
/// Render a callHierarchy/incomingCalls result as `file:line: caller` lines.
/// Entries without a `from` object are skipped; an empty or non-array result
/// reads as "No incoming calls.".
fn fmt_callers(result: &serde_json::Value) -> String {
    let Some(calls) = result.as_array() else {
        return "No incoming calls.".into();
    };
    let mut out = String::new();
    for call in calls {
        let Some(from) = call.get("from") else { continue };
        let name = from["name"].as_str().unwrap_or("?");
        let uri = from["uri"].as_str().unwrap_or("");
        let line = from["range"]["start"]["line"].as_u64().unwrap_or(0) + 1;
        let file = uri.strip_prefix("file://").unwrap_or(uri);
        out.push_str(&format!("{}:{}: {}\n", file, line, name));
    }
    if out.is_empty() { "No incoming calls.".into() } else { out }
}
/// Table of LSP operations exposed as agent tools.
///
/// Each entry binds a tool name to an LSP request method plus hooks for
/// extra request params and result formatting; tools() turns this table
/// into the actual Tool list. `followup` chains a second request whose
/// input is the first item of the initial response (call hierarchy).
static OPS: &[LspOp] = &[
    LspOp {
        tool_name: "lsp_definition",
        description: "Find where a symbol is defined.",
        method: "textDocument/definition",
        needs_position: true,
        extra_params: no_extra,
        format: fmt_locations,
        followup: None,
    },
    LspOp {
        tool_name: "lsp_references",
        description: "Find all references to a symbol.",
        method: "textDocument/references",
        needs_position: true,
        // references needs an extra "context" param (includeDeclaration).
        extra_params: ref_extra,
        format: fmt_locations,
        followup: None,
    },
    LspOp {
        tool_name: "lsp_hover",
        description: "Get type info and documentation for a symbol.",
        method: "textDocument/hover",
        needs_position: true,
        extra_params: no_extra,
        format: fmt_hover,
        followup: None,
    },
    LspOp {
        tool_name: "lsp_symbols",
        description: "List all symbols in a file.",
        method: "textDocument/documentSymbol",
        // Whole-file query: only needs the document, not a cursor position.
        needs_position: false,
        extra_params: no_extra,
        format: fmt_symbols,
        followup: None,
    },
    LspOp {
        tool_name: "lsp_callers",
        description: "Find all functions that call the function at a position.",
        // Two-step: prepareCallHierarchy yields an item, then the followup
        // request resolves the actual incoming calls for that item.
        method: "textDocument/prepareCallHierarchy",
        needs_position: true,
        extra_params: no_extra,
        format: fmt_callers,
        followup: Some("callHierarchy/incomingCalls"),
    },
];
/// JSON schema for position-based tools: file path plus 1-based line and
/// character (dispatch_op converts to LSP's zero-based positions).
const POS_PARAMS: &str = r#"{"type":"object","properties":{"file":{"type":"string"},"line":{"type":"integer"},"character":{"type":"integer"}},"required":["file","line","character"]}"#;
/// JSON schema for whole-file tools: only a file path.
const FILE_PARAMS: &str = r#"{"type":"object","properties":{"file":{"type":"string"}},"required":["file"]}"#;
/// Dispatch one LSP tool invocation described by `op` with JSON args `v`.
///
/// Resolves the file to a server connection, builds the request params
/// (adding a zero-based position when the op needs one), merges any
/// op-specific extra params, sends the request, and — for ops with a
/// `followup` — chains a second request fed by the first item of the
/// initial response.
///
/// Errors if `file` (or `line`, when required) is missing, if `line` is 0
/// (tool input is 1-based), or if the LSP request itself fails.
async fn dispatch_op(op: &LspOp, v: &serde_json::Value) -> Result<String> {
    let file = v["file"].as_str().ok_or_else(|| anyhow::anyhow!("file required"))?;
    let mut reg = registry().lock().await;
    let (conn, uri) = reg.conn_for(file).await?;
    let mut params = json!({ "textDocument": { "uri": uri } });
    if op.needs_position {
        // Tool input is 1-based; LSP positions are 0-based. checked_sub
        // rejects line 0 explicitly instead of underflowing (which would
        // panic in debug builds and wrap to u32::MAX in release).
        let line = v["line"]
            .as_u64()
            .ok_or_else(|| anyhow::anyhow!("line required"))?
            .checked_sub(1)
            .ok_or_else(|| anyhow::anyhow!("line must be >= 1"))? as u32;
        let character = v["character"].as_u64().unwrap_or(0) as u32;
        params["position"] = json!({ "line": line, "character": character });
    }
    // Merge op-specific additions (e.g. the references "context" object).
    let extra = (op.extra_params)();
    if let Some(obj) = extra.as_object() {
        for (k, v) in obj { params[k] = v.clone(); }
    }
    let result = conn.request(op.method, params).await?;
    if let Some(followup) = op.followup {
        // e.g. prepareCallHierarchy → callHierarchy/incomingCalls: the
        // follow-up request takes the first item of the initial response.
        let item = result.as_array().and_then(|a| a.first())
            .ok_or_else(|| anyhow::anyhow!("no item at this position"))?;
        let result2 = conn.request(followup, json!({ "item": item })).await?;
        return Ok((op.format)(&result2));
    }
    Ok((op.format)(&result))
}
/// Build the agent-facing Tool list, one entry per row of OPS.
///
/// OPS is a `&'static` slice, so each `&LspOp` is itself `'static` and can
/// be captured by the handler closure directly — the previous per-call
/// `OPS.iter().find(...).unwrap()` lookup by name was redundant.
pub(super) fn tools() -> Vec<super::Tool> {
    OPS.iter().map(|op| {
        super::Tool {
            name: op.tool_name,
            description: op.description,
            parameters_json: if op.needs_position { POS_PARAMS } else { FILE_PARAMS },
            handler: Arc::new(move |_agent, v| Box::pin(async move {
                dispatch_op(op, &v).await
            })),
        }
    }).collect()
}

View file

@ -0,0 +1,192 @@
// tools/mcp_client.rs — MCP client for external tool servers
//
// Spawns external MCP servers, discovers their tools, dispatches calls.
// JSON-RPC 2.0 over stdio (newline-delimited). Global registry, lazy
// init from config.
use anyhow::{Context, Result};
use serde::Deserialize;
use serde_json::json;
use std::sync::OnceLock;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::Mutex as TokioMutex;
/// A tool advertised by an external MCP server, in the shape the agent's
/// tool layer consumes.
#[derive(Debug, Clone)]
pub struct McpTool {
    pub name: String,
    pub description: String,
    // The tool's input JSON Schema, kept as a raw JSON string.
    pub parameters_json: String,
}
/// A running MCP server child process plus its JSON-RPC plumbing.
struct McpServer {
    #[allow(dead_code)]
    name: String,
    // Newline-delimited JSON-RPC 2.0 over the child's stdio.
    stdin: BufWriter<ChildStdin>,
    stdout: BufReader<ChildStdout>,
    // Held so the child handle lives as long as the server entry.
    _child: Child,
    // Monotonic request id counter (incremented before each request).
    next_id: u64,
    // Tools discovered via tools/list during spawn().
    tools: Vec<McpTool>,
}
/// Minimal JSON-RPC 2.0 response envelope — only the fields we read.
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
    // Absent on server-initiated notifications; matched against our
    // request id to pair responses with requests.
    id: Option<u64>,
    result: Option<serde_json::Value>,
    error: Option<JsonRpcError>,
}
/// JSON-RPC error object: numeric code plus human-readable message.
#[derive(Debug, Deserialize)]
struct JsonRpcError {
    code: i64,
    message: String,
}
impl McpServer {
    /// Send a JSON-RPC request and wait for the matching response.
    ///
    /// Writes one newline-terminated JSON line, then reads stdout lines
    /// until a parseable response carries our id. Lines that fail to parse
    /// or carry a different id (e.g. server notifications) are silently
    /// skipped. Errors on EOF (server exited) or on a JSON-RPC error
    /// response.
    async fn request(&mut self, method: &str, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
        self.next_id += 1;
        let id = self.next_id;
        let req = json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params });
        let mut line = serde_json::to_string(&req)?;
        line.push('\n');
        self.stdin.write_all(line.as_bytes()).await?;
        self.stdin.flush().await?;
        let mut buf = String::new();
        loop {
            buf.clear();
            // read_line returning 0 bytes means EOF: the child went away.
            let n = self.stdout.read_line(&mut buf).await?;
            if n == 0 { anyhow::bail!("MCP server closed connection"); }
            let trimmed = buf.trim();
            if trimmed.is_empty() { continue; }
            if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(trimmed) {
                if resp.id == Some(id) {
                    if let Some(err) = resp.error {
                        anyhow::bail!("MCP error {}: {}", err.code, err.message);
                    }
                    // A success response with no "result" is treated as null.
                    return Ok(resp.result.unwrap_or(serde_json::Value::Null));
                }
            }
        }
    }
    /// Send a JSON-RPC notification (no id, no response expected).
    async fn notify(&mut self, method: &str) -> Result<()> {
        let msg = json!({ "jsonrpc": "2.0", "method": method });
        let mut line = serde_json::to_string(&msg)?;
        line.push('\n');
        self.stdin.write_all(line.as_bytes()).await?;
        self.stdin.flush().await?;
        Ok(())
    }
    /// Spawn the server process and bring it to a usable state:
    /// initialize handshake, "initialized" notification, then tools/list
    /// to populate `tools`. The child's stderr is discarded.
    async fn spawn(name: &str, command: &str, args: &[&str]) -> Result<McpServer> {
        let mut child = Command::new(command)
            .args(args)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::null())
            .spawn()
            .with_context(|| format!("spawning MCP server: {} {}", command, args.join(" ")))?;
        let mut server = McpServer {
            name: name.to_string(),
            // take() is safe: both pipes were requested as piped() above.
            stdin: BufWriter::new(child.stdin.take().unwrap()),
            stdout: BufReader::new(child.stdout.take().unwrap()),
            _child: child,
            next_id: 0,
            tools: Vec::new(),
        };
        server.request("initialize", Some(json!({
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "consciousness", "version": "0.1"}
        }))).await.with_context(|| format!("initializing MCP server {}", name))?;
        server.notify("notifications/initialized").await?;
        let tools_result = server.request("tools/list", None).await
            .with_context(|| format!("listing tools from MCP server {}", name))?;
        if let Some(tool_list) = tools_result.get("tools").and_then(|t| t.as_array()) {
            for tool in tool_list {
                server.tools.push(McpTool {
                    // Missing name/description default to "" rather than
                    // failing the whole discovery.
                    name: tool["name"].as_str().unwrap_or("").to_string(),
                    description: tool["description"].as_str().unwrap_or("").to_string(),
                    parameters_json: tool.get("inputSchema")
                        .map(|s| serde_json::to_string(s).unwrap_or_default())
                        .unwrap_or_else(|| r#"{"type":"object"}"#.to_string()),
                });
            }
        }
        dbglog!("[mcp] {} connected: {} tools", name, server.tools.len());
        Ok(server)
    }
}
/// Holds all connected MCP servers.
struct Registry {
    servers: Vec<McpServer>,
}
static REGISTRY: OnceLock<TokioMutex<Registry>> = OnceLock::new();
/// Global MCP server registry.
///
/// OnceLock initialization must be synchronous, so no servers can be
/// spawned here: the registry starts empty and ensure_init() populates it
/// lazily (async) on first access. The previous body also read
/// `config::get().mcp_servers` only to discard it (`let _ = configs;`) —
/// dead code, removed; ensure_init() reads the config itself.
fn registry() -> &'static TokioMutex<Registry> {
    REGISTRY.get_or_init(|| TokioMutex::new(Registry { servers: Vec::new() }))
}
/// Spawn every configured MCP server, once.
///
/// "Once" is approximated by `servers.is_empty()`: as soon as at least one
/// server is connected, later calls return immediately. A spawn failure is
/// logged to stderr and skipped rather than failing the whole init.
/// NOTE(review): if *every* configured server fails to spawn, the list
/// stays empty and the spawns are re-attempted on each call — confirm this
/// retry behavior is intended (it runs on every tool dispatch).
async fn ensure_init() -> Result<()> {
    let mut reg = registry().lock().await;
    if !reg.servers.is_empty() { return Ok(()); }
    let configs = crate::config::get().mcp_servers.clone();
    for cfg in &configs {
        let args: Vec<&str> = cfg.args.iter().map(|s| s.as_str()).collect();
        match McpServer::spawn(&cfg.name, &cfg.command, &args).await {
            Ok(server) => reg.servers.push(server),
            Err(e) => eprintln!("warning: MCP server {} failed: {:#}", cfg.name, e),
        }
    }
    Ok(())
}
/// Invoke MCP tool `name` with `args` on whichever connected server
/// advertises it, returning the result flattened to plain text.
///
/// Result handling: a "content" array is reduced to its text parts joined
/// with newlines; a bare string result is returned as-is; anything else is
/// pretty-printed JSON.
pub(super) async fn call_tool(name: &str, args: &serde_json::Value) -> Result<String> {
    ensure_init().await?;
    let mut reg = registry().lock().await;
    let server = reg
        .servers
        .iter_mut()
        .find(|s| s.tools.iter().any(|t| t.name == name))
        .ok_or_else(|| anyhow::anyhow!("no MCP server has tool {}", name))?;
    let result = server
        .request("tools/call", Some(json!({ "name": name, "arguments": args })))
        .await
        .with_context(|| format!("calling MCP tool {}", name))?;
    match result.get("content").and_then(|c| c.as_array()) {
        Some(content) => {
            let mut pieces: Vec<&str> = Vec::new();
            for item in content {
                if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
                    pieces.push(text);
                }
            }
            Ok(pieces.join("\n"))
        }
        None => match result.as_str() {
            Some(text) => Ok(text.to_string()),
            None => Ok(serde_json::to_string_pretty(&result)?),
        },
    }
}
pub(super) async fn tool_definitions_json() -> Vec<String> {
let _ = ensure_init().await;
let reg = registry().lock().await;
reg.servers.iter()
.flat_map(|s| s.tools.iter())
.map(|t| format!(
r#"{{"type":"function","function":{{"name":"{}","description":"{}","parameters":{}}}}}"#,
t.name,
t.description.replace('"', r#"\""#),
t.parameters_json,
))
.collect()
}

View file

@ -6,6 +6,8 @@
// Core tools
mod ast_grep;
pub mod lsp;
pub mod mcp_client;
mod bash;
pub mod channels;
mod edit;
@ -152,7 +154,22 @@ pub async fn dispatch_with_agent(
match tool {
Some(t) => (t.handler)(agent, args.clone()).await
.unwrap_or_else(|e| format!("Error: {}", e)),
None => format!("Error: Unknown tool: {}", name),
None => {
let allowed = match &agent {
Some(a) => match &a.state.lock().await.mcp_tools {
super::McpToolAccess::All => true,
super::McpToolAccess::Some(list) => list.iter().any(|t| t == name),
super::McpToolAccess::None => false,
},
None => true,
};
if allowed {
if let Ok(result) = mcp_client::call_tool(name, args).await {
return result;
}
}
format!("Error: Unknown tool: {}", name)
}
}
}
@ -168,9 +185,16 @@ pub fn tools() -> Vec<Tool> {
all.extend(memory::journal_tools());
all.extend(channels::tools());
all.extend(control::tools());
all.extend(lsp::tools());
all
}
pub async fn all_tool_definitions() -> Vec<String> {
let mut defs: Vec<String> = tools().iter().map(|t| t.to_json()).collect();
defs.extend(mcp_client::tool_definitions_json().await);
defs
}
/// Memory + journal tools only — for subconscious agents.
pub fn memory_and_journal_tools() -> Vec<Tool> {
let mut all = memory::memory_tools().to_vec();
@ -222,10 +246,7 @@ pub fn summarize_args(tool_name: &str, args: &serde_json::Value) -> String {
entry.to_string()
}
}
"yield_to_user" => args["message"]
.as_str()
.unwrap_or("")
.to_string(),
"yield_to_user" => String::new(),
"switch_model" => args["model"]
.as_str()
.unwrap_or("")

View file

@ -107,6 +107,10 @@ pub struct Config {
pub scoring_response_window: usize,
pub api_reasoning: String,
pub agent_types: Vec<String>,
#[serde(default)]
pub mcp_servers: Vec<McpServerConfig>,
#[serde(default)]
pub lsp_servers: Vec<LspServerConfig>,
/// Surface agent timeout in seconds.
#[serde(default)]
pub surface_timeout_secs: Option<u32>,
@ -164,6 +168,8 @@ impl Default for Config {
surface_timeout_secs: None,
surface_conversation_bytes: None,
surface_hooks: vec![],
mcp_servers: vec![],
lsp_servers: vec![],
}
}
}
@ -204,6 +210,14 @@ impl Config {
}
}
// Top-level config sections (not inside "memory")
if let Some(servers) = root.get("lsp_servers") {
config.lsp_servers = serde_json::from_value(servers.clone()).unwrap_or_default();
}
if let Some(servers) = root.get("mcp_servers") {
config.mcp_servers = serde_json::from_value(servers.clone()).unwrap_or_default();
}
Some(config)
}
@ -346,6 +360,28 @@ pub struct AppConfig {
pub models: HashMap<String, ModelConfig>,
#[serde(default = "default_model_name")]
pub default_model: String,
#[serde(default)]
pub mcp_servers: Vec<McpServerConfig>,
#[serde(default)]
pub lsp_servers: Vec<LspServerConfig>,
}
/// Config entry for one external MCP server: the command to spawn plus
/// its arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpServerConfig {
    pub name: String,
    pub command: String,
    #[serde(default)]
    pub args: Vec<String>,
}
/// Config entry for one LSP server: command, arguments, and the languages
/// it should be used for.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LspServerConfig {
    pub name: String,
    pub command: String,
    #[serde(default)]
    pub args: Vec<String>,
    #[serde(default)]
    pub languages: Vec<String>, // e.g. ["rust"], ["c", "cpp"]. Empty = auto-detect
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
@ -436,6 +472,8 @@ impl Default for AppConfig {
system_prompt_file: None,
models: HashMap::new(),
default_model: String::new(),
mcp_servers: Vec::new(),
lsp_servers: Vec::new(),
}
}
}

View file

@ -1,8 +1,10 @@
use anyhow::{Context, Result};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::io::Write;
use std::path::{Path, PathBuf};
use crate::agent::context::AstNode;
use crate::hippocampus::transcript::JsonlBackwardIter;
use memmap2::Mmap;
pub struct ConversationLog {
path: PathBuf,
@ -33,32 +35,19 @@ impl ConversationLog {
Ok(())
}
pub fn read_nodes(&self, max_bytes: u64) -> Result<Vec<AstNode>> {
/// Read nodes from the tail of the log, newest first.
/// Caller decides when to stop (budget, count, etc).
pub fn read_tail(&self) -> Result<TailNodes> {
if !self.path.exists() {
return Ok(Vec::new());
anyhow::bail!("log does not exist");
}
let file = File::open(&self.path)
.with_context(|| format!("opening log {}", self.path.display()))?;
let file_len = file.metadata()?.len();
let mut reader = BufReader::new(file);
if file_len > max_bytes {
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
let mut discard = String::new();
reader.read_line(&mut discard)?;
if file.metadata()?.len() == 0 {
anyhow::bail!("log is empty");
}
let mut nodes = Vec::new();
for line in reader.lines() {
let line = line.context("reading log tail")?;
let line = line.trim();
if line.is_empty() { continue; }
if let Ok(node) = serde_json::from_str::<AstNode>(line) {
nodes.push(node);
}
// Old format entries silently skipped — journal has the context
}
Ok(nodes)
let mmap = unsafe { Mmap::map(&file)? };
Ok(TailNodes { _file: file, mmap })
}
pub fn path(&self) -> &Path {
@ -66,12 +55,13 @@ impl ConversationLog {
}
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
// Read forward from the start to find first timestamp
let file = File::open(&self.path).ok()?;
let reader = BufReader::new(file);
for line in reader.lines().flatten() {
let line = line.trim().to_string();
let mmap = unsafe { Mmap::map(&file).ok()? };
// Find first { ... } and parse
for line in mmap.split(|&b| b == b'\n') {
if line.is_empty() { continue; }
if let Ok(node) = serde_json::from_str::<AstNode>(&line) {
if let Ok(node) = serde_json::from_slice::<AstNode>(line) {
if let Some(leaf) = node.leaf() {
if let Some(ts) = leaf.timestamp() {
return Some(ts);
@ -82,3 +72,16 @@ impl ConversationLog {
None
}
}
/// Iterates over conversation log nodes newest-first, using mmap + backward scan.
pub struct TailNodes {
    // Kept alive for the lifetime of the mapping; the mmap was created
    // from this file handle.
    _file: File,
    mmap: Mmap,
}
impl TailNodes {
    /// Yield nodes newest-first. Lines that fail to parse as an AstNode
    /// are silently skipped.
    pub fn iter(&self) -> impl Iterator<Item = AstNode> + '_ {
        JsonlBackwardIter::new(&self.mmap)
            .filter_map(|bytes| serde_json::from_slice::<AstNode>(bytes).ok())
    }
}

View file

@ -494,6 +494,7 @@ impl Mind {
};
let mut cmds = Vec::new();
let mut dmn_expired = false;
tokio::select! {
biased;
@ -526,17 +527,15 @@ impl Mind {
}
cmds.push(MindCommand::Compact);
/*
* Broken since the AST context window conversion:
if !self.config.no_agents {
cmds.push(MindCommand::Score);
}
*/
}
_ = tokio::time::sleep(timeout), if !has_input => {
let tick = self.shared.lock().unwrap().dmn_tick();
if let Some((prompt, target)) = tick {
self.start_turn(&prompt, target).await;
}
}
_ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true,
}
if !self.config.no_agents {
@ -562,6 +561,14 @@ impl Mind {
if let Some(text) = pending {
self.start_turn(&text, StreamTarget::Conversation).await;
}
/*
else if dmn_expired {
let tick = self.shared.lock().unwrap().dmn_tick();
if let Some((prompt, target)) = tick {
self.start_turn(&prompt, target).await;
}
}
*/
self.run_commands(cmds).await;
}

View file

@ -68,8 +68,8 @@ impl State {
/// How long to wait before the next DMN prompt in this state.
pub fn interval(&self) -> Duration {
match self {
State::Engaged => Duration::from_secs(5),
State::Working => Duration::from_secs(3),
State::Engaged => Duration::from_secs(5),
State::Foraging => Duration::from_secs(30),
State::Resting { .. } => Duration::from_secs(300),
State::Paused | State::Off => Duration::from_secs(86400), // effectively never

View file

@ -9,8 +9,9 @@ use ratatui::{
text::{Line, Span},
widgets::{Block, Borders, Paragraph, Wrap},
Frame,
crossterm::event::{KeyCode, KeyModifiers, MouseEvent, MouseEventKind, MouseButton},
};
use ratatui::crossterm::event::{KeyCode, KeyModifiers, MouseEvent, MouseEventKind, MouseButton};
use super::{App, ScreenView, screen_legend};
use crate::agent::context::{AstNode, NodeBody, Role, Ast};
@ -158,6 +159,56 @@ enum ActivePane {
Tools,
}
/// Text selection within a pane. Anchor is where the click started,
/// cursor is where the mouse currently is. They may be in either order.
#[derive(Debug, Clone, PartialEq, Default)]
struct Selection {
    // Positions index the pane's logical (pre-wrap) lines; columns are
    // offsets into the flattened line text (used as byte offsets by
    // Selection::text).
    anchor_line: usize,
    anchor_col: usize,
    cursor_line: usize,
    cursor_col: usize,
}
impl Selection {
    /// Start a selection: anchor and cursor begin at the same point.
    fn new(line: usize, col: usize) -> Self {
        Self { anchor_line: line, anchor_col: col, cursor_line: line, cursor_col: col }
    }
    /// Move the cursor end of the selection; the anchor stays put.
    fn extend(&mut self, line: usize, col: usize) {
        self.cursor_line = line;
        self.cursor_col = col;
    }
    /// Normalized range: (start_line, start_col, end_line, end_col)
    fn range(&self) -> (usize, usize, usize, usize) {
        if (self.anchor_line, self.anchor_col) <= (self.cursor_line, self.cursor_col) {
            (self.anchor_line, self.anchor_col, self.cursor_line, self.cursor_col)
        } else {
            (self.cursor_line, self.cursor_col, self.anchor_line, self.anchor_col)
        }
    }
    /// Extract the selected text, joining multi-line selections with '\n'.
    ///
    /// Columns are applied as byte ranges via str::get, so a range that
    /// doesn't fall on char boundaries yields None and that line is
    /// skipped. NOTE(review): columns originate from screen x coordinates
    /// (see mouse_to_position); on lines with multi-byte characters the
    /// byte offset won't match the visual column — confirm acceptable.
    fn text(&self, lines: &[Line<'static>]) -> String {
        let (start_line, start_col, end_line, end_col) = self.range();
        let mut result = String::new();
        for (i, line) in lines.iter().enumerate() {
            if i < start_line || i > end_line { continue; }
            let line_text: String = line.spans.iter().map(|s| s.content.as_ref()).collect();
            // First/last selected line use the selection columns; interior
            // lines are taken whole.
            let sc = if i == start_line { start_col } else { 0 };
            let ec = if i == end_line { end_col } else { line_text.len() };
            if sc < line_text.len() {
                if let Some(selected) = line_text.get(sc..ec.min(line_text.len())) {
                    if !result.is_empty() {
                        result.push('\n');
                    }
                    result.push_str(selected);
                }
            }
        }
        result
    }
}
fn strip_ansi(text: &str) -> String {
let mut out = String::with_capacity(text.len());
let mut chars = text.chars().peekable();
@ -226,6 +277,7 @@ struct PaneState {
pinned: bool,
last_total_lines: u16,
last_height: u16,
selection: Option<Selection>,
}
impl PaneState {
@ -237,6 +289,7 @@ impl PaneState {
md_buffer: String::new(), use_markdown,
pending_marker: Marker::None, scroll: 0, pinned: false,
last_total_lines: 0, last_height: 20,
selection: None,
}
}
@ -352,6 +405,56 @@ impl PaneState {
}
(lines, markers)
}
/// Convert mouse coordinates (relative to pane) to line/column position.
///
/// Rebuilds per-line wrapped heights (reusing the cached prefix), finds
/// the first visible line for the current scroll, then walks downward
/// until the mouse row lands inside a line's wrapped extent.
/// NOTE(review): the column is mouse_x clamped to the line's byte length —
/// wrapped continuation rows and multi-byte characters are not accounted
/// for; confirm this is acceptable for selection purposes.
fn mouse_to_position(&self, mouse_x: u16, mouse_y: u16, pane_height: u16) -> Option<(usize, usize)> {
    let (lines, _) = self.all_lines_with_markers();
    if lines.is_empty() || self.cached_width == 0 { return None; }
    // Build heights array (reuse cached where possible)
    let n_committed = self.line_heights.len();
    let mut heights: Vec<u16> = self.line_heights.clone();
    for line in lines.iter().skip(n_committed) {
        let h = Paragraph::new(line.clone())
            .wrap(Wrap { trim: false })
            .line_count(self.cached_width) as u16;
        heights.push(h.max(1));
    }
    // Find the first visible line given current scroll
    let (first, sub_scroll, _) = visible_range(&heights, self.scroll, pane_height);
    // Walk from the first visible line, offset by sub_scroll
    let mut row = -(sub_scroll as i32);
    for line_idx in first..lines.len() {
        let h = heights.get(line_idx).copied().unwrap_or(1) as i32;
        if (mouse_y as i32) < row + h {
            let line_text: String = lines[line_idx].spans.iter().map(|s| s.content.as_ref()).collect();
            let col = (mouse_x as usize).min(line_text.len());
            return Some((line_idx, col));
        }
        row += h;
    }
    // Click below the last rendered line: snap to start of the final line.
    Some((lines.len().saturating_sub(1), 0))
}
/// Set the selection start position (replaces any existing selection).
fn start_selection(&mut self, line: usize, col: usize) {
    self.selection = Some(Selection::new(line, col));
}
/// Update the selection end position; no-op if no selection is active.
fn extend_selection(&mut self, line: usize, col: usize) {
    if let Some(ref mut sel) = self.selection {
        sel.extend(line, col);
    }
}
/// Get the selected text, or None if nothing is selected.
fn get_selection(&self) -> Option<String> {
    let (lines, _) = self.all_lines_with_markers();
    self.selection.as_ref().map(|sel| sel.text(&lines))
}
}
pub(crate) struct InteractScreen {
@ -610,14 +713,83 @@ impl InteractScreen {
for (i, area) in self.pane_areas.iter().enumerate() {
if x >= area.x && x < area.x + area.width && y >= area.y && y < area.y + area.height {
self.active_pane = match i { 0 => ActivePane::Autonomous, 1 => ActivePane::Conversation, _ => ActivePane::Tools };
let rel_x = x.saturating_sub(area.x);
let rel_y = y.saturating_sub(area.y);
self.selection_event(i, rel_x, rel_y, true);
break;
}
}
}
MouseEventKind::Drag(MouseButton::Left) => {
let (x, y) = (mouse.column, mouse.row);
let i = match self.active_pane { ActivePane::Autonomous => 0, ActivePane::Conversation => 1, ActivePane::Tools => 2 };
let area = self.pane_areas[i];
if x >= area.x && x < area.x + area.width && y >= area.y && y < area.y + area.height {
let rel_x = x.saturating_sub(area.x);
let rel_y = y.saturating_sub(area.y);
self.selection_event(i, rel_x, rel_y, false);
}
}
MouseEventKind::Up(MouseButton::Left) => {
self.copy_selection_to_clipboard();
}
MouseEventKind::Down(MouseButton::Middle) => {
self.paste_from_selection();
}
_ => {}
}
}
/// Copy the current selection to the clipboard via OSC 52.
///
/// Writes the base64-encoded selection straight to stdout as an OSC 52
/// escape sequence; the terminal (not this process) sets the clipboard.
/// NOTE(review): inside tmux this needs set-clipboard/passthrough enabled
/// or the sequence is swallowed — confirm the deployment environment.
fn copy_selection_to_clipboard(&self) {
    let text = match self.active_pane {
        ActivePane::Autonomous => self.autonomous.get_selection(),
        ActivePane::Conversation => self.conversation.get_selection(),
        ActivePane::Tools => self.tools.get_selection(),
    };
    if let Some(ref selected_text) = text {
        if selected_text.is_empty() { return; }
        // OSC 52 clipboard copy: ESC ] 52 ; c ; <base64> BEL, where "c"
        // targets the system clipboard selection.
        use std::io::Write;
        use base64::Engine;
        let encoded = base64::engine::general_purpose::STANDARD.encode(selected_text);
        let mut stdout = std::io::stdout().lock();
        let _ = write!(stdout, "\x1b]52;c;{}\x07", encoded);
        let _ = stdout.flush();
    }
}
/// Paste from tmux buffer via middle-click.
///
/// Best-effort: shells out to `tmux save-buffer -`. If tmux is missing,
/// the command fails, or the buffer is empty, this silently does nothing.
fn paste_from_selection(&mut self) {
    let result = std::process::Command::new("tmux")
        .args(["save-buffer", "-"]).output();
    if let Ok(output) = result {
        if output.status.success() {
            let text = String::from_utf8_lossy(&output.stdout).into_owned();
            if !text.is_empty() {
                self.textarea.insert_str(&text);
            }
        }
    }
}
/// Map a pane index (0/1/2, anything else falls to tools) to its state.
fn pane_mut(&mut self, idx: usize) -> &mut PaneState {
    match idx { 0 => &mut self.autonomous, 1 => &mut self.conversation, _ => &mut self.tools }
}
/// Start (mouse-down) or extend (drag) the selection at a pane-relative
/// position, then push the current selection to the clipboard.
/// NOTE(review): the clipboard copy fires on every down/drag event — one
/// OSC 52 write per mouse move — and mouse-up copies again; confirm the
/// per-event copy is intended rather than only copying on release.
fn selection_event(&mut self, pane_idx: usize, rel_x: u16, rel_y: u16, start: bool) {
    let height = self.pane_areas[pane_idx].height;
    let pane = self.pane_mut(pane_idx);
    if let Some((line, col)) = pane.mouse_to_position(rel_x, rel_y, height) {
        if start {
            pane.start_selection(line, col);
        } else {
            pane.extend_selection(line, col);
        }
    }
    self.copy_selection_to_clipboard();
}
/// Draw the main (F1) screen — four-pane layout with status bar.
fn draw_main(&mut self, frame: &mut Frame, size: Rect, app: &App) {
// Main layout: content area + active tools overlay + status bar
@ -825,6 +997,11 @@ impl ScreenView for InteractScreen {
self.textarea = new_textarea(vec![String::new()]);
}
}
KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) && key.modifiers.contains(KeyModifiers::SHIFT) => {
// Ctrl+Shift+C: copy selection
self.copy_selection_to_clipboard();
}
// Paste: terminal handles Ctrl+Shift+V natively via bracketed paste
KeyCode::Up if key.modifiers.contains(KeyModifiers::CONTROL) => self.scroll_active_up(3),
KeyCode::Down if key.modifiers.contains(KeyModifiers::CONTROL) => self.scroll_active_down(3),
KeyCode::Up => {
@ -862,6 +1039,9 @@ impl ScreenView for InteractScreen {
}
}
Event::Mouse(mouse) => { self.handle_mouse(*mouse); }
Event::Paste(text) => {
self.textarea.insert_str(text);
}
_ => {}
}
}
@ -1011,8 +1191,44 @@ fn draw_conversation_pane(
// Find visible line range
let (first, sub_scroll, last) = visible_range(&heights, pane.scroll, inner.height);
// Apply selection highlighting to visible lines
let mut visible_lines: Vec<Line<'static>> = Vec::new();
if let Some(ref sel) = pane.selection {
let (sl, sc, el, ec) = sel.range();
for i in first..last {
let line = &lines[i];
let line_text: String = line.spans.iter().map(|s| s.content.as_ref()).collect();
// Check if this line is within the selection
if i >= sl && i <= el {
let start_col = if i == sl { sc } else { 0 };
let end_col = if i == el { ec } else { line_text.len() };
if start_col < end_col {
let before = if start_col > 0 { &line_text[..start_col] } else { "" };
let selected = &line_text[start_col..end_col];
let after = if end_col < line_text.len() { &line_text[end_col..] } else { "" };
let mut new_spans = Vec::new();
if !before.is_empty() {
new_spans.push(Span::raw(before.to_string()));
}
new_spans.push(Span::styled(selected.to_string(), Style::default().bg(Color::DarkGray).fg(Color::White)));
if !after.is_empty() {
new_spans.push(Span::raw(after.to_string()));
}
visible_lines.push(Line::from(new_spans).style(line.style).alignment(line.alignment.unwrap_or(ratatui::layout::Alignment::Left)));
} else {
visible_lines.push(line.clone());
}
} else {
visible_lines.push(line.clone());
}
}
} else {
visible_lines = lines[first..last].to_vec();
}
// Render only the visible slice — no full-content grapheme walk
let text_para = Paragraph::new(lines[first..last].to_vec())
let text_para = Paragraph::new(visible_lines)
.wrap(Wrap { trim: false })
.scroll((sub_scroll, 0));
frame.render_widget(text_para, text_area);

View file

@ -19,7 +19,7 @@ use crate::user::{self as tui};
// --- TUI infrastructure (moved from tui/mod.rs) ---
use ratatui::crossterm::{
event::{EnableMouseCapture, DisableMouseCapture},
event::{EnableMouseCapture, DisableMouseCapture, EnableBracketedPaste, DisableBracketedPaste},
terminal::{self, EnterAlternateScreen, LeaveAlternateScreen},
ExecutableCommand,
};
@ -98,6 +98,7 @@ struct ChannelStatus {
struct App {
status: StatusInfo,
activity: String,
activity_started: Option<std::time::Instant>,
running_processes: u32,
reasoning_effort: String,
temperature: f32,
@ -125,6 +126,7 @@ impl App {
turn_tools: 0, context_budget: String::new(),
},
activity: String::new(),
activity_started: None,
running_processes: 0,
reasoning_effort: "none".to_string(),
temperature: 0.6,
@ -164,12 +166,14 @@ fn init_terminal() -> io::Result<ratatui::Terminal<CrosstermBackend<io::Stdout>>
let mut stdout = io::stdout();
stdout.execute(EnterAlternateScreen)?;
stdout.execute(EnableMouseCapture)?;
stdout.execute(EnableBracketedPaste)?;
let backend = CrosstermBackend::new(stdout);
ratatui::Terminal::new(backend)
}
fn restore_terminal(terminal: &mut ratatui::Terminal<CrosstermBackend<io::Stdout>>) -> io::Result<()> {
terminal::disable_raw_mode()?;
terminal.backend_mut().execute(DisableBracketedPaste)?;
terminal.backend_mut().execute(DisableMouseCapture)?;
terminal.backend_mut().execute(LeaveAlternateScreen)?;
terminal.show_cursor()
@ -319,7 +323,7 @@ async fn run(
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || {
loop {
match crossterm::event::read() {
match ratatui::crossterm::event::read() {
Ok(event) => { if event_tx.send(event).is_err() { break; } }
Err(_) => break,
}