diff --git a/channels/irc/src/main.rs b/channels/irc/src/main.rs index e81c4fe..4b20284 100644 --- a/channels/irc/src/main.rs +++ b/channels/irc/src/main.rs @@ -237,19 +237,11 @@ impl State { async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { // Send PRIVMSG, which is used for both private and channel messages. // Splits into multiple fragments if necessary. - // - // Two constraints: - // 1. IRC max line = 512 bytes including CRLF. The server prepends - // our prefix when relaying: ":nick!~user@host PRIVMSG target :msg\r\n" - // So per-PRIVMSG message content must fit in 512 - overhead. - // 2. Embedded '\n' in the message would be interpreted by the - // server as an end-of-command marker, truncating us. Split - // on newlines first and send each line as its own PRIVMSG. - // + // IRC max line = 512 bytes including CRLF. The server prepends + // our prefix when relaying: ":nick!~user@host PRIVMSG target :msg\r\n" // User is often ~nick (nick_len + 1). Host is up to 63 bytes. - // Cloaked OFTC hosts can be longer - pad the budget. let nick_len = self.config.nick.len(); - let overhead = 1 + nick_len + 1 + (nick_len + 1) + 1 + 80 + let overhead = 1 + nick_len + 2 + nick_len + 1 + 63 + " PRIVMSG ".len() + target.len() + " :".len() + 2; let max_msg = 512_usize.saturating_sub(overhead); @@ -257,34 +249,24 @@ impl State { return Err(io::Error::new(io::ErrorKind::InvalidInput, "target too long")); } - for line in msg.split('\n') { - let mut remaining = line; - // Empty lines (blank paragraph breaks) can't be sent as empty - // PRIVMSGs - most IRC servers reject them. Skip. - if remaining.is_empty() { continue; } - loop { - let split_at = if remaining.len() <= max_msg { - remaining.len() - } else { - // Find last char boundary at or before max_msg. - let mut i = max_msg; - while i > 0 && !remaining.is_char_boundary(i) { i -= 1; } - // Prefer splitting at a word boundary - look back up to - // max_msg/4 chars for a space. With dense content (code) - // we may not find one; fall back to the char boundary. - let lookback = max_msg / 4; - let bytes = remaining.as_bytes(); - let mut j = i; - while j > 0 && (i - j) < lookback && bytes[j - 1] != b' ' { - j -= 1; - } - if j > 0 && bytes[j - 1] == b' ' { j } else { i } - }; - let (chunk, rest) = remaining.split_at(split_at); - self.send_raw(&format!("PRIVMSG {target} :{chunk}")).await?; - remaining = rest; - if remaining.is_empty() { break; } - } + // Split on UTF-8 char boundaries + let mut remaining = msg; + while !remaining.is_empty() { + let split_at = if remaining.len() <= max_msg { + remaining.len() + } else { + // Find last char boundary at or before max_msg + let mut i = max_msg; + while i > 0 && !remaining.is_char_boundary(i) { i -= 1; } + // To avoid splitting mid-word, see if there was a space recently + let mut j = i; + while j > 1 && j > i-10 && remaining.as_bytes()[j] != b' ' { j -= 1; } + if remaining.as_bytes()[j] == b' ' { j } + else if i == 0 { max_msg } else { i } + }; + let (chunk, rest) = remaining.split_at(split_at); + self.send_raw(&format!("PRIVMSG {target} :{chunk}")).await?; + remaining = rest; } Ok(()) } diff --git a/src/agent/context.rs b/src/agent/context.rs index 2009cfc..00c1ea5 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -218,11 +218,7 @@ impl NodeBody { fn render_into(&self, out: &mut String) { match self { Self::Content(text) => out.push_str(text), - Self::Thinking(text) => { - out.push_str("\n"); - out.push_str(text); - out.push_str("\n\n"); - } + Self::Thinking(_) => {}, Self::Log(_) => {}, Self::ToolCall { name, arguments } => { out.push_str("\n"); @@ -262,7 +258,7 @@ impl NodeBody { } fn is_prompt_visible(&self) -> bool { - !matches!(self, Self::Log(_)) + !matches!(self, Self::Thinking(_) | Self::Log(_)) } /// Hand-assemble token IDs for body types where running the tokenizer @@ -652,17 +648,13 @@ fn drain_safe(buf: &mut String, tag_len: usize) -> String { } impl ResponseParser { - /// @in_think: whether the model's output begins inside a block. - /// Set when the prompt was prefilled with "\n" (native thinking - /// mode) so the parser captures reasoning tokens as Thinking until the - /// model emits . - pub fn new(branch_idx: usize, in_think: bool) -> Self { + pub fn new(branch_idx: usize) -> Self { Self { branch_idx, call_counter: 0, buf: String::new(), content_parts: Vec::new(), - in_think, + in_think: false, think_buf: String::new(), in_tool_call: false, tool_call_buf: String::new(), @@ -1377,7 +1369,7 @@ mod tests { fn parse_into_ctx(chunks: &[&str]) -> (ContextState, Vec) { let mut ctx = ContextState::new(); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); - let mut p = ResponseParser::new(0, false); + let mut p = ResponseParser::new(0); let mut calls = Vec::new(); for chunk in chunks { // Feed each chunk as a single token (id=0 for tests) @@ -1441,7 +1433,7 @@ mod tests { let text = "thoughtresponse"; let mut ctx = ContextState::new(); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); - let mut p = ResponseParser::new(0, false); + let mut p = ResponseParser::new(0); for ch in text.chars() { p.feed_token(&ch.to_string(), &mut ctx); } @@ -1457,7 +1449,7 @@ mod tests { let text = "text\n\nls\n\nmore"; let mut ctx = ContextState::new(); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); - let mut p = ResponseParser::new(0, false); + let mut p = ResponseParser::new(0); let mut tool_calls = 0; for ch in text.chars() { tool_calls += p.feed_token(&ch.to_string(), &mut ctx).len(); @@ -1505,10 +1497,8 @@ mod tests { AstNode::thinking("hmm"), AstNode::content("answer"), ]); - // Thinking renders wrapped in ... so the model sees - // previous turns' reasoning (Qwen 3.6 style: CoT stays in the - // conversation across turns). - assert_eq!(node.render(), "<|im_start|>assistant\n\nhmm\n\nanswer<|im_end|>\n"); + // Thinking renders as empty, content renders as-is + assert_eq!(node.render(), "<|im_start|>assistant\nanswer<|im_end|>\n"); } #[test] @@ -1587,19 +1577,10 @@ mod tests { fn test_tokenize_invisible_nodes_are_zero() { if !init_tokenizer() { return; } + assert_eq!(AstNode::thinking("deep thoughts").tokens(), 0); assert_eq!(AstNode::log("debug info").tokens(), 0); } - #[test] - fn test_tokenize_thinking_matches_rendered_tags() { - if !init_tokenizer() { return; } - - // Thinking is now prompt-visible (wrapped in ...); - // token count must match the rendered wrapping. - let node = AstNode::thinking("deep thoughts"); - assert_eq!(node.tokens(), tokenizer::encode(&node.render()).len()); - } - #[test] fn test_tokenize_decode_roundtrip() { if !init_tokenizer() { return; } diff --git a/src/thalamus/supervisor.rs b/src/thalamus/supervisor.rs index 3716682..a4c53ec 100644 --- a/src/thalamus/supervisor.rs +++ b/src/thalamus/supervisor.rs @@ -19,51 +19,6 @@ fn channels_dir() -> PathBuf { .join(".consciousness/channels") } -/// Install a SIGCHLD-driven reaper for channel daemons. -/// -/// We can't use SIGCHLD=SIG_IGN because that makes the kernel auto-reap -/// all children, and tokio::process::Command::wait() then returns ECHILD -/// (breaking every tool that spawns a subprocess — bash, mcp clients, etc.). -/// -/// Instead, on each SIGCHLD we read PID files in channels_dir() and call -/// waitpid(pid, WNOHANG) on each. That reaps only our own zombie channel -/// daemons; waitpid on any other PID returns ECHILD (harmless no-op). -/// Tokio-spawned children aren't recorded in PID files, so tokio's own -/// per-child wait paths are left free to reap them. -pub fn start_zombie_reaper() -> tokio::task::JoinHandle<()> { - use tokio::signal::unix::{signal, SignalKind}; - tokio::spawn(async move { - let mut sig = match signal(SignalKind::child()) { - Ok(s) => s, - Err(e) => { - error!("failed to install SIGCHLD handler: {}", e); - return; - } - }; - while sig.recv().await.is_some() { - reap_channel_daemons(); - } - }) -} - -fn reap_channel_daemons() { - let entries = match std::fs::read_dir(channels_dir()) { - Ok(e) => e, - Err(_) => return, - }; - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().and_then(|s| s.to_str()) != Some("pid") { - continue; - } - let Ok(s) = std::fs::read_to_string(&path) else { continue }; - let Ok(pid) = s.trim().parse::() else { continue }; - let mut status = 0; - // Reaps our zombie child; ECHILD on non-child is a no-op. - unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG); } - } -} - fn config_path() -> PathBuf { channels_dir().join("channels.json5") } diff --git a/src/user/chat.rs b/src/user/chat.rs index bd2df25..fe3db5b 100644 --- a/src/user/chat.rs +++ b/src/user/chat.rs @@ -167,7 +167,6 @@ enum PaneTarget { ConversationAssistant, Tools, ToolResult, - Autonomous, } const MAX_PANE_LINES: usize = 10_000; @@ -473,11 +472,8 @@ impl InteractScreen { AstNode::Leaf(leaf) => { let text = leaf.body().text().to_string(); match leaf.body() { - NodeBody::Memory { .. } | NodeBody::Log(_) | NodeBody::Dmn(_) => vec![], - NodeBody::Thinking(_) => { - if text.is_empty() { vec![] } - else { vec![(PaneTarget::Autonomous, text, Marker::None)] } - } + NodeBody::Memory { .. } | NodeBody::Thinking(_) + | NodeBody::Log(_) | NodeBody::Dmn(_) => vec![], NodeBody::Content(_) => { if text.is_empty() || text.starts_with("") { vec![] } else { vec![(PaneTarget::Conversation, text, Marker::User)] } @@ -551,12 +547,6 @@ impl InteractScreen { self.tools.push_line(format!(" {}", line), Color::DarkGray); } } - PaneTarget::Autonomous => { - self.autonomous.current_color = Color::Gray; - self.autonomous.append_text(&text); - self.autonomous.pending_marker = marker; - self.autonomous.flush_pending(); - } } } } @@ -568,8 +558,6 @@ impl InteractScreen { => self.conversation.pop_line(), PaneTarget::Tools | PaneTarget::ToolResult => self.tools.pop_line(), - PaneTarget::Autonomous - => self.autonomous.pop_line(), } } } diff --git a/src/user/mod.rs b/src/user/mod.rs index 04e895b..fc3a4ac 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -756,10 +756,8 @@ fn restore_stderr(original_fd: std::os::fd::RawFd) { #[tokio::main] pub async fn main() { - // Reap channel-daemon zombies via a SIGCHLD handler that only touches - // PIDs listed in channels_dir(). Avoids SIGCHLD=SIG_IGN, which would - // break tokio::process::Command::wait() (kernel auto-reap → ECHILD). - let _reaper = crate::thalamus::supervisor::start_zombie_reaper(); + // Auto-reap child processes (channel daemons outlive the supervisor) + unsafe { libc::signal(libc::SIGCHLD, libc::SIG_IGN); } // Redirect stderr to pipe — logs to file and sends to channel for UI display let stderr_capture = redirect_stderr_to_pipe(); diff --git a/training/amygdala_training/train_direct.py b/training/amygdala_training/train_direct.py index 2ad2a30..8749e37 100644 --- a/training/amygdala_training/train_direct.py +++ b/training/amygdala_training/train_direct.py @@ -35,11 +35,12 @@ from steering_vectors.aggregators import pca_aggregator def _load_descriptions(direct_dir: Path) -> dict[str, list[str]]: """Each file in direct_dir is `{concept}.txt`. Descriptions are - separated by blank lines within the file. Files starting with `_` - are not concepts but are included in negative pools (e.g. _baseline.txt).""" + separated by blank lines within the file.""" out: dict[str, list[str]] = {} for f in sorted(direct_dir.glob("*.txt")): - concept = f.stem # underscore-prefixed names keep their prefix + if f.name.startswith("_"): + continue + concept = f.stem text = f.read_text() descs = [d.strip() for d in text.split("\n\n") if d.strip()] out[concept] = descs @@ -68,19 +69,11 @@ def main() -> None: target_layers = [int(x) for x in args.target_layers.split(",")] dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[args.dtype] - all_descriptions = _load_descriptions(Path(args.direct_dir)) - # Files starting with `_` are neg-pool helpers (e.g. _baseline.txt), not concepts. - concepts = sorted(k for k in all_descriptions if not k.startswith("_")) - neg_pool_extra: list[str] = [] - for k, ds in all_descriptions.items(): - if k.startswith("_"): - neg_pool_extra.extend(ds) - descriptions = {k: all_descriptions[k] for k in concepts} + descriptions = _load_descriptions(Path(args.direct_dir)) + concepts = sorted(descriptions.keys()) print(f"Loaded {len(concepts)} concepts with direct descriptions:") for c in concepts: print(f" {c}: {len(descriptions[c])} descriptions") - if neg_pool_extra: - print(f"Plus {len(neg_pool_extra)} neutral/baseline descriptions added to every concept's negative pool") print(f"\nLoading {args.model} ({args.dtype}) on {args.device}...") tokenizer = AutoTokenizer.from_pretrained(args.model) @@ -124,10 +117,6 @@ def main() -> None: for other, other_descs in descriptions.items(): if other != concept: neg_pool.extend(other_descs) - # Underscore-prefixed files (e.g. _baseline.txt) contribute to - # every concept's negative pool, independent of the other- - # concept negatives. - neg_pool.extend(neg_pool_extra) rng = random.Random(hash(concept) & 0xFFFFFFFF) samples: list[SteeringVectorTrainingSample] = []