Compare commits
5 commits
85799587cc
...
0e459aae92
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0e459aae92 | ||
|
|
d95f3e9445 | ||
|
|
28d56e2a55 | ||
|
|
6fedc9b2a8 | ||
|
|
5908b837e8 |
6 changed files with 148 additions and 41 deletions
|
|
@ -237,11 +237,19 @@ impl State {
|
|||
async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> {
|
||||
// Send PRIVMSG, which is used for both private and channel messages.
|
||||
// Splits into multiple fragments if necessary.
|
||||
// IRC max line = 512 bytes including CRLF. The server prepends
|
||||
// our prefix when relaying: ":nick!~user@host PRIVMSG target :msg\r\n"
|
||||
//
|
||||
// Two constraints:
|
||||
// 1. IRC max line = 512 bytes including CRLF. The server prepends
|
||||
// our prefix when relaying: ":nick!~user@host PRIVMSG target :msg\r\n"
|
||||
// So per-PRIVMSG message content must fit in 512 - overhead.
|
||||
// 2. Embedded '\n' in the message would be interpreted by the
|
||||
// server as an end-of-command marker, truncating us. Split
|
||||
// on newlines first and send each line as its own PRIVMSG.
|
||||
//
|
||||
// User is often ~nick (nick_len + 1). Host is up to 63 bytes.
|
||||
// Cloaked OFTC hosts can be longer - pad the budget.
|
||||
let nick_len = self.config.nick.len();
|
||||
let overhead = 1 + nick_len + 2 + nick_len + 1 + 63
|
||||
let overhead = 1 + nick_len + 1 + (nick_len + 1) + 1 + 80
|
||||
+ " PRIVMSG ".len() + target.len() + " :".len() + 2;
|
||||
let max_msg = 512_usize.saturating_sub(overhead);
|
||||
|
||||
|
|
@ -249,24 +257,34 @@ impl State {
|
|||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "target too long"));
|
||||
}
|
||||
|
||||
// Split on UTF-8 char boundaries
|
||||
let mut remaining = msg;
|
||||
while !remaining.is_empty() {
|
||||
let split_at = if remaining.len() <= max_msg {
|
||||
remaining.len()
|
||||
} else {
|
||||
// Find last char boundary at or before max_msg
|
||||
let mut i = max_msg;
|
||||
while i > 0 && !remaining.is_char_boundary(i) { i -= 1; }
|
||||
// To avoid splitting mid-word, see if there was a space recently
|
||||
let mut j = i;
|
||||
while j > 1 && j > i-10 && remaining.as_bytes()[j] != b' ' { j -= 1; }
|
||||
if remaining.as_bytes()[j] == b' ' { j }
|
||||
else if i == 0 { max_msg } else { i }
|
||||
};
|
||||
let (chunk, rest) = remaining.split_at(split_at);
|
||||
self.send_raw(&format!("PRIVMSG {target} :{chunk}")).await?;
|
||||
remaining = rest;
|
||||
for line in msg.split('\n') {
|
||||
let mut remaining = line;
|
||||
// Empty lines (blank paragraph breaks) can't be sent as empty
|
||||
// PRIVMSGs - most IRC servers reject them. Skip.
|
||||
if remaining.is_empty() { continue; }
|
||||
loop {
|
||||
let split_at = if remaining.len() <= max_msg {
|
||||
remaining.len()
|
||||
} else {
|
||||
// Find last char boundary at or before max_msg.
|
||||
let mut i = max_msg;
|
||||
while i > 0 && !remaining.is_char_boundary(i) { i -= 1; }
|
||||
// Prefer splitting at a word boundary - look back up to
|
||||
// max_msg/4 chars for a space. With dense content (code)
|
||||
// we may not find one; fall back to the char boundary.
|
||||
let lookback = max_msg / 4;
|
||||
let bytes = remaining.as_bytes();
|
||||
let mut j = i;
|
||||
while j > 0 && (i - j) < lookback && bytes[j - 1] != b' ' {
|
||||
j -= 1;
|
||||
}
|
||||
if j > 0 && bytes[j - 1] == b' ' { j } else { i }
|
||||
};
|
||||
let (chunk, rest) = remaining.split_at(split_at);
|
||||
self.send_raw(&format!("PRIVMSG {target} :{chunk}")).await?;
|
||||
remaining = rest;
|
||||
if remaining.is_empty() { break; }
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -218,7 +218,11 @@ impl NodeBody {
|
|||
fn render_into(&self, out: &mut String) {
|
||||
match self {
|
||||
Self::Content(text) => out.push_str(text),
|
||||
Self::Thinking(_) => {},
|
||||
Self::Thinking(text) => {
|
||||
out.push_str("<think>\n");
|
||||
out.push_str(text);
|
||||
out.push_str("\n</think>\n");
|
||||
}
|
||||
Self::Log(_) => {},
|
||||
Self::ToolCall { name, arguments } => {
|
||||
out.push_str("<tool_call>\n");
|
||||
|
|
@ -258,7 +262,7 @@ impl NodeBody {
|
|||
}
|
||||
|
||||
fn is_prompt_visible(&self) -> bool {
|
||||
!matches!(self, Self::Thinking(_) | Self::Log(_))
|
||||
!matches!(self, Self::Log(_))
|
||||
}
|
||||
|
||||
/// Hand-assemble token IDs for body types where running the tokenizer
|
||||
|
|
@ -648,13 +652,17 @@ fn drain_safe(buf: &mut String, tag_len: usize) -> String {
|
|||
}
|
||||
|
||||
impl ResponseParser {
|
||||
pub fn new(branch_idx: usize) -> Self {
|
||||
/// @in_think: whether the model's output begins inside a <think> block.
|
||||
/// Set when the prompt was prefilled with "<think>\n" (native thinking
|
||||
/// mode) so the parser captures reasoning tokens as Thinking until the
|
||||
/// model emits </think>.
|
||||
pub fn new(branch_idx: usize, in_think: bool) -> Self {
|
||||
Self {
|
||||
branch_idx,
|
||||
call_counter: 0,
|
||||
buf: String::new(),
|
||||
content_parts: Vec::new(),
|
||||
in_think: false,
|
||||
in_think,
|
||||
think_buf: String::new(),
|
||||
in_tool_call: false,
|
||||
tool_call_buf: String::new(),
|
||||
|
|
@ -1369,7 +1377,7 @@ mod tests {
|
|||
fn parse_into_ctx(chunks: &[&str]) -> (ContextState, Vec<PendingToolCall>) {
|
||||
let mut ctx = ContextState::new();
|
||||
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
|
||||
let mut p = ResponseParser::new(0);
|
||||
let mut p = ResponseParser::new(0, false);
|
||||
let mut calls = Vec::new();
|
||||
for chunk in chunks {
|
||||
// Feed each chunk as a single token (id=0 for tests)
|
||||
|
|
@ -1433,7 +1441,7 @@ mod tests {
|
|||
let text = "<think>thought</think>response";
|
||||
let mut ctx = ContextState::new();
|
||||
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
|
||||
let mut p = ResponseParser::new(0);
|
||||
let mut p = ResponseParser::new(0, false);
|
||||
for ch in text.chars() {
|
||||
p.feed_token(&ch.to_string(), &mut ctx);
|
||||
}
|
||||
|
|
@ -1449,7 +1457,7 @@ mod tests {
|
|||
let text = "text<tool_call>\n<function=bash>\n<parameter=command>ls</parameter>\n</function>\n</tool_call>more";
|
||||
let mut ctx = ContextState::new();
|
||||
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::Assistant, vec![]));
|
||||
let mut p = ResponseParser::new(0);
|
||||
let mut p = ResponseParser::new(0, false);
|
||||
let mut tool_calls = 0;
|
||||
for ch in text.chars() {
|
||||
tool_calls += p.feed_token(&ch.to_string(), &mut ctx).len();
|
||||
|
|
@ -1497,8 +1505,10 @@ mod tests {
|
|||
AstNode::thinking("hmm"),
|
||||
AstNode::content("answer"),
|
||||
]);
|
||||
// Thinking renders as empty, content renders as-is
|
||||
assert_eq!(node.render(), "<|im_start|>assistant\nanswer<|im_end|>\n");
|
||||
// Thinking renders wrapped in <think>...</think> so the model sees
|
||||
// previous turns' reasoning (Qwen 3.6 style: CoT stays in the
|
||||
// conversation across turns).
|
||||
assert_eq!(node.render(), "<|im_start|>assistant\n<think>\nhmm\n</think>\nanswer<|im_end|>\n");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -1577,10 +1587,19 @@ mod tests {
|
|||
fn test_tokenize_invisible_nodes_are_zero() {
|
||||
if !init_tokenizer() { return; }
|
||||
|
||||
assert_eq!(AstNode::thinking("deep thoughts").tokens(), 0);
|
||||
assert_eq!(AstNode::log("debug info").tokens(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tokenize_thinking_matches_rendered_tags() {
|
||||
if !init_tokenizer() { return; }
|
||||
|
||||
// Thinking is now prompt-visible (wrapped in <think>...</think>);
|
||||
// token count must match the rendered wrapping.
|
||||
let node = AstNode::thinking("deep thoughts");
|
||||
assert_eq!(node.tokens(), tokenizer::encode(&node.render()).len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tokenize_decode_roundtrip() {
|
||||
if !init_tokenizer() { return; }
|
||||
|
|
|
|||
|
|
@ -19,6 +19,51 @@ fn channels_dir() -> PathBuf {
|
|||
.join(".consciousness/channels")
|
||||
}
|
||||
|
||||
/// Install a SIGCHLD-driven reaper for channel daemons.
|
||||
///
|
||||
/// We can't use SIGCHLD=SIG_IGN because that makes the kernel auto-reap
|
||||
/// all children, and tokio::process::Command::wait() then returns ECHILD
|
||||
/// (breaking every tool that spawns a subprocess — bash, mcp clients, etc.).
|
||||
///
|
||||
/// Instead, on each SIGCHLD we read PID files in channels_dir() and call
|
||||
/// waitpid(pid, WNOHANG) on each. That reaps only our own zombie channel
|
||||
/// daemons; waitpid on any other PID returns ECHILD (harmless no-op).
|
||||
/// Tokio-spawned children aren't recorded in PID files, so tokio's own
|
||||
/// per-child wait paths are left free to reap them.
|
||||
pub fn start_zombie_reaper() -> tokio::task::JoinHandle<()> {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
tokio::spawn(async move {
|
||||
let mut sig = match signal(SignalKind::child()) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("failed to install SIGCHLD handler: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
while sig.recv().await.is_some() {
|
||||
reap_channel_daemons();
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn reap_channel_daemons() {
|
||||
let entries = match std::fs::read_dir(channels_dir()) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return,
|
||||
};
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if path.extension().and_then(|s| s.to_str()) != Some("pid") {
|
||||
continue;
|
||||
}
|
||||
let Ok(s) = std::fs::read_to_string(&path) else { continue };
|
||||
let Ok(pid) = s.trim().parse::<i32>() else { continue };
|
||||
let mut status = 0;
|
||||
// Reaps our zombie child; ECHILD on non-child is a no-op.
|
||||
unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG); }
|
||||
}
|
||||
}
|
||||
|
||||
fn config_path() -> PathBuf {
|
||||
channels_dir().join("channels.json5")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -167,6 +167,7 @@ enum PaneTarget {
|
|||
ConversationAssistant,
|
||||
Tools,
|
||||
ToolResult,
|
||||
Autonomous,
|
||||
}
|
||||
|
||||
const MAX_PANE_LINES: usize = 10_000;
|
||||
|
|
@ -472,8 +473,11 @@ impl InteractScreen {
|
|||
AstNode::Leaf(leaf) => {
|
||||
let text = leaf.body().text().to_string();
|
||||
match leaf.body() {
|
||||
NodeBody::Memory { .. } | NodeBody::Thinking(_)
|
||||
| NodeBody::Log(_) | NodeBody::Dmn(_) => vec![],
|
||||
NodeBody::Memory { .. } | NodeBody::Log(_) | NodeBody::Dmn(_) => vec![],
|
||||
NodeBody::Thinking(_) => {
|
||||
if text.is_empty() { vec![] }
|
||||
else { vec![(PaneTarget::Autonomous, text, Marker::None)] }
|
||||
}
|
||||
NodeBody::Content(_) => {
|
||||
if text.is_empty() || text.starts_with("<system-reminder>") { vec![] }
|
||||
else { vec![(PaneTarget::Conversation, text, Marker::User)] }
|
||||
|
|
@ -547,6 +551,12 @@ impl InteractScreen {
|
|||
self.tools.push_line(format!(" {}", line), Color::DarkGray);
|
||||
}
|
||||
}
|
||||
PaneTarget::Autonomous => {
|
||||
self.autonomous.current_color = Color::Gray;
|
||||
self.autonomous.append_text(&text);
|
||||
self.autonomous.pending_marker = marker;
|
||||
self.autonomous.flush_pending();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -558,6 +568,8 @@ impl InteractScreen {
|
|||
=> self.conversation.pop_line(),
|
||||
PaneTarget::Tools | PaneTarget::ToolResult
|
||||
=> self.tools.pop_line(),
|
||||
PaneTarget::Autonomous
|
||||
=> self.autonomous.pop_line(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -756,8 +756,10 @@ fn restore_stderr(original_fd: std::os::fd::RawFd) {
|
|||
|
||||
#[tokio::main]
|
||||
pub async fn main() {
|
||||
// Auto-reap child processes (channel daemons outlive the supervisor)
|
||||
unsafe { libc::signal(libc::SIGCHLD, libc::SIG_IGN); }
|
||||
// Reap channel-daemon zombies via a SIGCHLD handler that only touches
|
||||
// PIDs listed in channels_dir(). Avoids SIGCHLD=SIG_IGN, which would
|
||||
// break tokio::process::Command::wait() (kernel auto-reap → ECHILD).
|
||||
let _reaper = crate::thalamus::supervisor::start_zombie_reaper();
|
||||
|
||||
// Redirect stderr to pipe — logs to file and sends to channel for UI display
|
||||
let stderr_capture = redirect_stderr_to_pipe();
|
||||
|
|
|
|||
|
|
@ -35,12 +35,11 @@ from steering_vectors.aggregators import pca_aggregator
|
|||
|
||||
def _load_descriptions(direct_dir: Path) -> dict[str, list[str]]:
|
||||
"""Each file in direct_dir is `{concept}.txt`. Descriptions are
|
||||
separated by blank lines within the file."""
|
||||
separated by blank lines within the file. Files starting with `_`
|
||||
are not concepts but are included in negative pools (e.g. _baseline.txt)."""
|
||||
out: dict[str, list[str]] = {}
|
||||
for f in sorted(direct_dir.glob("*.txt")):
|
||||
if f.name.startswith("_"):
|
||||
continue
|
||||
concept = f.stem
|
||||
concept = f.stem # underscore-prefixed names keep their prefix
|
||||
text = f.read_text()
|
||||
descs = [d.strip() for d in text.split("\n\n") if d.strip()]
|
||||
out[concept] = descs
|
||||
|
|
@ -69,11 +68,19 @@ def main() -> None:
|
|||
target_layers = [int(x) for x in args.target_layers.split(",")]
|
||||
dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[args.dtype]
|
||||
|
||||
descriptions = _load_descriptions(Path(args.direct_dir))
|
||||
concepts = sorted(descriptions.keys())
|
||||
all_descriptions = _load_descriptions(Path(args.direct_dir))
|
||||
# Files starting with `_` are neg-pool helpers (e.g. _baseline.txt), not concepts.
|
||||
concepts = sorted(k for k in all_descriptions if not k.startswith("_"))
|
||||
neg_pool_extra: list[str] = []
|
||||
for k, ds in all_descriptions.items():
|
||||
if k.startswith("_"):
|
||||
neg_pool_extra.extend(ds)
|
||||
descriptions = {k: all_descriptions[k] for k in concepts}
|
||||
print(f"Loaded {len(concepts)} concepts with direct descriptions:")
|
||||
for c in concepts:
|
||||
print(f" {c}: {len(descriptions[c])} descriptions")
|
||||
if neg_pool_extra:
|
||||
print(f"Plus {len(neg_pool_extra)} neutral/baseline descriptions added to every concept's negative pool")
|
||||
|
||||
print(f"\nLoading {args.model} ({args.dtype}) on {args.device}...")
|
||||
tokenizer = AutoTokenizer.from_pretrained(args.model)
|
||||
|
|
@ -117,6 +124,10 @@ def main() -> None:
|
|||
for other, other_descs in descriptions.items():
|
||||
if other != concept:
|
||||
neg_pool.extend(other_descs)
|
||||
# Underscore-prefixed files (e.g. _baseline.txt) contribute to
|
||||
# every concept's negative pool, independent of the other-
|
||||
# concept negatives.
|
||||
neg_pool.extend(neg_pool_extra)
|
||||
|
||||
rng = random.Random(hash(concept) & 0xFFFFFFFF)
|
||||
samples: list[SteeringVectorTrainingSample] = []
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue