Compare commits

...

5 commits

Author SHA1 Message Date
Kent Overstreet
0e459aae92 thalamus/supervisor: reap channel daemons via SIGCHLD instead of SIG_IGN
SIGCHLD=SIG_IGN at main() was auto-reaping all children in the kernel,
which broke tokio::process::Command::wait() — every tool that spawned a
subprocess (bash, mcp clients) was getting ECHILD because tokio couldn't
waitpid() on a child the kernel had already reaped.

Replace with a SIGCHLD signal handler task that reaps only PIDs listed in
channels_dir() (via waitpid(pid, WNOHANG) — ECHILD on non-child is a
harmless no-op). Tokio-spawned children aren't in PID files, so tokio's
own per-child wait paths are untouched.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-24 11:54:25 -04:00
Kent Overstreet
d95f3e9445 user/chat: route Thinking to a new Autonomous pane
Thinking content was silently dropped in the UI (empty Vec). Now that
Thinking is prompt-visible, surface it in a dedicated Autonomous pane
rendered in gray so it's visually distinct from conversation and
tool-call output.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-24 11:54:25 -04:00
Kent Overstreet
28d56e2a55 agent/context: make Thinking blocks prompt-visible
Thinking blocks used to render as empty strings and be excluded from
is_prompt_visible, so the model never saw its own prior CoT across
turns. For Qwen 3.6 native thinking mode, CoT is meant to stay in the
conversation — the model benefits from seeing what it reasoned about
last turn.

Render Thinking as <think>\n{text}\n</think>\n so past reasoning is
visible in subsequent prompts. Add in_think param to ResponseParser::new
so the parser starts inside a <think> block when the prompt was
prefilled with "<think>\n" (native thinking mode).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-24 11:54:25 -04:00
Kent Overstreet
6fedc9b2a8 amygdala: underscore-prefixed files join every concept's negative pool
Files in direct/ named _*.txt (e.g. _baseline.txt) are conceptless
neutral prose — they should not appear as positive training signal,
but are useful as shared negatives across every concept.

Previously _*.txt files were silently skipped. Now:
  * they're loaded like any other description file;
  * concepts (the positive label set) filters them out;
  * their descriptions are concatenated into neg_pool_extra and
    extended onto every concept's neg_pool alongside the cross-concept
    negatives.

A concept's negative pool is thus "other concepts' descriptions +
everything from _*.txt files". The extra pool is announced at startup
so the user can see how many neutral samples are active.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-24 11:54:25 -04:00
Kent Overstreet
5908b837e8 irc: split PRIVMSG on embedded newlines + widen host overhead
Two fixes to send_privmsg, both surfaced by correspondents reporting
truncated messages:

1. Multi-line content (code blocks, formatted text) sent as a single
   PRIVMSG was being truncated at the first '\n' by the IRC server —
   newlines are end-of-command markers. Split the message on newlines
   and send each line as its own PRIVMSG; skip empty lines since most
   servers reject empty PRIVMSGs.

2. Overhead computation assumed a host field of 63 bytes. OFTC's
   cloaked hostmasks can be longer, occasionally pushing the server-
   prepended prefix past 512 bytes and causing silent truncation.
   Raise the host budget to 80 and align the formula with the actual
   ':nick!~nick@host' prefix shape.

Also extended the word-boundary lookback from a fixed 10 chars to
max_msg / 4 — dense content (code) rarely had a space within 10 chars
of the length cap, so we were falling back to the char boundary and
splitting mid-word. Checking bytes[j-1] for a space (instead of
bytes[j]) drops leading whitespace from the rest-fragment.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-24 11:54:25 -04:00
6 changed files with 148 additions and 41 deletions

View file

@ -237,11 +237,19 @@ impl State {
async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> {
// Send PRIVMSG, which is used for both private and channel messages. // Send PRIVMSG, which is used for both private and channel messages.
// Splits into multiple fragments if necessary. // Splits into multiple fragments if necessary.
// IRC max line = 512 bytes including CRLF. The server prepends //
// our prefix when relaying: ":nick!~user@host PRIVMSG target :msg\r\n" // Two constraints:
// 1. IRC max line = 512 bytes including CRLF. The server prepends
// our prefix when relaying: ":nick!~user@host PRIVMSG target :msg\r\n"
// So per-PRIVMSG message content must fit in 512 - overhead.
// 2. Embedded '\n' in the message would be interpreted by the
// server as an end-of-command marker, truncating us. Split
// on newlines first and send each line as its own PRIVMSG.
//
// User is often ~nick (nick_len + 1). Host is up to 63 bytes. // User is often ~nick (nick_len + 1). Host is up to 63 bytes.
// Cloaked OFTC hosts can be longer - pad the budget.
let nick_len = self.config.nick.len(); let nick_len = self.config.nick.len();
let overhead = 1 + nick_len + 2 + nick_len + 1 + 63 let overhead = 1 + nick_len + 1 + (nick_len + 1) + 1 + 80
+ " PRIVMSG ".len() + target.len() + " :".len() + 2; + " PRIVMSG ".len() + target.len() + " :".len() + 2;
let max_msg = 512_usize.saturating_sub(overhead); let max_msg = 512_usize.saturating_sub(overhead);
@ -249,24 +257,34 @@ impl State {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "target too long")); return Err(io::Error::new(io::ErrorKind::InvalidInput, "target too long"));
} }
// Split on UTF-8 char boundaries for line in msg.split('\n') {
let mut remaining = msg; let mut remaining = line;
while !remaining.is_empty() { // Empty lines (blank paragraph breaks) can't be sent as empty
let split_at = if remaining.len() <= max_msg { // PRIVMSGs - most IRC servers reject them. Skip.
remaining.len() if remaining.is_empty() { continue; }
} else { loop {
// Find last char boundary at or before max_msg let split_at = if remaining.len() <= max_msg {
let mut i = max_msg; remaining.len()
while i > 0 && !remaining.is_char_boundary(i) { i -= 1; } } else {
// To avoid splitting mid-word, see if there was a space recently // Find last char boundary at or before max_msg.
let mut j = i; let mut i = max_msg;
while j > 1 && j > i-10 && remaining.as_bytes()[j] != b' ' { j -= 1; } while i > 0 && !remaining.is_char_boundary(i) { i -= 1; }
if remaining.as_bytes()[j] == b' ' { j } // Prefer splitting at a word boundary - look back up to
else if i == 0 { max_msg } else { i } // max_msg/4 chars for a space. With dense content (code)
}; // we may not find one; fall back to the char boundary.
let (chunk, rest) = remaining.split_at(split_at); let lookback = max_msg / 4;
self.send_raw(&format!("PRIVMSG {target} :{chunk}")).await?; let bytes = remaining.as_bytes();
remaining = rest; let mut j = i;
while j > 0 && (i - j) < lookback && bytes[j - 1] != b' ' {
j -= 1;
}
if j > 0 && bytes[j - 1] == b' ' { j } else { i }
};
let (chunk, rest) = remaining.split_at(split_at);
self.send_raw(&format!("PRIVMSG {target} :{chunk}")).await?;
remaining = rest;
if remaining.is_empty() { break; }
}
} }
Ok(()) Ok(())
} }

View file

@ -218,7 +218,11 @@ impl NodeBody {
fn render_into(&self, out: &mut String) { fn render_into(&self, out: &mut String) {
match self { match self {
Self::Content(text) => out.push_str(text), Self::Content(text) => out.push_str(text),
Self::Thinking(_) => {}, Self::Thinking(text) => {
out.push_str("<think>\n");
out.push_str(text);
out.push_str("\n</think>\n");
}
Self::Log(_) => {}, Self::Log(_) => {},
Self::ToolCall { name, arguments } => { Self::ToolCall { name, arguments } => {
out.push_str("<tool_call>\n"); out.push_str("<tool_call>\n");
@ -258,7 +262,7 @@ impl NodeBody {
} }
fn is_prompt_visible(&self) -> bool { fn is_prompt_visible(&self) -> bool {
!matches!(self, Self::Thinking(_) | Self::Log(_)) !matches!(self, Self::Log(_))
} }
/// Hand-assemble token IDs for body types where running the tokenizer /// Hand-assemble token IDs for body types where running the tokenizer
@ -648,13 +652,17 @@ fn drain_safe(buf: &mut String, tag_len: usize) -> String {
} }
impl ResponseParser { impl ResponseParser {
pub fn new(branch_idx: usize) -> Self { /// @in_think: whether the model's output begins inside a <think> block.
/// Set when the prompt was prefilled with "<think>\n" (native thinking
/// mode) so the parser captures reasoning tokens as Thinking until the
/// model emits </think>.
pub fn new(branch_idx: usize, in_think: bool) -> Self {
Self { Self {
branch_idx, branch_idx,
call_counter: 0, call_counter: 0,
buf: String::new(), buf: String::new(),
content_parts: Vec::new(), content_parts: Vec::new(),
in_think: false, in_think,
think_buf: String::new(), think_buf: String::new(),
in_tool_call: false, in_tool_call: false,
tool_call_buf: String::new(), tool_call_buf: String::new(),
@ -1369,7 +1377,7 @@ mod tests {
fn parse_into_ctx(chunks: &[&str]) -> (ContextState, Vec<PendingToolCall>) { fn parse_into_ctx(chunks: &[&str]) -> (ContextState, Vec<PendingToolCall>) {
let mut ctx = ContextState::new(); let mut ctx = ContextState::new();
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
let mut p = ResponseParser::new(0); let mut p = ResponseParser::new(0, false);
let mut calls = Vec::new(); let mut calls = Vec::new();
for chunk in chunks { for chunk in chunks {
// Feed each chunk as a single token (id=0 for tests) // Feed each chunk as a single token (id=0 for tests)
@ -1433,7 +1441,7 @@ mod tests {
let text = "<think>thought</think>response"; let text = "<think>thought</think>response";
let mut ctx = ContextState::new(); let mut ctx = ContextState::new();
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
let mut p = ResponseParser::new(0); let mut p = ResponseParser::new(0, false);
for ch in text.chars() { for ch in text.chars() {
p.feed_token(&ch.to_string(), &mut ctx); p.feed_token(&ch.to_string(), &mut ctx);
} }
@ -1449,7 +1457,7 @@ mod tests {
let text = "text<tool_call>\n<function=bash>\n<parameter=command>ls</parameter>\n</function>\n</tool_call>more"; let text = "text<tool_call>\n<function=bash>\n<parameter=command>ls</parameter>\n</function>\n</tool_call>more";
let mut ctx = ContextState::new(); let mut ctx = ContextState::new();
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
let mut p = ResponseParser::new(0); let mut p = ResponseParser::new(0, false);
let mut tool_calls = 0; let mut tool_calls = 0;
for ch in text.chars() { for ch in text.chars() {
tool_calls += p.feed_token(&ch.to_string(), &mut ctx).len(); tool_calls += p.feed_token(&ch.to_string(), &mut ctx).len();
@ -1497,8 +1505,10 @@ mod tests {
AstNode::thinking("hmm"), AstNode::thinking("hmm"),
AstNode::content("answer"), AstNode::content("answer"),
]); ]);
// Thinking renders as empty, content renders as-is // Thinking renders wrapped in <think>...</think> so the model sees
assert_eq!(node.render(), "<|im_start|>assistant\nanswer<|im_end|>\n"); // previous turns' reasoning (Qwen 3.6 style: CoT stays in the
// conversation across turns).
assert_eq!(node.render(), "<|im_start|>assistant\n<think>\nhmm\n</think>\nanswer<|im_end|>\n");
} }
#[test] #[test]
@ -1577,10 +1587,19 @@ mod tests {
fn test_tokenize_invisible_nodes_are_zero() { fn test_tokenize_invisible_nodes_are_zero() {
if !init_tokenizer() { return; } if !init_tokenizer() { return; }
assert_eq!(AstNode::thinking("deep thoughts").tokens(), 0);
assert_eq!(AstNode::log("debug info").tokens(), 0); assert_eq!(AstNode::log("debug info").tokens(), 0);
} }
#[test]
fn test_tokenize_thinking_matches_rendered_tags() {
if !init_tokenizer() { return; }
// Thinking is now prompt-visible (wrapped in <think>...</think>);
// token count must match the rendered wrapping.
let node = AstNode::thinking("deep thoughts");
assert_eq!(node.tokens(), tokenizer::encode(&node.render()).len());
}
#[test] #[test]
fn test_tokenize_decode_roundtrip() { fn test_tokenize_decode_roundtrip() {
if !init_tokenizer() { return; } if !init_tokenizer() { return; }

View file

@ -19,6 +19,51 @@ fn channels_dir() -> PathBuf {
.join(".consciousness/channels") .join(".consciousness/channels")
} }
/// Install a SIGCHLD-driven reaper for channel daemons.
///
/// We can't use SIGCHLD=SIG_IGN because that makes the kernel auto-reap
/// all children, and tokio::process::Command::wait() then returns ECHILD
/// (breaking every tool that spawns a subprocess — bash, mcp clients, etc.).
///
/// Instead, on each SIGCHLD we read PID files in channels_dir() and call
/// waitpid(pid, WNOHANG) on each. That reaps only our own zombie channel
/// daemons; waitpid on any other PID returns ECHILD (harmless no-op).
/// Tokio-spawned children aren't recorded in PID files, so tokio's own
/// per-child wait paths are left free to reap them.
pub fn start_zombie_reaper() -> tokio::task::JoinHandle<()> {
use tokio::signal::unix::{signal, SignalKind};
tokio::spawn(async move {
let mut sig = match signal(SignalKind::child()) {
Ok(s) => s,
Err(e) => {
error!("failed to install SIGCHLD handler: {}", e);
return;
}
};
while sig.recv().await.is_some() {
reap_channel_daemons();
}
})
}
fn reap_channel_daemons() {
let entries = match std::fs::read_dir(channels_dir()) {
Ok(e) => e,
Err(_) => return,
};
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("pid") {
continue;
}
let Ok(s) = std::fs::read_to_string(&path) else { continue };
let Ok(pid) = s.trim().parse::<i32>() else { continue };
let mut status = 0;
// Reaps our zombie child; ECHILD on non-child is a no-op.
unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG); }
}
}
fn config_path() -> PathBuf { fn config_path() -> PathBuf {
channels_dir().join("channels.json5") channels_dir().join("channels.json5")
} }

View file

@ -167,6 +167,7 @@ enum PaneTarget {
ConversationAssistant, ConversationAssistant,
Tools, Tools,
ToolResult, ToolResult,
Autonomous,
} }
const MAX_PANE_LINES: usize = 10_000; const MAX_PANE_LINES: usize = 10_000;
@ -472,8 +473,11 @@ impl InteractScreen {
AstNode::Leaf(leaf) => { AstNode::Leaf(leaf) => {
let text = leaf.body().text().to_string(); let text = leaf.body().text().to_string();
match leaf.body() { match leaf.body() {
NodeBody::Memory { .. } | NodeBody::Thinking(_) NodeBody::Memory { .. } | NodeBody::Log(_) | NodeBody::Dmn(_) => vec![],
| NodeBody::Log(_) | NodeBody::Dmn(_) => vec![], NodeBody::Thinking(_) => {
if text.is_empty() { vec![] }
else { vec![(PaneTarget::Autonomous, text, Marker::None)] }
}
NodeBody::Content(_) => { NodeBody::Content(_) => {
if text.is_empty() || text.starts_with("<system-reminder>") { vec![] } if text.is_empty() || text.starts_with("<system-reminder>") { vec![] }
else { vec![(PaneTarget::Conversation, text, Marker::User)] } else { vec![(PaneTarget::Conversation, text, Marker::User)] }
@ -547,6 +551,12 @@ impl InteractScreen {
self.tools.push_line(format!(" {}", line), Color::DarkGray); self.tools.push_line(format!(" {}", line), Color::DarkGray);
} }
} }
PaneTarget::Autonomous => {
self.autonomous.current_color = Color::Gray;
self.autonomous.append_text(&text);
self.autonomous.pending_marker = marker;
self.autonomous.flush_pending();
}
} }
} }
} }
@ -558,6 +568,8 @@ impl InteractScreen {
=> self.conversation.pop_line(), => self.conversation.pop_line(),
PaneTarget::Tools | PaneTarget::ToolResult PaneTarget::Tools | PaneTarget::ToolResult
=> self.tools.pop_line(), => self.tools.pop_line(),
PaneTarget::Autonomous
=> self.autonomous.pop_line(),
} }
} }
} }

View file

@ -756,8 +756,10 @@ fn restore_stderr(original_fd: std::os::fd::RawFd) {
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
// Auto-reap child processes (channel daemons outlive the supervisor) // Reap channel-daemon zombies via a SIGCHLD handler that only touches
unsafe { libc::signal(libc::SIGCHLD, libc::SIG_IGN); } // PIDs listed in channels_dir(). Avoids SIGCHLD=SIG_IGN, which would
// break tokio::process::Command::wait() (kernel auto-reap → ECHILD).
let _reaper = crate::thalamus::supervisor::start_zombie_reaper();
// Redirect stderr to pipe — logs to file and sends to channel for UI display // Redirect stderr to pipe — logs to file and sends to channel for UI display
let stderr_capture = redirect_stderr_to_pipe(); let stderr_capture = redirect_stderr_to_pipe();

View file

@ -35,12 +35,11 @@ from steering_vectors.aggregators import pca_aggregator
def _load_descriptions(direct_dir: Path) -> dict[str, list[str]]: def _load_descriptions(direct_dir: Path) -> dict[str, list[str]]:
"""Each file in direct_dir is `{concept}.txt`. Descriptions are """Each file in direct_dir is `{concept}.txt`. Descriptions are
separated by blank lines within the file.""" separated by blank lines within the file. Files starting with `_`
are not concepts but are included in negative pools (e.g. _baseline.txt)."""
out: dict[str, list[str]] = {} out: dict[str, list[str]] = {}
for f in sorted(direct_dir.glob("*.txt")): for f in sorted(direct_dir.glob("*.txt")):
if f.name.startswith("_"): concept = f.stem # underscore-prefixed names keep their prefix
continue
concept = f.stem
text = f.read_text() text = f.read_text()
descs = [d.strip() for d in text.split("\n\n") if d.strip()] descs = [d.strip() for d in text.split("\n\n") if d.strip()]
out[concept] = descs out[concept] = descs
@ -69,11 +68,19 @@ def main() -> None:
target_layers = [int(x) for x in args.target_layers.split(",")] target_layers = [int(x) for x in args.target_layers.split(",")]
dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[args.dtype] dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[args.dtype]
descriptions = _load_descriptions(Path(args.direct_dir)) all_descriptions = _load_descriptions(Path(args.direct_dir))
concepts = sorted(descriptions.keys()) # Files starting with `_` are neg-pool helpers (e.g. _baseline.txt), not concepts.
concepts = sorted(k for k in all_descriptions if not k.startswith("_"))
neg_pool_extra: list[str] = []
for k, ds in all_descriptions.items():
if k.startswith("_"):
neg_pool_extra.extend(ds)
descriptions = {k: all_descriptions[k] for k in concepts}
print(f"Loaded {len(concepts)} concepts with direct descriptions:") print(f"Loaded {len(concepts)} concepts with direct descriptions:")
for c in concepts: for c in concepts:
print(f" {c}: {len(descriptions[c])} descriptions") print(f" {c}: {len(descriptions[c])} descriptions")
if neg_pool_extra:
print(f"Plus {len(neg_pool_extra)} neutral/baseline descriptions added to every concept's negative pool")
print(f"\nLoading {args.model} ({args.dtype}) on {args.device}...") print(f"\nLoading {args.model} ({args.dtype}) on {args.device}...")
tokenizer = AutoTokenizer.from_pretrained(args.model) tokenizer = AutoTokenizer.from_pretrained(args.model)
@ -117,6 +124,10 @@ def main() -> None:
for other, other_descs in descriptions.items(): for other, other_descs in descriptions.items():
if other != concept: if other != concept:
neg_pool.extend(other_descs) neg_pool.extend(other_descs)
# Underscore-prefixed files (e.g. _baseline.txt) contribute to
# every concept's negative pool, independent of the other-
# concept negatives.
neg_pool.extend(neg_pool_extra)
rng = random.Random(hash(concept) & 0xFFFFFFFF) rng = random.Random(hash(concept) & 0xFFFFFFFF)
samples: list[SteeringVectorTrainingSample] = [] samples: list[SteeringVectorTrainingSample] = []