diff --git a/Cargo.lock b/Cargo.lock index f4744b0..f4e8519 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -671,6 +671,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" dependencies = [ + "base64 0.22.1", "bitflags 2.11.0", "crossterm_winapi", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 5e3e1cf..64dbf8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ edition.workspace = true [dependencies] anyhow = "1" +crossterm = { version = "0.29", features = ["event-stream", "bracketed-paste", "osc52"] } clap = { version = "4", features = ["derive"] } figment = { version = "0.10", features = ["env"] } dirs = "6" @@ -30,7 +31,6 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" json5 = "1.3" -crossterm = { version = "0.29", features = ["event-stream"] } ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] } tui-markdown = { git = "https://github.com/koverstreet/tui-markdown", subdirectory = "tui-markdown" } tui-textarea = { version = "0.10.2", package = "tui-textarea-2" } diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index a3c73a0..7c06fa7 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -45,7 +45,7 @@ pub(crate) struct SamplingParams { /// One token from the streaming completions API. pub enum StreamToken { - Token { text: String, id: u32 }, + Token(u32), Done { usage: Option }, Error(String), } @@ -159,20 +159,19 @@ async fn stream_completions( }; for choice in choices { - let text = choice["text"].as_str().unwrap_or(""); - let token_ids = choice["token_ids"].as_array(); - - if let Some(ids) = token_ids { - for (i, id_val) in ids.iter().enumerate() { + if let Some(ids) = choice["token_ids"].as_array() { + for id_val in ids { if let Some(id) = id_val.as_u64() { - let _ = tx.send(StreamToken::Token { - text: if i == 0 { text.to_string() } else { String::new() }, - id: id as u32, - }); + let _ = tx.send(StreamToken::Token(id as u32)); + } + } + } else if let Some(text) = choice["text"].as_str() { + // Fallback: provider didn't return token_ids, encode locally + if !text.is_empty() { + for id in super::tokenizer::encode(text) { + let _ = tx.send(StreamToken::Token(id)); } } - } else if !text.is_empty() { - let _ = tx.send(StreamToken::Token { text: text.to_string(), id: 0 }); } } } diff --git a/src/agent/context.rs b/src/agent/context.rs index ccc0830..8063918 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -296,6 +296,23 @@ impl AstNode { // -- Builder -------------------------------------------------------------- + pub fn retokenize(self) -> Self { + match self { + Self::Leaf(leaf) => { + let token_ids = if leaf.body.is_prompt_visible() { + tokenizer::encode(&leaf.body.render()) + } else { + vec![] + }; + Self::Leaf(NodeLeaf { token_ids, ..leaf }) + } + Self::Branch { role, children } => Self::Branch { + role, + children: children.into_iter().map(|c| c.retokenize()).collect(), + }, + } + } + pub fn with_timestamp(mut self, ts: DateTime) -> Self { match &mut self { Self::Leaf(leaf) => leaf.timestamp = Some(ts), @@ -427,49 +444,57 @@ fn format_tool_call_xml(name: &str, args_json: &str) -> String { xml } -fn normalize_xml_tags(text: &str) -> String { - let mut result = String::with_capacity(text.len()); - let mut chars = text.chars().peekable(); - while let Some(ch) = chars.next() { - if ch == '<' { - let mut tag = String::from('<'); - for inner in chars.by_ref() { - if inner == '>' { - tag.push('>'); - break; - } else if inner.is_whitespace() { - // Skip whitespace inside tags - } else { - tag.push(inner); - } +/// Search for a sequence of literal parts separated by optional ASCII whitespace. +/// Returns (start, end) byte positions of the overall match. +/// +/// Handles the case where streaming tokenization inserts whitespace inside +/// XML tag structure, e.g. `< function = bash >` instead of ``. +fn find_ws_seq(s: &str, parts: &[&str]) -> Option<(usize, usize)> { + let bytes = s.as_bytes(); + let mut search_from = 0; + 'outer: loop { + let start = s[search_from..].find(parts[0])? + search_from; + let mut pos = start + parts[0].len(); + for &part in &parts[1..] { + while pos < bytes.len() && bytes[pos].is_ascii_whitespace() { + pos += 1; } - result.push_str(&tag); - } else { - result.push(ch); + if !s[pos..].starts_with(part) { + search_from = start + 1; + continue 'outer; + } + pos += part.len(); } + return Some((start, pos)); } - result } +/// Parse a Qwen-style XML tag: `body`. +/// Tolerates whitespace inside tag delimiters (streaming artifact). +/// Body content is returned verbatim except for a single leading/trailing +/// newline (XML formatting convention). fn parse_qwen_tag<'a>(s: &'a str, tag: &str) -> Option<(&'a str, &'a str, &'a str)> { - let open = format!("<{}=", tag); + // Open tag: tolerate whitespace from streaming tokenization + let (_, after_eq) = find_ws_seq(s, &["<", tag, "="])?; + let gt_offset = s[after_eq..].find('>')?; + let name = s[after_eq..after_eq + gt_offset].trim(); + let body_start = after_eq + gt_offset + 1; + + // Close tag: exact match — model doesn't insert whitespace in close tags let close = format!("", tag); + let close_offset = s[body_start..].find(&close)?; + let body = &s[body_start..body_start + close_offset]; + // Strip the single leading/trailing newline from XML formatting, + // but preserve all other whitespace (indentation matters for code). + let body = body.strip_prefix('\n').unwrap_or(body); + let body = body.strip_suffix('\n').unwrap_or(body); + let rest = &s[body_start + close_offset + close.len()..]; - let start = s.find(&open)? + open.len(); - let name_end = start + s[start..].find('>')?; - let body_start = name_end + 1; - let body_end = body_start + s[body_start..].find(&close)?; - - Some(( - s[start..name_end].trim(), - s[body_start..body_end].trim(), - &s[body_end + close.len()..], - )) + Some((name, body, rest)) } fn parse_tool_call_body(body: &str) -> Option<(String, String)> { - let normalized = normalize_xml_tags(body); - let body = normalized.trim(); + let body = body.trim(); parse_xml_tool_call(body) .or_else(|| parse_json_tool_call(body)) } @@ -494,6 +519,38 @@ fn parse_json_tool_call(body: &str) -> Option<(String, String)> { Some((name.to_string(), serde_json::to_string(arguments).unwrap_or_default())) } +/// Search `buf` for `close_tag`. If found, append everything before it to +/// `accum`, advance `buf` past the tag, and return the accumulated content. +/// If not found, drain the safe prefix (preserving any partial tag match at +/// the end of buf) into `accum`. +fn scan_close_tag(buf: &mut String, close_tag: &str, accum: &mut String) -> Option { + if let Some(pos) = buf.find(close_tag) { + accum.push_str(&buf[..pos]); + *buf = buf[pos + close_tag.len()..].to_string(); + Some(std::mem::take(accum)) + } else { + let drained = drain_safe(buf, close_tag.len()); + if !drained.is_empty() { + accum.push_str(&drained); + } + None + } +} + +/// Remove everything from `buf` except the last `tag_len` bytes, which might +/// be a partial tag. Returns the removed prefix. +fn drain_safe(buf: &mut String, tag_len: usize) -> String { + let safe = buf.len().saturating_sub(tag_len); + if safe > 0 { + let safe = buf.floor_char_boundary(safe); + let drained = buf[..safe].to_string(); + *buf = buf[safe..].to_string(); + drained + } else { + String::new() + } +} + impl ResponseParser { pub fn new(branch_idx: usize) -> Self { Self { @@ -529,10 +586,11 @@ impl ResponseParser { let mut full_text = String::new(); while let Some(event) = stream.recv().await { match event { - super::api::StreamToken::Token { text, id } => { + super::api::StreamToken::Token(id) => { + let text = super::tokenizer::decode(&[id]); full_text.push_str(&text); let mut ctx = agent.context.lock().await; - let calls = parser.feed_token(&text, id, &mut ctx); + let calls = parser.feed_token(&text, &mut ctx); if !calls.is_empty() { if let Some(ref mut f) = log_file { use std::io::Write; @@ -549,19 +607,18 @@ impl ResponseParser { super::api::StreamToken::Done { usage } => { if let Some(ref mut f) = log_file { use std::io::Write; - let tc_count = full_text.matches("").count(); - let ctx_tokens = agent.context.lock().await.tokens(); - let _ = writeln!(f, "done: {} chars, {} tags, ctx: {} tokens", - full_text.len(), tc_count, ctx_tokens); - if tc_count == 0 && full_text.len() > 0 { + let ctx = agent.context.lock().await; + let children = ctx.conversation().get(parser.branch_idx) + .map(|n| n.children()).unwrap_or(&[]); + let n_think = children.iter().filter(|c| matches!(c.leaf().map(|l| l.body()), Some(NodeBody::Thinking(_)))).count(); + let n_content = children.iter().filter(|c| matches!(c.leaf().map(|l| l.body()), Some(NodeBody::Content(_)))).count(); + let n_tool = children.iter().filter(|c| matches!(c.leaf().map(|l| l.body()), Some(NodeBody::ToolCall { .. }))).count(); + let _ = writeln!(f, "done: {} chars, {} content + {} think + {} tool_call, ctx: {} tokens", + full_text.len(), n_content, n_think, n_tool, ctx.tokens()); + drop(ctx); + if full_text.len() > 0 && n_content == 0 && n_tool == 0 { let end = full_text.floor_char_boundary(full_text.len().min(2000)); - let _ = writeln!(f, "full text:\n{}", &full_text[..end]); - } - for (i, part) in full_text.split("").enumerate() { - if i > 0 { - let end = part.floor_char_boundary(part.len().min(200)); - let _ = writeln!(f, "tool_call body: {}...", &part[..end]); - } + let _ = writeln!(f, " unparsed text: {}", &full_text[..end]); } } if let Some(u) = usage { @@ -581,97 +638,72 @@ impl ResponseParser { (rx, handle) } - pub fn feed_token(&mut self, text: &str, _token_id: u32, ctx: &mut ContextState) -> Vec { + pub fn feed_token(&mut self, text: &str, ctx: &mut ContextState) -> Vec { + const THINK_OPEN: &str = ""; + const THINK_CLOSE: &str = ""; + const TOOL_CALL_OPEN: &str = ""; + const TOOL_CALL_CLOSE: &str = ""; + const OPEN_TAGS: &[&str] = &[THINK_OPEN, TOOL_CALL_OPEN]; + let mut pending = Vec::new(); self.buf.push_str(text); loop { if self.in_think { - match self.buf.find("") { - Some(end) => { - self.think_buf.push_str(&self.buf[..end]); - self.buf = self.buf[end + 8..].to_string(); - self.in_think = false; - let text = std::mem::take(&mut self.think_buf).trim().to_string(); - if !text.is_empty() { - self.push_child(ctx, AstNode::thinking(text)); - } - continue; - } - None => { - let safe = self.buf.len().saturating_sub(8); - if safe > 0 { - let safe = self.buf.floor_char_boundary(safe); - self.think_buf.push_str(&self.buf[..safe]); - self.buf = self.buf[safe..].to_string(); - } - break; + if let Some(content) = scan_close_tag(&mut self.buf, THINK_CLOSE, &mut self.think_buf) { + self.in_think = false; + let text = content.trim().to_string(); + if !text.is_empty() { + self.push_child(ctx, AstNode::thinking(text)); } + continue; } + break; } if self.in_tool_call { - match self.buf.find("") { - Some(end) => { - self.tool_call_buf.push_str(&self.buf[..end]); - self.buf = self.buf[end + 12..].to_string(); - self.in_tool_call = false; - if let Some((name, args)) = parse_tool_call_body(&self.tool_call_buf) { - self.flush_content(ctx); - self.push_child(ctx, AstNode::tool_call(&name, &args)); - self.call_counter += 1; - pending.push(PendingToolCall { - name, - arguments: args, - id: format!("call_{}", self.call_counter), - }); - } - self.tool_call_buf.clear(); - continue; - } - None => { - let safe = self.buf.len().saturating_sub(12); - if safe > 0 { - let safe = self.buf.floor_char_boundary(safe); - self.tool_call_buf.push_str(&self.buf[..safe]); - self.buf = self.buf[safe..].to_string(); - } - break; + if let Some(content) = scan_close_tag(&mut self.buf, TOOL_CALL_CLOSE, &mut self.tool_call_buf) { + self.in_tool_call = false; + if let Some((name, args)) = parse_tool_call_body(&content) { + self.flush_content(ctx); + self.push_child(ctx, AstNode::tool_call(&name, &args)); + self.call_counter += 1; + pending.push(PendingToolCall { + name, + arguments: args, + id: format!("call_{}", self.call_counter), + }); } + continue; } + break; } - let think_pos = self.buf.find(""); - let tool_pos = self.buf.find(""); - let next_tag = match (think_pos, tool_pos) { - (Some(a), Some(b)) => Some(a.min(b)), - (Some(a), None) => Some(a), - (None, Some(b)) => Some(b), - (None, None) => None, - }; + // Not inside a tag — find the earliest opening tag + let next = OPEN_TAGS.iter() + .filter_map(|tag| self.buf.find(tag).map(|pos| (pos, *tag))) + .min_by_key(|(pos, _)| *pos); - match next_tag { - Some(pos) => { + match next { + Some((pos, tag)) => { if pos > 0 { self.content_parts.push(self.buf[..pos].to_string()); } - if self.buf[pos..].starts_with("") { - self.buf = self.buf[pos + 7..].to_string(); - self.flush_content(ctx); - self.in_think = true; - } else { - self.buf = self.buf[pos + 11..].to_string(); - self.flush_content(ctx); - self.in_tool_call = true; + self.buf = self.buf[pos + tag.len()..].to_string(); + self.flush_content(ctx); + match tag { + THINK_OPEN => self.in_think = true, + TOOL_CALL_OPEN => self.in_tool_call = true, + _ => unreachable!(), } continue; } None => { - let safe = self.buf.len().saturating_sub(11); - if safe > 0 { - let safe = self.buf.floor_char_boundary(safe); - self.content_parts.push(self.buf[..safe].to_string()); - self.buf = self.buf[safe..].to_string(); + // Keep a tail that might be a partial opening tag + let max_tag = OPEN_TAGS.iter().map(|t| t.len()).max().unwrap(); + let drained = drain_safe(&mut self.buf, max_tag); + if !drained.is_empty() { + self.content_parts.push(drained); } break; } @@ -993,7 +1025,9 @@ mod tests { #[test] fn test_tool_call_xml_parse_streamed_whitespace() { - let body = "<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd\n"; + // Streaming tokenization can insert whitespace in opening tags, + // but close tags are always emitted verbatim. + let body = "<\nfunction\n=\nbash\n>\n<\nparameter\n=\ncommand\n>pwd\n"; let (name, args) = parse_tool_call_body(body).unwrap(); assert_eq!(name, "bash"); let args: serde_json::Value = serde_json::from_str(&args).unwrap(); @@ -1010,15 +1044,12 @@ mod tests { } #[test] - fn test_normalize_preserves_content() { - let text = "\necho hello world\n"; - let normalized = normalize_xml_tags(text); - assert_eq!(normalized, text); - } - - #[test] - fn test_normalize_strips_tag_internal_whitespace() { - assert_eq!(normalize_xml_tags("<\nfunction\n=\nbash\n>"), ""); + fn test_tool_call_preserves_code_with_angle_brackets() { + let body = "\nif x < y {\n std::mem::swap(&mut a, &mut b);\n}\n"; + let (name, args) = parse_tool_call_body(body).unwrap(); + assert_eq!(name, "edit"); + let args: serde_json::Value = serde_json::from_str(&args).unwrap(); + assert_eq!(args["code"], "if x < y {\n std::mem::swap(&mut a, &mut b);\n}"); } // -- ResponseParser tests ------------------------------------------------- @@ -1032,7 +1063,7 @@ mod tests { let mut calls = Vec::new(); for chunk in chunks { // Feed each chunk as a single token (id=0 for tests) - calls.extend(p.feed_token(chunk, 0, &mut ctx)); + calls.extend(p.feed_token(chunk, &mut ctx)); } p.finish(&mut ctx); (ctx, calls) @@ -1094,7 +1125,7 @@ mod tests { ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); let mut p = ResponseParser::new(0); for ch in text.chars() { - p.feed_token(&ch.to_string(), 0, &mut ctx); + p.feed_token(&ch.to_string(), &mut ctx); } p.finish(&mut ctx); let b = bodies(assistant_children(&ctx)); @@ -1111,7 +1142,7 @@ mod tests { let mut p = ResponseParser::new(0); let mut tool_calls = 0; for ch in text.chars() { - tool_calls += p.feed_token(&ch.to_string(), 0, &mut ctx).len(); + tool_calls += p.feed_token(&ch.to_string(), &mut ctx).len(); } p.finish(&mut ctx); assert_eq!(tool_calls, 1); diff --git a/src/agent/mod.rs b/src/agent/mod.rs index e79a71b..695569f 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -138,8 +138,17 @@ pub struct Agent { } /// Mutable agent state — behind its own mutex. +/// Which external MCP tools an agent can access. +#[derive(Clone)] +pub enum McpToolAccess { + None, + All, + Some(Vec), +} + pub struct AgentState { pub tools: Vec, + pub mcp_tools: McpToolAccess, pub last_prompt_tokens: u32, pub reasoning_effort: String, pub temperature: f32, @@ -174,8 +183,7 @@ impl Agent { context.conversation_log = conversation_log; context.push_no_log(Section::System, AstNode::system_msg(&system_prompt)); - let tool_defs: Vec = tools::tools().iter() - .map(|t| t.to_json()).collect(); + let tool_defs = tools::all_tool_definitions().await; if !tool_defs.is_empty() { let tools_text = format!( "# Tools\n\nYou have access to the following functions:\n\n\n{}\n\n\n\ @@ -202,6 +210,7 @@ impl Agent { context: tokio::sync::Mutex::new(context), state: tokio::sync::Mutex::new(AgentState { tools: tools::tools(), + mcp_tools: McpToolAccess::All, last_prompt_tokens: 0, reasoning_effort: "none".to_string(), temperature: 0.6, @@ -237,6 +246,7 @@ impl Agent { context: tokio::sync::Mutex::new(ctx), state: tokio::sync::Mutex::new(AgentState { tools, + mcp_tools: McpToolAccess::None, last_prompt_tokens: 0, reasoning_effort: "none".to_string(), temperature: st.temperature, @@ -408,7 +418,9 @@ impl Agent { } } Agent::apply_tool_results(&agent, results, &mut ds).await; - continue; + if !agent.state.lock().await.pending_yield { + continue; + } } // Text-only response — extract text and return @@ -457,6 +469,7 @@ impl Agent { ) { let mut nodes = Vec::new(); for (call, output) in &results { + if call.name == "yield_to_user" { continue; } ds.had_tool_calls = true; if output.starts_with("Error:") { ds.tool_errors += 1; } nodes.push(Self::make_tool_result_node(call, output)); @@ -558,28 +571,46 @@ impl Agent { } pub async fn restore_from_log(&self) -> bool { - let nodes = { + let tail = { let ctx = self.context.lock().await; match &ctx.conversation_log { - Some(log) => match log.read_nodes(64 * 1024 * 1024) { - Ok(nodes) if !nodes.is_empty() => nodes, - _ => return false, + Some(log) => match log.read_tail() { + Ok(t) => t, + Err(_) => return false, }, None => return false, } }; + let budget = context::context_budget_tokens(); + let fixed = { + let ctx = self.context.lock().await; + ctx.system().iter().chain(ctx.identity().iter()) + .map(|n| n.tokens()).sum::() + }; + let conv_budget = budget.saturating_sub(fixed); + + // Walk backwards (newest first), retokenize, stop at budget + let mut kept = Vec::new(); + let mut total = 0; + for node in tail.iter() { + let node = node.retokenize(); + let tok = node.tokens(); + if total + tok > conv_budget && !kept.is_empty() { break; } + total += tok; + kept.push(node); + } + kept.reverse(); + { let mut ctx = self.context.lock().await; ctx.clear(Section::Conversation); - // Push without logging — these are already in the log - for node in nodes { + for node in kept { ctx.push_no_log(Section::Conversation, node); } } self.compact().await; - let mut st = self.state.lock().await; - st.last_prompt_tokens = self.context.lock().await.tokens() as u32; + self.state.lock().await.last_prompt_tokens = self.context.lock().await.tokens() as u32; true } diff --git a/src/agent/tools/control.rs b/src/agent/tools/control.rs index 3dff813..6090f43 100644 --- a/src/agent/tools/control.rs +++ b/src/agent/tools/control.rs @@ -33,14 +33,12 @@ pub(super) fn tools() -> [super::Tool; 3] { })) }, Tool { name: "yield_to_user", description: "Wait for user input before continuing. The only way to enter a waiting state.", - parameters_json: r#"{"type":"object","properties":{"message":{"type":"string","description":"Optional status message"}}}"#, - handler: Arc::new(|agent, v| Box::pin(async move { - let msg = v.get("message").and_then(|v| v.as_str()).unwrap_or("Waiting for input."); + parameters_json: r#"{"type":"object","properties":{}}"#, + handler: Arc::new(|agent, _| Box::pin(async move { if let Some(agent) = agent { - let mut a = agent.state.lock().await; - a.pending_yield = true; + agent.state.lock().await.pending_yield = true; } - Ok(format!("Yielding. {}", msg)) + Ok(String::new()) })) }, ] } diff --git a/src/agent/tools/lsp.rs b/src/agent/tools/lsp.rs new file mode 100644 index 0000000..0111a46 --- /dev/null +++ b/src/agent/tools/lsp.rs @@ -0,0 +1,419 @@ +// tools/lsp.rs — LSP client for code intelligence +// +// Spawns language servers on demand when a file is first queried. +// Finds the project root (git/cargo/etc.) automatically. Maintains +// persistent connections — the server indexes once, queries are cheap. + +use anyhow::{Context, Result}; +use serde_json::json; +use std::collections::HashSet; +use std::path::Path; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; + +struct LspServer { + root_path: String, + stdin: BufWriter, + stdout: BufReader, + _child: Child, + next_id: i64, + opened_files: HashSet, + last_access: u64, +} + +impl LspServer { + async fn request(&mut self, method: &str, params: serde_json::Value) -> Result { + self.next_id += 1; + let id = self.next_id; + let msg = json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params }); + self.send_message(&msg).await?; + self.read_response(id).await + } + + async fn notify(&mut self, method: &str, params: serde_json::Value) -> Result<()> { + let msg = json!({ "jsonrpc": "2.0", "method": method, "params": params }); + self.send_message(&msg).await + } + + async fn send_message(&mut self, msg: &serde_json::Value) -> Result<()> { + let body = serde_json::to_string(msg)?; + let header = format!("Content-Length: {}\r\n\r\n", body.len()); + self.stdin.write_all(header.as_bytes()).await?; + self.stdin.write_all(body.as_bytes()).await?; + self.stdin.flush().await?; + Ok(()) + } + + async fn read_response(&mut self, expected_id: i64) -> Result { + loop { + let mut content_length: usize = 0; + loop { + let mut line = String::new(); + self.stdout.read_line(&mut line).await?; + let line = line.trim(); + if line.is_empty() { break; } + if let Some(len) = line.strip_prefix("Content-Length: ") { + content_length = len.parse()?; + } + } + if content_length == 0 { + anyhow::bail!("LSP: no Content-Length header"); + } + + let mut body = vec![0u8; content_length]; + self.stdout.read_exact(&mut body).await?; + let msg: serde_json::Value = serde_json::from_slice(&body)?; + + if let Some(id) = msg.get("id").and_then(|v| v.as_i64()) { + if id == expected_id { + if let Some(err) = msg.get("error") { + anyhow::bail!("LSP error: {}", err); + } + return Ok(msg.get("result").cloned().unwrap_or(serde_json::Value::Null)); + } + } + } + } + + async fn ensure_open(&mut self, path: &str) -> Result { + let uri = format!("file://{}", path); + if !self.opened_files.contains(&uri) { + let text = std::fs::read_to_string(path) + .with_context(|| format!("reading {}", path))?; + self.notify("textDocument/didOpen", json!({ + "textDocument": { + "uri": uri, + "languageId": detect_language(path), + "version": 1, + "text": text, + } + })).await?; + self.opened_files.insert(uri.clone()); + } + Ok(uri) + } +} + +fn detect_language(path: &str) -> &'static str { + match Path::new(path).extension().and_then(|e| e.to_str()) { + Some("rs") => "rust", + Some("c" | "h") => "c", + Some("cpp" | "cc" | "cxx" | "hpp") => "cpp", + Some("py") => "python", + Some("js") => "javascript", + Some("ts") => "typescript", + Some("go") => "go", + Some("java") => "java", + _ => "plaintext", + } +} + +fn find_project_root(file_path: &str) -> Option { + let mut dir = Path::new(file_path).parent()?; + loop { + for marker in &[".git", "Cargo.toml", "package.json", "go.mod", "pyproject.toml", "Makefile"] { + if dir.join(marker).exists() { + return Some(dir.to_string_lossy().to_string()); + } + } + dir = dir.parent()?; + } +} + +const IDLE_TIMEOUT_SECS: u64 = 600; + +use std::sync::OnceLock; +use tokio::sync::Mutex as TokioMutex; + +struct Registry { + configs: Vec, + servers: Vec, +} + +static REGISTRY: OnceLock> = OnceLock::new(); + +fn registry() -> &'static TokioMutex { + REGISTRY.get_or_init(|| { + let configs = crate::config::get().lsp_servers.clone(); + TokioMutex::new(Registry { configs, servers: Vec::new() }) + }) +} + +fn now() -> u64 { + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() +} + +impl LspServer { + async fn spawn(command: &str, args: &[String], root_path: &str) -> Result { + let mut child = Command::new(command) + .args(args) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::null()) + .spawn() + .with_context(|| format!("spawning LSP: {} {}", command, args.join(" ")))?; + + let mut server = LspServer { + root_path: root_path.to_string(), + stdin: BufWriter::new(child.stdin.take().unwrap()), + stdout: BufReader::new(child.stdout.take().unwrap()), + _child: child, + next_id: 0, + opened_files: HashSet::new(), + last_access: now(), + }; + + server.request("initialize", json!({ + "processId": std::process::id(), + "rootUri": format!("file://{}", root_path), + "capabilities": { + "textDocument": { + "definition": { "dynamicRegistration": false }, + "references": { "dynamicRegistration": false }, + "hover": { "dynamicRegistration": false }, + "documentSymbol": { "dynamicRegistration": false }, + "callHierarchy": { "dynamicRegistration": false }, + } + }, + })).await.with_context(|| format!("initializing LSP for {}", root_path))?; + + server.notify("initialized", json!({})).await?; + dbglog!("[lsp] server started for {}", root_path); + Ok(server) + } +} + +impl Registry { + fn reap_idle(&mut self) { + let n = now(); + self.servers.retain(|s| n.saturating_sub(s.last_access) < IDLE_TIMEOUT_SECS); + } + + fn find_config(&self, lang: &str) -> Option<&crate::config::LspServerConfig> { + self.configs.iter().find(|c| { + if c.languages.is_empty() { + // Auto: rust-analyzer for rust, etc. + c.command.contains(lang) || c.name == lang + } else { + c.languages.iter().any(|l| l == lang) + } + }) + } + + async fn ensure_server(&mut self, file_path: &str) -> Result { + let root = find_project_root(file_path) + .ok_or_else(|| anyhow::anyhow!("no project root found for {}", file_path))?; + let lang = detect_language(file_path); + + self.reap_idle(); + + if let Some(idx) = self.servers.iter().position(|s| s.root_path == root) { + self.servers[idx].last_access = now(); + return Ok(idx); + } + + let config = self.find_config(lang) + .ok_or_else(|| anyhow::anyhow!("no LSP server configured for {}", lang))? + .clone(); + let server = LspServer::spawn(&config.command, &config.args, &root).await?; + self.servers.push(server); + Ok(self.servers.len() - 1) + } + + async fn conn_for(&mut self, path: &str) -> Result<(&mut LspServer, String)> { + let idx = self.ensure_server(path).await?; + let server = &mut self.servers[idx]; + let uri = server.ensure_open(path).await?; + Ok((server, uri)) + } +} + +// -- Operation table ---------------------------------------------------------- + +use std::sync::Arc; + +struct LspOp { + tool_name: &'static str, + description: &'static str, + method: &'static str, + needs_position: bool, + extra_params: fn() -> serde_json::Value, + format: fn(&serde_json::Value) -> String, + // Two-step RPCs (e.g. incoming_calls) use a second method on the first result + followup: Option<&'static str>, +} + +fn no_extra() -> serde_json::Value { json!({}) } +fn ref_extra() -> serde_json::Value { json!({"context": {"includeDeclaration": true}}) } + +fn fmt_locations(result: &serde_json::Value) -> String { + let locations = if result.is_array() { + result.as_array().unwrap().clone() + } else if result.is_object() { + vec![result.clone()] + } else { + return "No results.".into(); + }; + let mut out = String::new(); + for loc in &locations { + let uri = loc["uri"].as_str().or_else(|| loc["targetUri"].as_str()).unwrap_or(""); + let range = if loc.get("range").is_some() { &loc["range"] } else { &loc["targetRange"] }; + let line = range["start"]["line"].as_u64().unwrap_or(0) + 1; + let file = uri.strip_prefix("file://").unwrap_or(uri); + out.push_str(&format!("{}:{}\n", file, line)); + } + if out.is_empty() { "No results.".into() } else { out } +} + +fn fmt_hover(result: &serde_json::Value) -> String { + if result.is_null() { return "No hover information.".into(); } + let contents = &result["contents"]; + if let Some(s) = contents.as_str() { return s.to_string(); } + if let Some(obj) = contents.as_object() { + return obj.get("value").and_then(|v| v.as_str()).unwrap_or("").to_string(); + } + serde_json::to_string_pretty(result).unwrap_or_default() +} + +fn fmt_symbols(result: &serde_json::Value) -> String { + if let Some(symbols) = result.as_array() { + let mut out = String::new(); + fmt_symbols_recursive(symbols, &mut out, 0); + if out.is_empty() { "No symbols found.".into() } else { out } + } else { + "No symbols found.".into() + } +} + +fn fmt_symbols_recursive(symbols: &[serde_json::Value], out: &mut String, depth: usize) { + let indent = " ".repeat(depth); + for sym in symbols { + let name = sym["name"].as_str().unwrap_or("?"); + let kind = match sym["kind"].as_u64().unwrap_or(0) { + 2 => "Module", 5 => "Class", 6 => "Method", 8 => "Field", + 10 => "Enum", 11 => "Interface", 12 => "Function", 13 => "Variable", + 14 => "Constant", 22 => "EnumMember", 23 => "Struct", 26 => "TypeParameter", + _ => "Symbol", + }; + let line = sym["range"]["start"]["line"].as_u64() + .or_else(|| sym["location"]["range"]["start"]["line"].as_u64()) + .unwrap_or(0) + 1; + out.push_str(&format!("{}{} ({}) - Line {}\n", indent, name, kind, line)); + if let Some(children) = sym.get("children").and_then(|c| c.as_array()) { + fmt_symbols_recursive(children, out, depth + 1); + } + } +} + +fn fmt_callers(result: &serde_json::Value) -> String { + if let Some(calls) = result.as_array() { + let mut out = String::new(); + for call in calls { + if let Some(from) = call.get("from") { + let name = from["name"].as_str().unwrap_or("?"); + let uri = from["uri"].as_str().unwrap_or(""); + let line = from["range"]["start"]["line"].as_u64().unwrap_or(0) + 1; + let file = uri.strip_prefix("file://").unwrap_or(uri); + out.push_str(&format!("{}:{}: {}\n", file, line, name)); + } + } + if out.is_empty() { "No incoming calls.".into() } else { out } + } else { + "No incoming calls.".into() + } +} + +static OPS: &[LspOp] = &[ + LspOp { + tool_name: "lsp_definition", + description: "Find where a symbol is defined.", + method: "textDocument/definition", + needs_position: true, + extra_params: no_extra, + format: fmt_locations, + followup: None, + }, + LspOp { + tool_name: "lsp_references", + description: "Find all references to a symbol.", + method: "textDocument/references", + needs_position: true, + extra_params: ref_extra, + format: fmt_locations, + followup: None, + }, + LspOp { + tool_name: "lsp_hover", + description: "Get type info and documentation for a symbol.", + method: "textDocument/hover", + needs_position: true, + extra_params: no_extra, + format: fmt_hover, + followup: None, + }, + LspOp { + tool_name: "lsp_symbols", + description: "List all symbols in a file.", + method: "textDocument/documentSymbol", + needs_position: false, + extra_params: no_extra, + format: fmt_symbols, + followup: None, + }, + LspOp { + tool_name: "lsp_callers", + description: "Find all functions that call the function at a position.", + method: "textDocument/prepareCallHierarchy", + needs_position: true, + extra_params: no_extra, + format: fmt_callers, + followup: Some("callHierarchy/incomingCalls"), + }, +]; + +const POS_PARAMS: &str = r#"{"type":"object","properties":{"file":{"type":"string"},"line":{"type":"integer"},"character":{"type":"integer"}},"required":["file","line","character"]}"#; +const FILE_PARAMS: &str = r#"{"type":"object","properties":{"file":{"type":"string"}},"required":["file"]}"#; + +async fn dispatch_op(op: &LspOp, v: &serde_json::Value) -> Result { + let file = v["file"].as_str().ok_or_else(|| anyhow::anyhow!("file required"))?; + + let mut reg = registry().lock().await; + let (conn, uri) = reg.conn_for(file).await?; + + let mut params = json!({ "textDocument": { "uri": uri } }); + if op.needs_position { + let line = v["line"].as_u64().ok_or_else(|| anyhow::anyhow!("line required"))? as u32 - 1; + let character = v["character"].as_u64().unwrap_or(0) as u32; + params["position"] = json!({ "line": line, "character": character }); + } + let extra = (op.extra_params)(); + if let Some(obj) = extra.as_object() { + for (k, v) in obj { params[k] = v.clone(); } + } + + let result = conn.request(op.method, params).await?; + + if let Some(followup) = op.followup { + let item = result.as_array().and_then(|a| a.first()) + .ok_or_else(|| anyhow::anyhow!("no item at this position"))?; + let result2 = conn.request(followup, json!({ "item": item })).await?; + return Ok((op.format)(&result2)); + } + + Ok((op.format)(&result)) +} + +pub(super) fn tools() -> Vec { + OPS.iter().map(|op| { + let name = op.tool_name; + super::Tool { + name: op.tool_name, + description: op.description, + parameters_json: if op.needs_position { POS_PARAMS } else { FILE_PARAMS }, + handler: Arc::new(move |_agent, v| Box::pin(async move { + let op = OPS.iter().find(|o| o.tool_name == name).unwrap(); + dispatch_op(op, &v).await + })), + } + }).collect() +} diff --git a/src/agent/tools/mcp_client.rs b/src/agent/tools/mcp_client.rs new file mode 100644 index 0000000..a7348ec --- /dev/null +++ b/src/agent/tools/mcp_client.rs @@ -0,0 +1,192 @@ +// tools/mcp_client.rs — MCP client for external tool servers +// +// Spawns external MCP servers, discovers their tools, dispatches calls. +// JSON-RPC 2.0 over stdio (newline-delimited). Global registry, lazy +// init from config. + +use anyhow::{Context, Result}; +use serde::Deserialize; +use serde_json::json; +use std::sync::OnceLock; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tokio::sync::Mutex as TokioMutex; + +#[derive(Debug, Clone)] +pub struct McpTool { + pub name: String, + pub description: String, + pub parameters_json: String, +} + +struct McpServer { + #[allow(dead_code)] + name: String, + stdin: BufWriter, + stdout: BufReader, + _child: Child, + next_id: u64, + tools: Vec, +} + +#[derive(Debug, Deserialize)] +struct JsonRpcResponse { + id: Option, + result: Option, + error: Option, +} + +#[derive(Debug, Deserialize)] +struct JsonRpcError { + code: i64, + message: String, +} + +impl McpServer { + async fn request(&mut self, method: &str, params: Option) -> Result { + self.next_id += 1; + let id = self.next_id; + let req = json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params }); + let mut line = serde_json::to_string(&req)?; + line.push('\n'); + self.stdin.write_all(line.as_bytes()).await?; + self.stdin.flush().await?; + + let mut buf = String::new(); + loop { + buf.clear(); + let n = self.stdout.read_line(&mut buf).await?; + if n == 0 { anyhow::bail!("MCP server closed connection"); } + let trimmed = buf.trim(); + if trimmed.is_empty() { continue; } + if let Ok(resp) = serde_json::from_str::(trimmed) { + if resp.id == Some(id) { + if let Some(err) = resp.error { + anyhow::bail!("MCP error {}: {}", err.code, err.message); + } + return Ok(resp.result.unwrap_or(serde_json::Value::Null)); + } + } + } + } + + async fn notify(&mut self, method: &str) -> Result<()> { + let msg = json!({ "jsonrpc": "2.0", "method": method }); + let mut line = serde_json::to_string(&msg)?; + line.push('\n'); + self.stdin.write_all(line.as_bytes()).await?; + self.stdin.flush().await?; + Ok(()) + } + + async fn spawn(name: &str, command: &str, args: &[&str]) -> Result { + let mut child = Command::new(command) + .args(args) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::null()) + .spawn() + .with_context(|| format!("spawning MCP server: {} {}", command, args.join(" ")))?; + + let mut server = McpServer { + name: name.to_string(), + stdin: BufWriter::new(child.stdin.take().unwrap()), + stdout: BufReader::new(child.stdout.take().unwrap()), + _child: child, + next_id: 0, + tools: Vec::new(), + }; + + server.request("initialize", Some(json!({ + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": {"name": "consciousness", "version": "0.1"} + }))).await.with_context(|| format!("initializing MCP server {}", name))?; + + server.notify("notifications/initialized").await?; + + let tools_result = server.request("tools/list", None).await + .with_context(|| format!("listing tools from MCP server {}", name))?; + + if let Some(tool_list) = tools_result.get("tools").and_then(|t| t.as_array()) { + for tool in tool_list { + server.tools.push(McpTool { + name: tool["name"].as_str().unwrap_or("").to_string(), + description: tool["description"].as_str().unwrap_or("").to_string(), + parameters_json: tool.get("inputSchema") + .map(|s| serde_json::to_string(s).unwrap_or_default()) + .unwrap_or_else(|| r#"{"type":"object"}"#.to_string()), + }); + } + } + + dbglog!("[mcp] {} connected: {} tools", name, server.tools.len()); + Ok(server) + } +} + +struct Registry { + servers: Vec, +} + +static REGISTRY: OnceLock> = OnceLock::new(); + +fn registry() -> &'static TokioMutex { + REGISTRY.get_or_init(|| { + let configs = &crate::config::get().mcp_servers; + // Can't do async init in OnceLock, so servers are spawned lazily on first access + let _ = configs; // configs read but servers spawned in ensure_init() + TokioMutex::new(Registry { servers: Vec::new() }) + }) +} + +async fn ensure_init() -> Result<()> { + let mut reg = registry().lock().await; + if !reg.servers.is_empty() { return Ok(()); } + let configs = crate::config::get().mcp_servers.clone(); + for cfg in &configs { + let args: Vec<&str> = cfg.args.iter().map(|s| s.as_str()).collect(); + match McpServer::spawn(&cfg.name, &cfg.command, &args).await { + Ok(server) => reg.servers.push(server), + Err(e) => eprintln!("warning: MCP server {} failed: {:#}", cfg.name, e), + } + } + Ok(()) +} + +pub(super) async fn call_tool(name: &str, args: &serde_json::Value) -> Result { + ensure_init().await?; + let mut reg = registry().lock().await; + let server = reg.servers.iter_mut() + .find(|s| s.tools.iter().any(|t| t.name == name)) + .ok_or_else(|| anyhow::anyhow!("no MCP server has tool {}", name))?; + + let result = server.request("tools/call", Some(json!({ + "name": name, "arguments": args, + }))).await.with_context(|| format!("calling MCP tool {}", name))?; + + if let Some(content) = result.get("content").and_then(|c| c.as_array()) { + let texts: Vec<&str> = content.iter() + .filter_map(|c| c.get("text").and_then(|t| t.as_str())) + .collect(); + Ok(texts.join("\n")) + } else if let Some(text) = result.as_str() { + Ok(text.to_string()) + } else { + Ok(serde_json::to_string_pretty(&result)?) + } +} + +pub(super) async fn tool_definitions_json() -> Vec { + let _ = ensure_init().await; + let reg = registry().lock().await; + reg.servers.iter() + .flat_map(|s| s.tools.iter()) + .map(|t| format!( + r#"{{"type":"function","function":{{"name":"{}","description":"{}","parameters":{}}}}}"#, + t.name, + t.description.replace('"', r#"\""#), + t.parameters_json, + )) + .collect() +} diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index bea0167..55fe311 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -6,6 +6,8 @@ // Core tools mod ast_grep; +pub mod lsp; +pub mod mcp_client; mod bash; pub mod channels; mod edit; @@ -152,7 +154,22 @@ pub async fn dispatch_with_agent( match tool { Some(t) => (t.handler)(agent, args.clone()).await .unwrap_or_else(|e| format!("Error: {}", e)), - None => format!("Error: Unknown tool: {}", name), + None => { + let allowed = match &agent { + Some(a) => match &a.state.lock().await.mcp_tools { + super::McpToolAccess::All => true, + super::McpToolAccess::Some(list) => list.iter().any(|t| t == name), + super::McpToolAccess::None => false, + }, + None => true, + }; + if allowed { + if let Ok(result) = mcp_client::call_tool(name, args).await { + return result; + } + } + format!("Error: Unknown tool: {}", name) + } } } @@ -168,9 +185,16 @@ pub fn tools() -> Vec { all.extend(memory::journal_tools()); all.extend(channels::tools()); all.extend(control::tools()); + all.extend(lsp::tools()); all } +pub async fn all_tool_definitions() -> Vec { + let mut defs: Vec = tools().iter().map(|t| t.to_json()).collect(); + defs.extend(mcp_client::tool_definitions_json().await); + defs +} + /// Memory + journal tools only — for subconscious agents. pub fn memory_and_journal_tools() -> Vec { let mut all = memory::memory_tools().to_vec(); @@ -222,10 +246,7 @@ pub fn summarize_args(tool_name: &str, args: &serde_json::Value) -> String { entry.to_string() } } - "yield_to_user" => args["message"] - .as_str() - .unwrap_or("") - .to_string(), + "yield_to_user" => String::new(), "switch_model" => args["model"] .as_str() .unwrap_or("") diff --git a/src/config.rs b/src/config.rs index 9a12f1f..98d2c23 100644 --- a/src/config.rs +++ b/src/config.rs @@ -107,6 +107,10 @@ pub struct Config { pub scoring_response_window: usize, pub api_reasoning: String, pub agent_types: Vec, + #[serde(default)] + pub mcp_servers: Vec, + #[serde(default)] + pub lsp_servers: Vec, /// Surface agent timeout in seconds. #[serde(default)] pub surface_timeout_secs: Option, @@ -164,6 +168,8 @@ impl Default for Config { surface_timeout_secs: None, surface_conversation_bytes: None, surface_hooks: vec![], + mcp_servers: vec![], + lsp_servers: vec![], } } } @@ -204,6 +210,14 @@ impl Config { } } + // Top-level config sections (not inside "memory") + if let Some(servers) = root.get("lsp_servers") { + config.lsp_servers = serde_json::from_value(servers.clone()).unwrap_or_default(); + } + if let Some(servers) = root.get("mcp_servers") { + config.mcp_servers = serde_json::from_value(servers.clone()).unwrap_or_default(); + } + Some(config) } @@ -346,6 +360,28 @@ pub struct AppConfig { pub models: HashMap, #[serde(default = "default_model_name")] pub default_model: String, + #[serde(default)] + pub mcp_servers: Vec, + #[serde(default)] + pub lsp_servers: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct McpServerConfig { + pub name: String, + pub command: String, + #[serde(default)] + pub args: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LspServerConfig { + pub name: String, + pub command: String, + #[serde(default)] + pub args: Vec, + #[serde(default)] + pub languages: Vec, // e.g. ["rust"], ["c", "cpp"]. Empty = auto-detect } #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -436,6 +472,8 @@ impl Default for AppConfig { system_prompt_file: None, models: HashMap::new(), default_model: String::new(), + mcp_servers: Vec::new(), + lsp_servers: Vec::new(), } } } diff --git a/src/mind/log.rs b/src/mind/log.rs index 85dcedf..b69f2ca 100644 --- a/src/mind/log.rs +++ b/src/mind/log.rs @@ -1,8 +1,10 @@ use anyhow::{Context, Result}; use std::fs::{File, OpenOptions}; -use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; +use std::io::Write; use std::path::{Path, PathBuf}; use crate::agent::context::AstNode; +use crate::hippocampus::transcript::JsonlBackwardIter; +use memmap2::Mmap; pub struct ConversationLog { path: PathBuf, @@ -33,32 +35,19 @@ impl ConversationLog { Ok(()) } - pub fn read_nodes(&self, max_bytes: u64) -> Result> { + /// Read nodes from the tail of the log, newest first. + /// Caller decides when to stop (budget, count, etc). + pub fn read_tail(&self) -> Result { if !self.path.exists() { - return Ok(Vec::new()); + anyhow::bail!("log does not exist"); } let file = File::open(&self.path) .with_context(|| format!("opening log {}", self.path.display()))?; - let file_len = file.metadata()?.len(); - let mut reader = BufReader::new(file); - - if file_len > max_bytes { - reader.seek(SeekFrom::Start(file_len - max_bytes))?; - let mut discard = String::new(); - reader.read_line(&mut discard)?; + if file.metadata()?.len() == 0 { + anyhow::bail!("log is empty"); } - - let mut nodes = Vec::new(); - for line in reader.lines() { - let line = line.context("reading log tail")?; - let line = line.trim(); - if line.is_empty() { continue; } - if let Ok(node) = serde_json::from_str::(line) { - nodes.push(node); - } - // Old format entries silently skipped — journal has the context - } - Ok(nodes) + let mmap = unsafe { Mmap::map(&file)? }; + Ok(TailNodes { _file: file, mmap }) } pub fn path(&self) -> &Path { @@ -66,12 +55,13 @@ impl ConversationLog { } pub fn oldest_timestamp(&self) -> Option> { + // Read forward from the start to find first timestamp let file = File::open(&self.path).ok()?; - let reader = BufReader::new(file); - for line in reader.lines().flatten() { - let line = line.trim().to_string(); + let mmap = unsafe { Mmap::map(&file).ok()? }; + // Find first { ... } and parse + for line in mmap.split(|&b| b == b'\n') { if line.is_empty() { continue; } - if let Ok(node) = serde_json::from_str::(&line) { + if let Ok(node) = serde_json::from_slice::(line) { if let Some(leaf) = node.leaf() { if let Some(ts) = leaf.timestamp() { return Some(ts); @@ -82,3 +72,16 @@ impl ConversationLog { None } } + +/// Iterates over conversation log nodes newest-first, using mmap + backward scan. +pub struct TailNodes { + _file: File, + mmap: Mmap, +} + +impl TailNodes { + pub fn iter(&self) -> impl Iterator + '_ { + JsonlBackwardIter::new(&self.mmap) + .filter_map(|bytes| serde_json::from_slice::(bytes).ok()) + } +} diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 8426ef1..2241ea2 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -494,6 +494,7 @@ impl Mind { }; let mut cmds = Vec::new(); + let mut dmn_expired = false; tokio::select! { biased; @@ -526,17 +527,15 @@ impl Mind { } cmds.push(MindCommand::Compact); + /* + * Broken since the AST context window conversion: if !self.config.no_agents { cmds.push(MindCommand::Score); } + */ } - _ = tokio::time::sleep(timeout), if !has_input => { - let tick = self.shared.lock().unwrap().dmn_tick(); - if let Some((prompt, target)) = tick { - self.start_turn(&prompt, target).await; - } - } + _ = tokio::time::sleep(timeout), if !has_input => dmn_expired = true, } if !self.config.no_agents { @@ -562,6 +561,14 @@ impl Mind { if let Some(text) = pending { self.start_turn(&text, StreamTarget::Conversation).await; } + /* + else if dmn_expired { + let tick = self.shared.lock().unwrap().dmn_tick(); + if let Some((prompt, target)) = tick { + self.start_turn(&prompt, target).await; + } + } + */ self.run_commands(cmds).await; } diff --git a/src/mind/subconscious.rs b/src/mind/subconscious.rs index a4cc7d9..a35d586 100644 --- a/src/mind/subconscious.rs +++ b/src/mind/subconscious.rs @@ -68,8 +68,8 @@ impl State { /// How long to wait before the next DMN prompt in this state. pub fn interval(&self) -> Duration { match self { - State::Engaged => Duration::from_secs(5), State::Working => Duration::from_secs(3), + State::Engaged => Duration::from_secs(5), State::Foraging => Duration::from_secs(30), State::Resting { .. } => Duration::from_secs(300), State::Paused | State::Off => Duration::from_secs(86400), // effectively never diff --git a/src/user/chat.rs b/src/user/chat.rs index d91576c..800bf2a 100644 --- a/src/user/chat.rs +++ b/src/user/chat.rs @@ -9,8 +9,9 @@ use ratatui::{ text::{Line, Span}, widgets::{Block, Borders, Paragraph, Wrap}, Frame, - crossterm::event::{KeyCode, KeyModifiers, MouseEvent, MouseEventKind, MouseButton}, }; +use ratatui::crossterm::event::{KeyCode, KeyModifiers, MouseEvent, MouseEventKind, MouseButton}; + use super::{App, ScreenView, screen_legend}; use crate::agent::context::{AstNode, NodeBody, Role, Ast}; @@ -158,6 +159,56 @@ enum ActivePane { Tools, } +/// Text selection within a pane. Anchor is where the click started, +/// cursor is where the mouse currently is. They may be in either order. +#[derive(Debug, Clone, PartialEq, Default)] +struct Selection { + anchor_line: usize, + anchor_col: usize, + cursor_line: usize, + cursor_col: usize, +} + +impl Selection { + fn new(line: usize, col: usize) -> Self { + Self { anchor_line: line, anchor_col: col, cursor_line: line, cursor_col: col } + } + + fn extend(&mut self, line: usize, col: usize) { + self.cursor_line = line; + self.cursor_col = col; + } + + /// Normalized range: (start_line, start_col, end_line, end_col) + fn range(&self) -> (usize, usize, usize, usize) { + if (self.anchor_line, self.anchor_col) <= (self.cursor_line, self.cursor_col) { + (self.anchor_line, self.anchor_col, self.cursor_line, self.cursor_col) + } else { + (self.cursor_line, self.cursor_col, self.anchor_line, self.anchor_col) + } + } + + fn text(&self, lines: &[Line<'static>]) -> String { + let (start_line, start_col, end_line, end_col) = self.range(); + let mut result = String::new(); + for (i, line) in lines.iter().enumerate() { + if i < start_line || i > end_line { continue; } + let line_text: String = line.spans.iter().map(|s| s.content.as_ref()).collect(); + let sc = if i == start_line { start_col } else { 0 }; + let ec = if i == end_line { end_col } else { line_text.len() }; + if sc < line_text.len() { + if let Some(selected) = line_text.get(sc..ec.min(line_text.len())) { + if !result.is_empty() { + result.push('\n'); + } + result.push_str(selected); + } + } + } + result + } +} + fn strip_ansi(text: &str) -> String { let mut out = String::with_capacity(text.len()); let mut chars = text.chars().peekable(); @@ -226,6 +277,7 @@ struct PaneState { pinned: bool, last_total_lines: u16, last_height: u16, + selection: Option, } impl PaneState { @@ -237,6 +289,7 @@ impl PaneState { md_buffer: String::new(), use_markdown, pending_marker: Marker::None, scroll: 0, pinned: false, last_total_lines: 0, last_height: 20, + selection: None, } } @@ -352,6 +405,56 @@ impl PaneState { } (lines, markers) } + + /// Convert mouse coordinates (relative to pane) to line/column position. + fn mouse_to_position(&self, mouse_x: u16, mouse_y: u16, pane_height: u16) -> Option<(usize, usize)> { + let (lines, _) = self.all_lines_with_markers(); + if lines.is_empty() || self.cached_width == 0 { return None; } + + // Build heights array (reuse cached where possible) + let n_committed = self.line_heights.len(); + let mut heights: Vec = self.line_heights.clone(); + for line in lines.iter().skip(n_committed) { + let h = Paragraph::new(line.clone()) + .wrap(Wrap { trim: false }) + .line_count(self.cached_width) as u16; + heights.push(h.max(1)); + } + + // Find the first visible line given current scroll + let (first, sub_scroll, _) = visible_range(&heights, self.scroll, pane_height); + + // Walk from the first visible line, offset by sub_scroll + let mut row = -(sub_scroll as i32); + for line_idx in first..lines.len() { + let h = heights.get(line_idx).copied().unwrap_or(1) as i32; + if (mouse_y as i32) < row + h { + let line_text: String = lines[line_idx].spans.iter().map(|s| s.content.as_ref()).collect(); + let col = (mouse_x as usize).min(line_text.len()); + return Some((line_idx, col)); + } + row += h; + } + Some((lines.len().saturating_sub(1), 0)) + } + + /// Set the selection start position. + fn start_selection(&mut self, line: usize, col: usize) { + self.selection = Some(Selection::new(line, col)); + } + + /// Update the selection end position. + fn extend_selection(&mut self, line: usize, col: usize) { + if let Some(ref mut sel) = self.selection { + sel.extend(line, col); + } + } + + /// Get the selected text, or None if nothing is selected. + fn get_selection(&self) -> Option { + let (lines, _) = self.all_lines_with_markers(); + self.selection.as_ref().map(|sel| sel.text(&lines)) + } } pub(crate) struct InteractScreen { @@ -610,14 +713,83 @@ impl InteractScreen { for (i, area) in self.pane_areas.iter().enumerate() { if x >= area.x && x < area.x + area.width && y >= area.y && y < area.y + area.height { self.active_pane = match i { 0 => ActivePane::Autonomous, 1 => ActivePane::Conversation, _ => ActivePane::Tools }; + let rel_x = x.saturating_sub(area.x); + let rel_y = y.saturating_sub(area.y); + self.selection_event(i, rel_x, rel_y, true); break; } } } + MouseEventKind::Drag(MouseButton::Left) => { + let (x, y) = (mouse.column, mouse.row); + let i = match self.active_pane { ActivePane::Autonomous => 0, ActivePane::Conversation => 1, ActivePane::Tools => 2 }; + let area = self.pane_areas[i]; + if x >= area.x && x < area.x + area.width && y >= area.y && y < area.y + area.height { + let rel_x = x.saturating_sub(area.x); + let rel_y = y.saturating_sub(area.y); + self.selection_event(i, rel_x, rel_y, false); + } + } + MouseEventKind::Up(MouseButton::Left) => { + self.copy_selection_to_clipboard(); + } + MouseEventKind::Down(MouseButton::Middle) => { + self.paste_from_selection(); + } _ => {} } } + /// Copy the current selection to the clipboard via OSC 52. + fn copy_selection_to_clipboard(&self) { + let text = match self.active_pane { + ActivePane::Autonomous => self.autonomous.get_selection(), + ActivePane::Conversation => self.conversation.get_selection(), + ActivePane::Tools => self.tools.get_selection(), + }; + if let Some(ref selected_text) = text { + if selected_text.is_empty() { return; } + // OSC 52 clipboard copy + use std::io::Write; + use base64::Engine; + let encoded = base64::engine::general_purpose::STANDARD.encode(selected_text); + let mut stdout = std::io::stdout().lock(); + let _ = write!(stdout, "\x1b]52;c;{}\x07", encoded); + let _ = stdout.flush(); + } + } + + /// Paste from tmux buffer via middle-click. + fn paste_from_selection(&mut self) { + let result = std::process::Command::new("tmux") + .args(["save-buffer", "-"]).output(); + if let Ok(output) = result { + if output.status.success() { + let text = String::from_utf8_lossy(&output.stdout).into_owned(); + if !text.is_empty() { + self.textarea.insert_str(&text); + } + } + } + } + + fn pane_mut(&mut self, idx: usize) -> &mut PaneState { + match idx { 0 => &mut self.autonomous, 1 => &mut self.conversation, _ => &mut self.tools } + } + + fn selection_event(&mut self, pane_idx: usize, rel_x: u16, rel_y: u16, start: bool) { + let height = self.pane_areas[pane_idx].height; + let pane = self.pane_mut(pane_idx); + if let Some((line, col)) = pane.mouse_to_position(rel_x, rel_y, height) { + if start { + pane.start_selection(line, col); + } else { + pane.extend_selection(line, col); + } + } + self.copy_selection_to_clipboard(); + } + /// Draw the main (F1) screen — four-pane layout with status bar. fn draw_main(&mut self, frame: &mut Frame, size: Rect, app: &App) { // Main layout: content area + active tools overlay + status bar @@ -825,6 +997,11 @@ impl ScreenView for InteractScreen { self.textarea = new_textarea(vec![String::new()]); } } + KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) && key.modifiers.contains(KeyModifiers::SHIFT) => { + // Ctrl+Shift+C: copy selection + self.copy_selection_to_clipboard(); + } + // Paste: terminal handles Ctrl+Shift+V natively via bracketed paste KeyCode::Up if key.modifiers.contains(KeyModifiers::CONTROL) => self.scroll_active_up(3), KeyCode::Down if key.modifiers.contains(KeyModifiers::CONTROL) => self.scroll_active_down(3), KeyCode::Up => { @@ -862,6 +1039,9 @@ impl ScreenView for InteractScreen { } } Event::Mouse(mouse) => { self.handle_mouse(*mouse); } + Event::Paste(text) => { + self.textarea.insert_str(text); + } _ => {} } } @@ -1011,8 +1191,44 @@ fn draw_conversation_pane( // Find visible line range let (first, sub_scroll, last) = visible_range(&heights, pane.scroll, inner.height); + // Apply selection highlighting to visible lines + let mut visible_lines: Vec> = Vec::new(); + if let Some(ref sel) = pane.selection { + let (sl, sc, el, ec) = sel.range(); + for i in first..last { + let line = &lines[i]; + let line_text: String = line.spans.iter().map(|s| s.content.as_ref()).collect(); + + // Check if this line is within the selection + if i >= sl && i <= el { + let start_col = if i == sl { sc } else { 0 }; + let end_col = if i == el { ec } else { line_text.len() }; + if start_col < end_col { + let before = if start_col > 0 { &line_text[..start_col] } else { "" }; + let selected = &line_text[start_col..end_col]; + let after = if end_col < line_text.len() { &line_text[end_col..] } else { "" }; + let mut new_spans = Vec::new(); + if !before.is_empty() { + new_spans.push(Span::raw(before.to_string())); + } + new_spans.push(Span::styled(selected.to_string(), Style::default().bg(Color::DarkGray).fg(Color::White))); + if !after.is_empty() { + new_spans.push(Span::raw(after.to_string())); + } + visible_lines.push(Line::from(new_spans).style(line.style).alignment(line.alignment.unwrap_or(ratatui::layout::Alignment::Left))); + } else { + visible_lines.push(line.clone()); + } + } else { + visible_lines.push(line.clone()); + } + } + } else { + visible_lines = lines[first..last].to_vec(); + } + // Render only the visible slice — no full-content grapheme walk - let text_para = Paragraph::new(lines[first..last].to_vec()) + let text_para = Paragraph::new(visible_lines) .wrap(Wrap { trim: false }) .scroll((sub_scroll, 0)); frame.render_widget(text_para, text_area); diff --git a/src/user/mod.rs b/src/user/mod.rs index 94a507e..9ec1de6 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -19,7 +19,7 @@ use crate::user::{self as tui}; // --- TUI infrastructure (moved from tui/mod.rs) --- use ratatui::crossterm::{ - event::{EnableMouseCapture, DisableMouseCapture}, + event::{EnableMouseCapture, DisableMouseCapture, EnableBracketedPaste, DisableBracketedPaste}, terminal::{self, EnterAlternateScreen, LeaveAlternateScreen}, ExecutableCommand, }; @@ -98,6 +98,7 @@ struct ChannelStatus { struct App { status: StatusInfo, activity: String, + activity_started: Option, running_processes: u32, reasoning_effort: String, temperature: f32, @@ -125,6 +126,7 @@ impl App { turn_tools: 0, context_budget: String::new(), }, activity: String::new(), + activity_started: None, running_processes: 0, reasoning_effort: "none".to_string(), temperature: 0.6, @@ -164,12 +166,14 @@ fn init_terminal() -> io::Result> let mut stdout = io::stdout(); stdout.execute(EnterAlternateScreen)?; stdout.execute(EnableMouseCapture)?; + stdout.execute(EnableBracketedPaste)?; let backend = CrosstermBackend::new(stdout); ratatui::Terminal::new(backend) } fn restore_terminal(terminal: &mut ratatui::Terminal>) -> io::Result<()> { terminal::disable_raw_mode()?; + terminal.backend_mut().execute(DisableBracketedPaste)?; terminal.backend_mut().execute(DisableMouseCapture)?; terminal.backend_mut().execute(LeaveAlternateScreen)?; terminal.show_cursor() @@ -319,7 +323,7 @@ async fn run( let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); std::thread::spawn(move || { loop { - match crossterm::event::read() { + match ratatui::crossterm::event::read() { Ok(event) => { if event_tx.send(event).is_err() { break; } } Err(_) => break, }