diff --git a/proto/salience.proto b/proto/salience.proto index 01c0f1e..fab4e6d 100644 --- a/proto/salience.proto +++ b/proto/salience.proto @@ -58,21 +58,26 @@ service Salience { // boundary). rpc ForkSession(ForkSessionRequest) returns (ForkSessionResponse); - // Append an image to the session. Server decodes, runs vLLM's - // multimodal pipeline to compute N (IMAGE_PAD count), and writes - // the whole vision block into session.tokens. Returns N and the - // new total length. - rpc AppendImage(AppendImageRequest) returns (AppendImageResponse); - - // Prefill + optionally decode. See GenerateRequest for full - // semantics; stream yields Token events (with optional readouts / - // logprobs per position) followed by a terminating Done. + // Prefill + optionally decode. Images are attached inline via + // `GenerateRequest.images`; the client writes its own pre-expanded + // <|vision_start|> + N*<|image_pad|> + <|vision_end|> runs into + // `append_tokens` and declares each run's range in `images[i]`. + // Server validates run length against the actual vision-encoder + // feature count and returns INVALID_ARGUMENT on mismatch. Stream + // yields Token events (with optional readouts / logprobs per + // position) followed by a terminating Done. rpc Generate(GenerateRequest) returns (stream GenerateEvent); // Readout manifest for the currently-loaded model — concept names, // layer indices, tensor dtype. Stateless; fetch once at client // startup and cache. rpc GetReadoutManifest(GetReadoutManifestRequest) returns (ReadoutManifest); + + // Dump the full token stream of a session. Debug-only: used by the + // client to verify its local accounting against the server's + // session.tokens byte-for-byte when divergence is suspected. Not + // cheap — copies the whole sequence across the wire. + rpc DumpSession(DumpSessionRequest) returns (DumpSessionResponse); } // ============================================================ @@ -106,55 +111,47 @@ message ForkSessionResponse { string session_id = 1; // new session } -// ============================================================ -// Mutation -// ============================================================ - -message AppendImageRequest { - string session_id = 1; - - // Image bytes (PNG / JPEG / WebP / …). - bytes data = 2; - - // MIME type, e.g. "image/png". - string mime = 3; - - // Client's view of the session's current token length. Must equal - // the server's actual length, OR be strictly less when - // truncating=true. Any mismatch is FAILED_PRECONDITION. - uint32 offset = 4; - - // If true, server truncates session.tokens to `offset` before - // appending. Rejected with FAILED_PRECONDITION if the truncation - // would split an image block. - bool truncating = 5; -} - -message AppendImageResponse { - // Count of <|image_pad|> tokens inside the vision block. Does not - // include the <|vision_start|> / <|vision_end|> bookends, which - // contribute one token each. - uint32 placeholder_count = 1; - - // Session's total token length after this append, including both - // bookends (= offset + placeholder_count + 2, barring truncation). - uint32 total_length = 2; -} - // ============================================================ // Inference // ============================================================ +// One image attached to a Generate call. The client is responsible +// for writing the expanded placeholder run (VISION_START + +// N*IMAGE_PAD + VISION_END) into `GenerateRequest.append_tokens` at +// positions [pad_range_start, pad_range_end) and pairing it with +// the corresponding `ImageAttachment` entry. Server validates that +// the declared range's pad count matches what the vision encoder +// produces, and returns INVALID_ARGUMENT if they disagree. +message ImageAttachment { + // Image bytes (PNG / JPEG / WebP / …). + bytes bytes = 1; + + // MIME type, e.g. "image/png". + string mime = 2; + + // Absolute token positions (in `session.tokens` AFTER `append_tokens` + // is applied) spanning the full vision block — `[vision_start, + // pad*N, vision_end]`. end is exclusive, so end - start == N + 2. + uint32 pad_range_start = 3; + uint32 pad_range_end = 4; +} + message GenerateRequest { string session_id = 1; - // Tokens to append before prefill. May be empty. Client must NOT - // include vision tokens (<|vision_start|>, <|image_pad|>, - // <|vision_end|>) — those live in the session via AppendImage. + // Tokens to append before prefill. May be empty. Client writes the + // full vision block (VISION_START + N*IMAGE_PAD + VISION_END) for + // any newly-attached image directly into this stream; each such + // block must be paired with a matching entry in `images`. The + // server validates that the declared ranges all point at IMAGE_PAD + // runs and that each run's length matches what the vision encoder + // produces for the corresponding image. repeated uint32 append_tokens = 2; - // Offset / truncating — same semantics as AppendImage. Truncation - // that splits an image block is FAILED_PRECONDITION. + // Client's view of session.tokens length at the time of the call. + // Must equal server's actual length, OR be strictly less when + // truncating=true (server rewinds before appending). Any other + // mismatch is FAILED_PRECONDITION. uint32 offset = 3; bool truncating = 4; @@ -185,6 +182,12 @@ message GenerateRequest { // vLLM scheduler priority (0 = interactive, 10 = batch). int32 priority = 13; + + // Images newly attached on this call. Each entry describes one + // image's binary bytes, its mime type, and the exact token-position + // range of its pre-expanded placeholder run inside `session.tokens` + // after `append_tokens` is applied. See `ImageAttachment`. + repeated ImageAttachment images = 14; } message PositionRange { @@ -258,3 +261,16 @@ message ReadoutManifest { uint32 hidden_size = 3; string dtype = 4; } + +// ============================================================ +// Debug +// ============================================================ + +message DumpSessionRequest { + string session_id = 1; +} + +message DumpSessionResponse { + // The full session.tokens sequence, verbatim. + repeated uint32 tokens = 1 [packed = true]; +} diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 1352d5f..5705d89 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -73,12 +73,6 @@ pub enum StreamToken { /// `readout` is `None` when the server has readout disabled or /// returned no readout for this chunk. Token { id: u32, readout: Option }, - /// An image was committed server-side via AppendImage during this - /// stream. `placeholder_count` is the N IMAGE_PADs the server - /// wrote. Emitted in AST order — caller applies these counts to - /// the first-N image leaves that currently have token_count=0 - /// via `ContextState::commit_image_token_counts`. - ImageAppended { placeholder_count: u32 }, Done { usage: Option }, Error(String), } @@ -150,6 +144,8 @@ impl ApiClient { &self, session_lock: std::sync::Arc>>, chunks: Vec, + images: Vec, + match_upto: u32, sampling: SamplingParams, priority: Option, readout_shape: Option<(u32, u32)>, @@ -159,8 +155,8 @@ impl ApiClient { let handle = tokio::spawn(async move { let result = run_session_generate( - session_lock, &client, chunks, sampling, priority, - readout_shape, &tx, + session_lock, &client, chunks, images, match_upto, sampling, + priority, readout_shape, &tx, ).await; if let Err(e) = result { log::warn!(target: "grpc", @@ -220,6 +216,8 @@ async fn run_session_generate( session_lock: std::sync::Arc>>, client: &ApiClient, chunks: Vec, + images: Vec, + match_upto: u32, sampling: SamplingParams, priority: Option, readout_shape: Option<(u32, u32)>, @@ -242,68 +240,69 @@ async fn run_session_generate( } }; - // Skip chunks already on the server. committed_len must land on - // a chunk boundary — every successful AppendImage / Generate - // advances committed_len by exactly one chunk's contribution, - // so straddling means divergence (client's AST was rewritten - // under us). - let mut acc: u32 = 0; - let mut delta_start = chunks.len(); - for (i, chunk) in chunks.iter().enumerate() { - if acc == handle.committed_len { - delta_start = i; - break; - } - let len = match chunk { - WireChunk::Tokens(t) => t.len() as u32, - WireChunk::Image { known_expanded_len, .. } => *known_expanded_len, - }; - if len == 0 { - anyhow::bail!( - "session divergence: chunk {} has unknown length but \ - precedes committed_len {} (acc={})", - i, handle.committed_len, acc, - ); - } - if acc + len > handle.committed_len { - anyhow::bail!( - "session divergence: chunk {} straddles committed_len \ - (acc={}, len={}, committed={})", - i, acc, len, handle.committed_len, - ); - } - acc += len; - } - if acc != handle.committed_len { - anyhow::bail!( - "session divergence: chunks sum to {} but committed_len is {}", - acc, handle.committed_len, - ); + // If the client believes the match extends only up to `match_upto` + // but the server has more, we need to rewind. For v1 the match is + // either whole or broken — `match_upto` is always 0 on any mutation + // — so the cheapest correct recovery is to drop the session and + // open a fresh one. + if match_upto < handle.committed_len { + log::warn!(target: "grpc", + "session rewind: match_upto={} < committed_len={} — reopening session (resending {} bytes)", + match_upto, handle.committed_len, handle.committed_len - match_upto); + drop(handle); + handle = salience::SessionHandle::open(client).await?; } - // Walk the delta: accumulate Tokens in `pending`; on Image, - // flush pending via prefill-only Generate then AppendImage. + // Walk chunks at byte-level, taking everything past `match_upto` + // as the delta. Token chunks can be split mid-way; images live + // inline in the token stream, so there's no separate image-chunk + // case anymore. + let mut acc: u32 = 0; let mut pending: Vec = Vec::new(); - for chunk in &chunks[delta_start..] { + for chunk in chunks.iter() { match chunk { - WireChunk::Tokens(t) => pending.extend_from_slice(t), - WireChunk::Image { bytes, mime, .. } => { - if !pending.is_empty() { - handle.prefill_only(std::mem::take(&mut pending)).await?; + WireChunk::Tokens(t) => { + let len = t.len() as u32; + let chunk_end = acc + len; + if chunk_end <= match_upto { + acc = chunk_end; + } else if acc < match_upto { + let skip = (match_upto - acc) as usize; + pending.extend_from_slice(&t[skip..]); + acc = chunk_end; + } else { + pending.extend_from_slice(t); + acc = chunk_end; } - let resp = handle - .append_image(bytes.clone(), mime.clone(), false) - .await?; - log::debug!(target: "grpc", - "AppendImage: N={} total_length={}", - resp.placeholder_count, resp.total_length); - let _ = tx.send(StreamToken::ImageAppended { - placeholder_count: resp.placeholder_count, - }); } } } + // Filter images to those entirely past `match_upto` — anything + // before is on the server already (prior turn), anything + // straddling is a hard divergence (image partially-sent shouldn't + // happen with our atomic AppendImage history; with images-inline + // it can only happen if mark_dirty cleared match_upto mid-block, + // which the AST mutators prevent). + let mut new_images: Vec = Vec::new(); + for img in &images { + if img.pad_end <= match_upto { + continue; // already sent on a prior turn + } + if img.pad_start < match_upto { + anyhow::bail!( + "session divergence: image at [{},{}) straddles match_upto={}", + img.pad_start, img.pad_end, match_upto, + ); + } + new_images.push(pb::ImageAttachment { + bytes: img.bytes.clone(), + mime: img.mime.clone(), + pad_range_start: img.pad_start, + pad_range_end: img.pad_end, + }); + } + // Final Generate: pending holds any trailing text; decode up to // sampling.max_tokens. Request readouts on all decode positions // via a catch-all range ending at u32::MAX — decode never @@ -331,6 +330,7 @@ async fn run_session_generate( top_k: sampling.top_k, stop_token_ids: Vec::new(), priority: priority.unwrap_or(0), + images: new_images, }; let session_id_for_log = handle.session_id.clone(); let t_generate = Instant::now(); diff --git a/src/agent/api/salience.rs b/src/agent/api/salience.rs index bba950f..f5f65d2 100644 --- a/src/agent/api/salience.rs +++ b/src/agent/api/salience.rs @@ -94,6 +94,8 @@ pub struct SessionHandle { impl SessionHandle { pub async fn open(client: &super::ApiClient) -> Result { + let t0 = std::time::Instant::now(); + log::debug!(target: "grpc", "OpenSession rpc: start"); let mut c = client.salience_client().await?; let mut req = tonic::Request::new(pb::OpenSessionRequest { model: client.model.clone(), @@ -105,8 +107,8 @@ impl SessionHandle { .with_context(|| "OpenSession RPC failed")? .into_inner(); log::debug!(target: "grpc", - "SessionHandle::open session_id={} max_model_len={}", - resp.session_id, resp.max_model_len); + "OpenSession rpc: done session_id={} max_model_len={} elapsed={:?}", + resp.session_id, resp.max_model_len, t0.elapsed()); Ok(Self { session_id: resp.session_id, max_model_len: resp.max_model_len, @@ -117,30 +119,21 @@ impl SessionHandle { pub fn client(&self) -> &super::ApiClient { &self.client } - /// Append an image via the server-side vision block. Updates - /// `committed_len` from the server's response on success. - pub async fn append_image( - &mut self, - data: Vec, - mime: String, - truncating: bool, - ) -> Result { + /// Debug-only: fetch the server's full session.tokens. Used to + /// verify client-side accounting byte-for-byte when divergence + /// is suspected. Not cheap on large sessions. + pub async fn dump_tokens(&self) -> Result> { let mut c = self.client.salience_client().await?; - let mut req = tonic::Request::new(pb::AppendImageRequest { + let mut req = tonic::Request::new(pb::DumpSessionRequest { session_id: self.session_id.clone(), - data, - mime, - offset: self.committed_len, - truncating, }); with_auth(&mut req, self.client.api_key()); let resp = c - .append_image(req) + .dump_session(req) .await - .with_context(|| "AppendImage RPC failed")? + .with_context(|| "DumpSession RPC failed")? .into_inner(); - self.committed_len = resp.total_length; - Ok(resp) + Ok(resp.tokens) } /// Open a gRPC Generate stream with the given request. Caller @@ -151,6 +144,10 @@ impl SessionHandle { &self, req: pb::GenerateRequest, ) -> Result> { + let t0 = std::time::Instant::now(); + log::debug!(target: "grpc", + "Generate rpc: open-stream session={} offset={} append={} max_tokens={}", + self.session_id, req.offset, req.append_tokens.len(), req.max_tokens); let mut c = self.client.salience_client().await?; let mut req = tonic::Request::new(req); with_auth(&mut req, self.client.api_key()); @@ -158,6 +155,9 @@ impl SessionHandle { .generate(req) .await .with_context(|| "Generate RPC failed")?; + log::debug!(target: "grpc", + "Generate rpc: stream opened session={} open-latency={:?}", + self.session_id, t0.elapsed()); Ok(resp.into_inner()) } @@ -183,6 +183,7 @@ impl SessionHandle { top_k: 0, stop_token_ids: Vec::new(), priority: 0, + images: Vec::new(), }; let mut stream = self.generate(req).await?; while let Some(event) = stream.next().await { diff --git a/src/agent/context.rs b/src/agent/context.rs index 2982851..0a49e05 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -143,6 +143,13 @@ pub enum AstNode { /// Maps memory key → divergence score for this response. #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")] memory_scores: std::collections::BTreeMap, + /// Cached token stream for the subtree. When `Some`, wire-out + /// uses these bytes verbatim and skips recursion into children. + /// Populated by the response parser from the server's exact + /// stream; also computable from children as a fallback. Cleared + /// on any edit to a descendant. Not serialized — transient. + #[serde(skip, default)] + token_ids: Option>, }, } @@ -155,6 +162,14 @@ pub struct ContextState { journal: Vec, conversation: Vec, pub conversation_log: Option, + /// Length of the session's token stream on the server, as of the + /// last Done event. Updated by the grpc layer. + server_committed_len: u32, + /// Prefix length of our walk that still matches the server's + /// session.tokens byte-for-byte. When < `server_committed_len` + /// the session needs rewinding (truncating=true at this offset). + /// Reset to 0 on any mutation that could have changed sent bytes. + client_match_upto: u32, } impl Clone for ContextState { @@ -165,6 +180,8 @@ impl Clone for ContextState { journal: self.journal.clone(), conversation: self.conversation.clone(), conversation_log: None, // forked contexts don't log + server_committed_len: self.server_committed_len, + client_match_upto: self.client_match_upto, } } } @@ -201,6 +218,10 @@ pub struct ResponseParser { think_buf: String, in_tool_call: bool, tool_call_buf: String, + /// Raw generated token IDs, in arrival order. Combined with the + /// prologue at `finish` to stamp the Branch's authoritative + /// token cache — the bytes the server has for this branch. + generated_tokens: Vec, } impl Role { @@ -369,8 +390,11 @@ impl AstNode { mime: impl Into, orig_height: u32, orig_width: u32, - token_count: u32, ) -> Self { + // Pad count is computed eagerly from dimensions — no more + // "unknown until server responds" shape. Server validates + // on the Generate call; mismatches fail loud. + let token_count = qwen3_image_token_count(orig_height, orig_width); Self::Leaf(NodeLeaf::new(NodeBody::Image { bytes, mime: mime.into(), @@ -383,7 +407,13 @@ impl AstNode { // -- Branch constructors -------------------------------------------------- pub fn branch(role: Role, children: Vec) -> Self { - Self::Branch { role, children, timestamp: Utc::now(), memory_scores: Default::default() } + Self::Branch { + role, + children, + timestamp: Utc::now(), + memory_scores: Default::default(), + token_ids: None, + } } pub fn system_msg(text: impl Into) -> Self { @@ -392,6 +422,7 @@ impl AstNode { children: vec![Self::content(text)], timestamp: Utc::now(), memory_scores: Default::default(), + token_ids: None, } } @@ -401,6 +432,7 @@ impl AstNode { children: vec![Self::content(text)], timestamp: Utc::now(), memory_scores: Default::default(), + token_ids: None, } } @@ -412,11 +444,12 @@ impl AstNode { let token_ids = leaf.body.compute_token_ids(); Self::Leaf(NodeLeaf { token_ids, ..leaf }) } - Self::Branch { role, children, timestamp, memory_scores } => Self::Branch { + Self::Branch { role, children, timestamp, memory_scores, .. } => Self::Branch { role, children: children.into_iter().map(|c| c.retokenize()).collect(), timestamp, memory_scores, + token_ids: None, }, } } @@ -493,7 +526,10 @@ impl AstNode { fn token_ids_into(&self, out: &mut Vec) { match self { Self::Leaf(leaf) => out.extend_from_slice(&leaf.token_ids), - Self::Branch { role, children, .. } => { + Self::Branch { token_ids: Some(cached), .. } => { + out.extend_from_slice(cached); + } + Self::Branch { role, children, token_ids: None, .. } => { out.push(tokenizer::IM_START); out.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); for child in children { @@ -522,7 +558,8 @@ impl Ast for AstNode { fn tokens(&self) -> usize { match self { Self::Leaf(leaf) => leaf.tokens(), - Self::Branch { role, children, .. } => { + Self::Branch { token_ids: Some(cached), .. } => cached.len(), + Self::Branch { role, children, token_ids: None, .. } => { 1 + role_header_tokens(*role) + children.iter().map(|c| c.tokens()).sum::() + 1 + newline_tokens() @@ -676,6 +713,7 @@ impl ResponseParser { think_buf: String::new(), in_tool_call: false, tool_call_buf: String::new(), + generated_tokens: Vec::new(), } } @@ -706,6 +744,7 @@ impl ResponseParser { buf.push(id, r); } } + parser.generated_tokens.push(id); let text = super::tokenizer::decode(&[id]); full_text.push_str(&text); let mut ctx = agent.context.lock().await; @@ -740,22 +779,16 @@ impl ResponseParser { let _ = writeln!(f, " unparsed text: {}", &full_text[..end]); } } - if let Some(u) = usage { + if let Some(ref u) = usage { agent.state.lock().await.last_prompt_tokens = u.prompt_tokens; } let mut ctx = agent.context.lock().await; parser.finish(&mut ctx); + if let Some(u) = usage { + ctx.note_session_synced(u.total_tokens); + } return Ok(()); } - super::api::StreamToken::ImageAppended { placeholder_count } => { - // Commit the server-authoritative IMAGE_PAD - // count into the first zero-count image leaf - // in wire order. AppendImage always runs - // before the final Generate, so this fires - // before any Token events for this stream. - let mut ctx = agent.context.lock().await; - ctx.commit_image_token_counts(&[placeholder_count]); - } super::api::StreamToken::Error(e) => { return Err(anyhow::anyhow!("{}", e)); } @@ -842,7 +875,7 @@ impl ResponseParser { } fn push_child(&self, ctx: &mut ContextState, child: AstNode) { - ctx.push_child(Section::Conversation, self.branch_idx, child); + ctx.push_child_raw(Section::Conversation, self.branch_idx, child); } fn flush_content(&mut self, ctx: &mut ContextState) { @@ -860,6 +893,29 @@ impl ResponseParser { self.content_parts.push(std::mem::take(&mut self.buf)); } self.flush_content(ctx); + + // Stamp the authoritative token cache onto the branch. + // Layout mirrors the full chat-template rendering of a + // message block: + // + // IM_START + "assistant\n" [+ "\n"] (prologue — what we sent) + // + generated_tokens (what the server generated, ends in IM_END) + // + "\n" (trailing newline — template-required) + // + // Server only has through the IM_END (model stops on it, + // doesn't emit "\n"). Match-upto lands inside the cache + // right after IM_END; the chunk-walk's straddle path picks + // up the trailing "\n" as the head of the next turn's delta. + // The "\n" between turns matters: without it Qwen sees + // `<|im_end|><|im_start|>` back-to-back (no newline) and + // responds with garbage. + let prologue_text = if self.in_think { "assistant\n\n" } else { "assistant\n" }; + let mut cache = Vec::with_capacity(1 + self.generated_tokens.len() + 8); + cache.push(tokenizer::IM_START); + cache.extend(tokenizer::encode(prologue_text)); + cache.extend(self.generated_tokens); + cache.extend(tokenizer::encode("\n")); + ctx.set_branch_cache(Section::Conversation, self.branch_idx, cache); } } @@ -871,9 +927,39 @@ impl ContextState { journal: Vec::new(), conversation: Vec::new(), conversation_log: None, + server_committed_len: 0, + client_match_upto: 0, } } + // -- Server sync tracking ------------------------------------------------- + + /// Length of the session's token stream on the server. Updated by + /// the grpc layer from Generate Done events. + pub fn server_committed_len(&self) -> u32 { self.server_committed_len } + + /// Prefix of our walk we still believe matches the server + /// byte-for-byte. If less than `server_committed_len`, the next + /// Generate must send `truncating=true` at this offset. + pub fn client_match_upto(&self) -> u32 { self.client_match_upto } + + /// Called by the grpc layer after a successful Generate Done: + /// records both the server's new length and the fact that we + /// match up to it (we just sent everything). + pub fn note_session_synced(&mut self, total_tokens: u32) { + self.server_committed_len = total_tokens; + self.client_match_upto = total_tokens; + } + + /// Reset match-upto to 0. Called from every mutation that could + /// have touched a region the server already has. For now, + /// conservatively drops alignment entirely — finer-grained + /// tracking (match-upto at the mutated node's offset) is a + /// future optimization. + fn mark_dirty(&mut self) { + self.client_match_upto = 0; + } + // -- Read access ---------------------------------------------------------- pub fn system(&self) -> &[AstNode] { &self.system } @@ -886,35 +972,6 @@ impl ContextState { [&self.system, &self.identity, &self.journal, &self.conversation] } - /// Walk image leaves across all sections in wire order and fill in - /// the first N leaves that have `token_count == 0` with successive - /// values from `counts`. Used after a gRPC session's stream of - /// AppendImage responses to commit the server's IMAGE_PAD counts - /// back into the AST so the next wire walk doesn't see zero-count - /// images in the already-committed prefix. - pub fn commit_image_token_counts(&mut self, counts: &[u32]) { - fn visit(node: &mut AstNode, counts: &[u32], idx: &mut usize) { - if *idx >= counts.len() { return; } - match node { - AstNode::Leaf(leaf) => { - if let NodeBody::Image { token_count, .. } = leaf.body() { - if *token_count == 0 { - leaf.set_image_token_count(counts[*idx]); - *idx += 1; - } - } - } - AstNode::Branch { children, .. } => { - for c in children { visit(c, counts, idx); } - } - } - } - let mut idx = 0usize; - for node in &mut self.system { visit(node, counts, &mut idx); } - for node in &mut self.identity { visit(node, counts, &mut idx); } - for node in &mut self.journal { visit(node, counts, &mut idx); } - for node in &mut self.conversation { visit(node, counts, &mut idx); } - } } impl Ast for ContextState { @@ -947,55 +1004,57 @@ impl Ast for ContextState { } /// An image collected from the AST for a request body. The AST stores -/// the pre-expanded token form (`<|vision_start|> + <|image_pad|>×N + -/// <|vision_end|>`), and the wire form mirrors that exactly so the -/// server's `session.tokens` length matches what vLLM's engine will -/// process. The authoritative N is obtained from the server via the -/// CountImageTokens RPC before the Image leaf is constructed. +/// Image metadata collected during `wire_chunks` — the binary + +/// mime plus the absolute token-position range of the image's +/// pre-expanded placeholder run in the full wire stream. Sent +/// alongside `append_tokens` in `GenerateRequest` so the server +/// can attach vision features to the declared positions. Positions +/// are absolute within the full wire walk starting at offset 0, +/// i.e. the same coordinate system as `session.tokens` on the +/// server once the walk has been applied. #[derive(Clone)] pub struct WireImage { pub bytes: Vec, pub mime: String, + pub pad_start: u32, + pub pad_end: u32, } -/// One piece of the wire stream for the gRPC session path. Runs of -/// text/tool/thinking tokens are batched into `Tokens`; each Image -/// leaf becomes its own `Image` chunk because the server writes the -/// full vision block on AppendImage — the client never sends vision -/// tokens inline. Order matches the AST's depth-first wire order. +/// One piece of the wire stream for the gRPC session path. Since +/// images now live inline in the token stream (pre-expanded at AST +/// construction time), there's only one variant — a run of tokens. +/// The parallel `Vec` returned by `wire_chunks` gives the +/// binary + position metadata for each embedded image. #[derive(Clone)] pub enum WireChunk { Tokens(Vec), - Image { - bytes: Vec, - mime: String, - /// Client's current best guess at how many tokens the server - /// will expand this image to, including bookends. `0` means - /// the count is unknown (view_image just loaded the image and - /// AppendImage hasn't run yet). Callers use this only to know - /// this chunk's contribution to the server-visible length for - /// offset bookkeeping on chunks that were already appended on - /// a prior turn. - known_expanded_len: u32, - }, } fn wire_into(node: &AstNode, tokens: &mut Vec, images: &mut Vec) { match node { AstNode::Leaf(leaf) => match leaf.body() { NodeBody::Image { bytes, mime, .. } => { - // Send the pre-expanded token form (includes N - // <|image_pad|> tokens); engine's multi_modal - // pipeline pairs them with the binary data below. + // The Image leaf's token_ids is already + // [VISION_START, IMAGE_PAD * N, VISION_END]. Inline + // those into the token stream and record the pad-run + // range so the server can attach features to the + // declared positions. + let pad_start = tokens.len() as u32; tokens.extend_from_slice(leaf.token_ids()); + let pad_end = tokens.len() as u32; images.push(WireImage { bytes: bytes.clone(), mime: mime.clone(), + pad_start, + pad_end, }); } _ => tokens.extend_from_slice(leaf.token_ids()), }, - AstNode::Branch { role, children, .. } => { + AstNode::Branch { token_ids: Some(cached), .. } => { + tokens.extend_from_slice(cached); + } + AstNode::Branch { role, children, token_ids: None, .. } => { tokens.push(tokenizer::IM_START); tokens.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); for c in children { @@ -1118,10 +1177,16 @@ impl ContextState { } /// Build the wire stream as interleaved `WireChunk`s for the gRPC - /// session path. Unlike `wire_prompt`, this preserves the order - /// of text runs vs image blocks so the caller can drive the - /// append flow (AppendImage for each Image, Generate append for - /// contiguous text runs). + /// session path. Returns a tuple of (chunks, images): the chunks + /// hold the full token stream (with vision blocks inlined as + /// `VISION_START + IMAGE_PAD*N + VISION_END`), and the images + /// list carries each embedded image's binary + position range so + /// the gRPC layer can attach them via `GenerateRequest.images`. + /// + /// Note: with images inlined into the token stream, the chunks + /// list is structurally a single `Tokens` chunk in the common + /// case — the multi-chunk shape persists only because some + /// callers may want the option of inserting breakpoints later. /// /// `conv_range` and `skip` mirror `wire_prompt` — select a /// conversation slice and drop identity / conversation nodes by @@ -1130,46 +1195,43 @@ impl ContextState { &self, conv_range: std::ops::Range, mut skip: F, - ) -> Vec + ) -> (Vec, Vec) where F: FnMut(&AstNode) -> bool, { - let mut out: Vec = Vec::new(); let mut buf: Vec = Vec::new(); + let mut images: Vec = Vec::new(); - fn flush(buf: &mut Vec, out: &mut Vec) { - if !buf.is_empty() { - out.push(WireChunk::Tokens(std::mem::take(buf))); - } - } - - fn visit(node: &AstNode, buf: &mut Vec, out: &mut Vec) { + fn visit( + node: &AstNode, + buf: &mut Vec, + images: &mut Vec, + ) { match node { AstNode::Leaf(leaf) => match leaf.body() { - NodeBody::Image { bytes, mime, token_count, .. } => { - flush(buf, out); - // Bookends (VISION_START + VISION_END) add 2 - // to the expanded length; token_count is the - // IMAGE_PAD run. 0 means count is still - // unknown (no AppendImage yet) — don't claim - // a length the server will disagree with. - let expanded = if *token_count == 0 { - 0 - } else { - *token_count + 2 - }; - out.push(WireChunk::Image { + NodeBody::Image { bytes, mime, .. } => { + // Pre-expanded vision block lives in + // leaf.token_ids: [VISION_START, IMAGE_PAD*N, + // VISION_END]. Inline + record the range. + let pad_start = buf.len() as u32; + buf.extend_from_slice(leaf.token_ids()); + let pad_end = buf.len() as u32; + images.push(WireImage { bytes: bytes.clone(), mime: mime.clone(), - known_expanded_len: expanded, + pad_start, + pad_end, }); } _ => buf.extend_from_slice(leaf.token_ids()), }, - AstNode::Branch { role, children, .. } => { + AstNode::Branch { token_ids: Some(cached), .. } => { + buf.extend_from_slice(cached); + } + AstNode::Branch { role, children, token_ids: None, .. } => { buf.push(tokenizer::IM_START); buf.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); for c in children { - visit(c, buf, out); + visit(c, buf, images); } buf.push(tokenizer::IM_END); buf.extend(tokenizer::encode("\n")); @@ -1177,18 +1239,22 @@ impl ContextState { } } - for node in self.system() { visit(node, &mut buf, &mut out); } + for node in self.system() { visit(node, &mut buf, &mut images); } for node in self.identity() { if skip(node) { continue; } - visit(node, &mut buf, &mut out); + visit(node, &mut buf, &mut images); } - for node in self.journal() { visit(node, &mut buf, &mut out); } + for node in self.journal() { visit(node, &mut buf, &mut images); } for node in &self.conversation()[conv_range] { if skip(node) { continue; } - visit(node, &mut buf, &mut out); + visit(node, &mut buf, &mut images); } - flush(&mut buf, &mut out); - out + let chunks = if buf.is_empty() { + Vec::new() + } else { + vec![WireChunk::Tokens(buf)] + }; + (chunks, images) } } @@ -1209,17 +1275,27 @@ impl ContextState { dbglog!("warning: log: {:#}", e); } } + // Conversation appends always go to the tail — past committed — + // so they don't break the match. Any other section mutates a + // region the server may already have, so drop alignment. + if section != Section::Conversation { + self.mark_dirty(); + } self.section_mut(section).push(node); } /// Push without logging. pub fn push_no_log(&mut self, section: Section, node: AstNode) { + if section != Section::Conversation { + self.mark_dirty(); + } self.section_mut(section).push(node); } /// Replace the body of a leaf at `index` in `section`. /// Re-tokenizes to maintain the invariant. pub fn set_message(&mut self, section: Section, index: usize, body: NodeBody) { + self.mark_dirty(); let nodes = self.section_mut(section); let node = &mut nodes[index]; match node { @@ -1245,10 +1321,12 @@ impl ContextState { } pub fn del(&mut self, section: Section, index: usize) -> AstNode { + self.mark_dirty(); self.section_mut(section).remove(index) } pub fn clear(&mut self, section: Section) { + self.mark_dirty(); self.section_mut(section).clear(); } @@ -1269,6 +1347,7 @@ impl ContextState { /// are > 50% of conversation tokens) or oldest conversation entry. /// Phase 3: Snap to user message boundary at start. pub fn trim_conversation(&mut self) { + self.mark_dirty(); let max_tokens = context_budget_tokens(); let fixed = self.system.iter().map(|n| n.tokens()).sum::() + self.identity.iter().map(|n| n.tokens()).sum::() @@ -1345,11 +1424,49 @@ impl ContextState { } /// Push a child node into a branch at `index` in `section`. + /// Clears the branch's cached token stream — wire-out will recompute + /// from children until the cache is repopulated. If the cache was + /// populated (server had these bytes), drops session alignment. pub fn push_child(&mut self, section: Section, index: usize, child: AstNode) { + let node = &mut self.section_mut(section)[index]; + let was_cached = matches!(node, AstNode::Branch { token_ids: Some(_), .. }); + match node { + AstNode::Branch { children, token_ids, .. } => { + children.push(child); + *token_ids = None; + } + AstNode::Leaf(_) => panic!("push_child on leaf node"), + } + if was_cached { + self.mark_dirty(); + } + } + + /// Like `push_child` but preserves the branch's cached token stream. + /// Used by the response parser, which is simultaneously populating + /// the cache from the authoritative server stream and pushing the + /// parsed-out children — the two stay consistent by construction. + /// Module-private: callers outside `context.rs` must go through + /// `push_child` so the invariant is maintained. + fn push_child_raw(&mut self, section: Section, index: usize, child: AstNode) { let node = &mut self.section_mut(section)[index]; match node { AstNode::Branch { children, .. } => children.push(child), - AstNode::Leaf(_) => panic!("push_child on leaf node"), + AstNode::Leaf(_) => panic!("push_child_raw on leaf node"), + } + } + + /// Stamp a verbatim token cache onto the branch at `index` in + /// `section`. Used by the response parser to record the server's + /// authoritative token stream for the just-finished turn. + /// Module-private: the cache is an invariant-load-bearing piece + /// of state, populated only by code that holds the server's + /// ground truth. + fn set_branch_cache(&mut self, section: Section, index: usize, tokens: Vec) { + let node = &mut self.section_mut(section)[index]; + match node { + AstNode::Branch { token_ids, .. } => *token_ids = Some(tokens), + AstNode::Leaf(_) => panic!("set_branch_cache on leaf node"), } } @@ -1373,20 +1490,19 @@ impl ContextState { // to at request time. Constants come from Qwen3.5-27B's preprocessor_config. // --------------------------------------------------------------------------- -// Test-only client-side estimate of image token expansion. Production -// callers obtain the authoritative count from the server via -// CountImageTokens; these constants and helpers stay around only to -// keep the context-shape unit tests self-contained. -#[cfg(test)] +// Production client-side computation of image-token expansion. With +// the delta-session protocol, the client writes the pre-expanded +// vision block (VISION_START + N*IMAGE_PAD + VISION_END) directly +// into the token stream at Image-leaf construction time, and tells +// the server where each image's pad run lives via +// GenerateRequest.images. Server validates that this N matches +// what the vision encoder actually produces and rejects on +// mismatch — so drift here fails loudly, not silently. const QWEN3_PATCH_SIZE: u32 = 16; -#[cfg(test)] const QWEN3_MERGE_SIZE: u32 = 2; -#[cfg(test)] const QWEN3_MIN_PIXELS: u64 = 65_536; -#[cfg(test)] const QWEN3_MAX_PIXELS: u64 = 16_777_216; -#[cfg(test)] fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) -> (u32, u32) { let max_s = h.max(w) as f64; let min_s = h.min(w) as f64; @@ -1415,11 +1531,10 @@ fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) - } } -/// Test-only: client-side estimate of how many `<|image_pad|>` tokens -/// vLLM will emit for an image of the given dimensions. Production -/// callers use `salience::count_image_tokens` (server-authoritative). -#[cfg(test)] -fn qwen3_image_token_count(orig_h: u32, orig_w: u32) -> u32 { +/// How many `<|image_pad|>` tokens the Qwen3-VL vision encoder will +/// produce for an image of the given dimensions. Server verifies +/// this count against its own encoder run and rejects on mismatch. +pub fn qwen3_image_token_count(orig_h: u32, orig_w: u32) -> u32 { let factor = QWEN3_PATCH_SIZE * QWEN3_MERGE_SIZE; let (rh, rw) = smart_resize(orig_h, orig_w, factor, QWEN3_MIN_PIXELS, QWEN3_MAX_PIXELS); (rh / QWEN3_PATCH_SIZE) * (rw / QWEN3_PATCH_SIZE) / (QWEN3_MERGE_SIZE * QWEN3_MERGE_SIZE) @@ -1854,7 +1969,7 @@ mod tests { #[test] fn test_image_render_and_token_ids() { - let node = AstNode::image(vec![0u8, 1, 2, 3], "image/png", 512, 512, qwen3_image_token_count(512, 512)); + let node = AstNode::image(vec![0u8, 1, 2, 3], "image/png", 512, 512); let leaf = node.leaf().unwrap(); // 3 tokens of bookend + 256 image_pad tokens assert_eq!(leaf.token_ids().len(), 258); @@ -1874,7 +1989,7 @@ mod tests { let mut ctx = ContextState::new(); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::User, vec![ AstNode::content("look:"), - AstNode::image(vec![0xDE, 0xAD], "image/png", 512, 512, qwen3_image_token_count(512, 512)), + AstNode::image(vec![0xDE, 0xAD], "image/png", 512, 512), ])); // AST side and wire side should both carry N image_pads + bookends — @@ -1904,7 +2019,7 @@ mod tests { #[test] fn test_image_serde_roundtrip() { - let node = AstNode::image(vec![0xDE, 0xAD, 0xBE, 0xEF], "image/png", 64, 64, qwen3_image_token_count(64, 64)); + let node = AstNode::image(vec![0xDE, 0xAD, 0xBE, 0xEF], "image/png", 64, 64); let json = serde_json::to_string(&node).unwrap(); // bytes must be base64-encoded in the JSON form assert!(json.contains("3q2+7w==")); diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 613b106..1db40b1 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -333,14 +333,16 @@ impl Agent { /// becomes its own chunk. Also trims the conversation to budget /// first so we don't build a prompt the server will reject for /// length. - pub async fn assemble_prompt(&self) -> Vec { + pub async fn assemble_prompt(&self) + -> (Vec, Vec, u32) + { let mut ctx = self.context.lock().await; if ctx.total_tokens() > context::context_budget_tokens() { ctx.trim_conversation(); } let st = self.state.lock().await; let conv_len = ctx.conversation().len(); - let mut chunks = ctx.wire_chunks(0..conv_len, |_| false); + let (mut chunks, images) = ctx.wire_chunks(0..conv_len, |_| false); // Assistant-turn prologue. Merge into the trailing Tokens // chunk if there is one, else push as a new chunk. let mut prologue = vec![tokenizer::IM_START]; @@ -353,7 +355,8 @@ impl Agent { Some(context::WireChunk::Tokens(last)) => last.extend(prologue), _ => chunks.push(context::WireChunk::Tokens(prologue)), } - chunks + let match_upto = ctx.client_match_upto(); + (chunks, images, match_upto) } /// Rebuild the tools section of the system prompt from the current tools list. @@ -413,7 +416,7 @@ impl Agent { let _thinking = start_activity(&agent, "thinking...").await; let (rx, _stream_guard) = { - let chunks = agent.assemble_prompt().await; + let (chunks, images, match_upto) = agent.assemble_prompt().await; let st = agent.state.lock().await; let readout_shape = agent.readout.lock().ok().and_then(|buf| { buf.manifest.as_ref().map(|m| { @@ -423,6 +426,8 @@ impl Agent { agent.client.stream_session_mm( agent.grpc_session.clone(), chunks, + images, + match_upto, st.sampling, st.priority, readout_shape, diff --git a/src/agent/tools/vision.rs b/src/agent/tools/vision.rs index d122384..aede258 100644 --- a/src/agent/tools/vision.rs +++ b/src/agent/tools/vision.rs @@ -63,7 +63,7 @@ async fn view_image( // AppendImage (the server is authoritative for the IMAGE_PAD // count). Placeholder of 0 here until AppendImage is wired; the // leaf's count gets rewritten from the RPC response at send time. - let image_leaf = AstNode::image(bytes.clone(), mime, h, w, 0); + let image_leaf = AstNode::image(bytes.clone(), mime, h, w); let branch = AstNode::branch(Role::User, vec![image_leaf]); agent.context.lock().await.push_log(Section::Conversation, branch); diff --git a/src/mind/mod.rs b/src/mind/mod.rs index b2eb77a..9572272 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -693,7 +693,7 @@ impl Mind { } }); - let mut sub_handle: Option> = None; + let _sub_handle: Option> = None; // Start finetune scoring at startup (scores existing conversation) if !self.config.no_agents { @@ -743,6 +743,7 @@ impl Mind { _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true, } + /* if !self.config.no_agents { if sub_handle.as_ref().map_or(true, |h| h.is_finished()) { let sub = self.subconscious.clone(); @@ -754,6 +755,7 @@ impl Mind { })); } } + */ // Check for pending user input → push to agent context and start turn let pending = self.shared.lock().unwrap().take_pending_input(); diff --git a/src/subconscious/generate.rs b/src/subconscious/generate.rs index 625b619..584d2c7 100644 --- a/src/subconscious/generate.rs +++ b/src/subconscious/generate.rs @@ -26,7 +26,7 @@ pub async fn gen_continuation( ) -> anyhow::Result where F: FnMut(&AstNode) -> bool, { - let mut chunks = context.wire_chunks(0..entry_idx, skip); + let (mut chunks, images) = context.wire_chunks(0..entry_idx, skip); // Assistant-turn prologue. let prologue = { @@ -50,19 +50,13 @@ where F: FnMut(&AstNode) -> bool, // `_guard` drops at function end. let session_lock = Arc::new(crate::Mutex::new(None)); let (mut rx, _guard) = client.stream_session_mm( - session_lock, chunks, sampling, Some(-5), None, + session_lock, chunks, images, 0, sampling, Some(-5), None, ); let mut tokens = Vec::new(); while let Some(tok) = rx.recv().await { match tok { StreamToken::Token { id, .. } => tokens.push(id), - StreamToken::ImageAppended { .. } => { - // subconscious/generate uses wire_chunks over an AST - // slice that shouldn't have unsized images — but if - // it ever does, we just don't care about updating the - // ephemeral session's AST view. - } StreamToken::Done { .. } => break, StreamToken::Error(e) => anyhow::bail!("generation error: {}", e), } diff --git a/src/subconscious/learn.rs b/src/subconscious/learn.rs index dca9b3c..feb209c 100644 --- a/src/subconscious/learn.rs +++ b/src/subconscious/learn.rs @@ -40,14 +40,15 @@ struct ScoreResult { total_logprob: f64, } -/// Convert a flat (prompt_tokens, images) pair into the interleaved -/// chunks the session protocol expects. Tokens up to the next -/// `<|vision_start|>` become a Tokens chunk; each -/// `<|vision_start|>..<|vision_end|>` run collapses into one Image -/// chunk paired by position with the next entry in `images`. The -/// server re-expands the IMAGE_PADs on AppendImage. -fn prompt_to_chunks(prompt: &[u32], images: &[WireImage]) -> Vec { - let mut out: Vec = Vec::new(); +/// Find each <|vision_start|>...<|vision_end|> run in the flat prompt +/// and pair it with the matching entry in `images`. Returns a list +/// of `ImageAttachment` with absolute pad-range positions, ready +/// to drop into `GenerateRequest.images`. +fn pair_images_to_ranges( + prompt: &[u32], + images: &[WireImage], +) -> Vec { + let mut out: Vec = Vec::new(); let mut cur = 0; let mut img_idx = 0; while cur < prompt.len() { @@ -60,22 +61,16 @@ fn prompt_to_chunks(prompt: &[u32], images: &[WireImage]) -> Vec { let img = images.get(img_idx) .unwrap_or_else(|| panic!( "image index {} out of range for {} images", img_idx, images.len())); - out.push(WireChunk::Image { + out.push(pb::ImageAttachment { bytes: img.bytes.clone(), mime: img.mime.clone(), - known_expanded_len: (end - cur) as u32, + pad_range_start: cur as u32, + pad_range_end: end as u32, }); img_idx += 1; cur = end; } else { - let next_vs = prompt[cur..].iter() - .position(|&t| t == tokenizer::VISION_START); - let end = match next_vs { - Some(o) => cur + o, - None => prompt.len(), - }; - out.push(WireChunk::Tokens(prompt[cur..end].to_vec())); - cur = end; + cur += 1; } } out @@ -95,36 +90,22 @@ async fn call_score( return Ok(Vec::new()); } - let chunks = prompt_to_chunks(prompt, images); + let images_pb = pair_images_to_ranges(prompt, images); let mut handle = SessionHandle::open(client).await?; - // Walk chunks: AppendImage for each image, prefill-only Generate - // for each text run between images. Accumulate any trailing text - // run into `pending` for the final logprob-generating Generate. - let mut pending: Vec = Vec::new(); - for chunk in chunks { - match chunk { - WireChunk::Tokens(t) => pending.extend(t), - WireChunk::Image { bytes, mime, .. } => { - if !pending.is_empty() { - handle.prefill_only(std::mem::take(&mut pending)).await?; - } - handle.append_image(bytes, mime, false).await?; - } - } - } - // Final Generate: max_tokens=0 so the server runs prefill of the - // trailing `pending` tokens and emits Token events for each - // position covered by logprobs_ranges, then Done. logprob_top_k=0 - // means "just the sampled (prompt) token's logprob" — no top-k - // alternatives, which is all call_score historically needed. + // full prompt and emits Token events for each position covered + // by logprobs_ranges, then Done. logprob_top_k=0 means "just + // the sampled (prompt) token's logprob" — no top-k alternatives, + // which is all call_score historically needed. Images attach + // inline via `images`; the prompt already contains their pre- + // expanded vision blocks at the declared ranges. let logprobs_ranges: Vec = ranges.iter() .map(|(s, e)| pb::PositionRange { start: *s as u32, end: *e as u32 }) .collect(); let req = pb::GenerateRequest { session_id: handle.session_id.clone(), - append_tokens: pending, + append_tokens: prompt.to_vec(), offset: handle.committed_len, truncating: false, max_tokens: 0, @@ -136,6 +117,7 @@ async fn call_score( top_k: 0, stop_token_ids: Vec::new(), priority: priority.unwrap_or(0), + images: images_pb, }; let mut stream = handle.generate(req).await?; diff --git a/src/user/context.rs b/src/user/context.rs index 17660b5..8edd926 100644 --- a/src/user/context.rs +++ b/src/user/context.rs @@ -43,6 +43,7 @@ impl ConsciousScreen { name: format!("mem: {}", key), tokens: node.tokens(), content: text.clone(), + token_ids: leaf.token_ids().to_vec(), children: Vec::new(), status: score.map(|s| format!("{:.2}", s)).unwrap_or_default(), }); @@ -55,6 +56,7 @@ impl ConsciousScreen { name: format!("Memory nodes ({})", mem_children.len()), tokens: mem_tokens, content: String::new(), + token_ids: Vec::new(), children: mem_children, status: format!("{} scored, {} unscored", scored, unscored), }); @@ -70,11 +72,13 @@ impl ConsciousScreen { AstNode::Leaf(leaf) => leaf.body().text().to_string(), _ => String::new(), }, + token_ids: node.token_ids(), children: match node { AstNode::Branch { children, .. } => children.iter() .map(|c| SectionView { name: c.label(), tokens: c.tokens(), content: match c { AstNode::Leaf(l) => l.body().text().to_string(), _ => String::new() }, + token_ids: match c { AstNode::Leaf(l) => l.token_ids().to_vec(), _ => c.token_ids() }, children: Vec::new(), status: String::new(), }).collect(), _ => Vec::new(), @@ -101,6 +105,7 @@ impl ConsciousScreen { name: format!("Conversation ({} entries)", conv_children.len()), tokens: conv_tokens, content: String::new(), + token_ids: Vec::new(), children: conv_children, status: String::new(), }); diff --git a/src/user/subconscious.rs b/src/user/subconscious.rs index c332ce6..c71642d 100644 --- a/src/user/subconscious.rs +++ b/src/user/subconscious.rs @@ -207,6 +207,7 @@ impl SubconsciousScreen { name: key.clone(), tokens: 0, content: val.clone(), + token_ids: Vec::new(), children: Vec::new(), status: String::new(), } @@ -238,6 +239,7 @@ impl SubconsciousScreen { name: format!("Conversation ({} entries)", conv_children.len()), tokens: conv_children.iter().map(|c| c.tokens).sum(), content: String::new(), + token_ids: Vec::new(), children: conv_children, status: String::new(), }); diff --git a/src/user/widgets.rs b/src/user/widgets.rs index 49f3e3b..6706a69 100644 --- a/src/user/widgets.rs +++ b/src/user/widgets.rs @@ -8,11 +8,18 @@ use ratatui::{ }; use crate::agent::context::{AstNode, Ast, NodeBody}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct SectionView { pub name: String, pub tokens: usize, pub content: String, + /// Token-id stream for this subtree, displayed in place of + /// `content` when the tree's show-tokens mode is on. Populated + /// from `leaf.token_ids()` / `node.token_ids()` for views built + /// from the AST; empty for views that don't have a corresponding + /// AST node (subconscious entries, etc.), in which case the + /// token view falls back to the text content. + pub token_ids: Vec, pub children: Vec, /// Extra status text shown after the token count. pub status: String, @@ -32,6 +39,7 @@ fn node_to_view(node: &AstNode) -> SectionView { name, tokens: node.tokens(), content: leaf.body().text().to_string(), + token_ids: leaf.token_ids().to_vec(), children: Vec::new(), status, } @@ -44,6 +52,7 @@ fn node_to_view(node: &AstNode) -> SectionView { name: node.label(), tokens: node.tokens(), content: String::new(), + token_ids: node.token_ids(), children: child_views, status: String::new(), } @@ -54,10 +63,12 @@ fn node_to_view(node: &AstNode) -> SectionView { pub fn section_to_view(name: &str, nodes: &[AstNode]) -> SectionView { let children: Vec = nodes.iter().map(|n| node_to_view(n)).collect(); let total_tokens: usize = nodes.iter().map(|n| n.tokens()).sum(); + let token_ids: Vec = nodes.iter().flat_map(|n| n.token_ids()).collect(); SectionView { name: name.to_string(), tokens: total_tokens, content: String::new(), + token_ids, children, status: String::new(), } @@ -104,7 +115,7 @@ pub fn format_ts_age(ts: i64) -> String { /// Key legend for SectionTree panes. pub fn tree_legend() -> Line<'static> { Line::styled( - " ↑↓:nav →/Enter:expand ←:collapse e:expand all c:collapse all PgUp/Dn Home/End ", + " ↑↓:nav →/Enter:expand ←:collapse e:expand c:collapse v:toggle tokens/text PgUp/Dn ", Style::default().fg(Color::DarkGray), ) } @@ -185,11 +196,19 @@ pub struct SectionTree { pub selected: Option, pub expanded: std::collections::HashSet, pub scroll: super::scroll_pane::ScrollPaneState, + /// When true, render `token_ids` as space-separated IDs in place + /// of `content` in expanded panels. Toggled with 'v'. + pub show_tokens: bool, } impl SectionTree { pub fn new() -> Self { - Self { selected: None, expanded: std::collections::HashSet::new(), scroll: super::scroll_pane::ScrollPaneState::new() } + Self { + selected: None, + expanded: std::collections::HashSet::new(), + scroll: super::scroll_pane::ScrollPaneState::new(), + show_tokens: false, + } } fn total_nodes(&self, sections: &[SectionView]) -> usize { @@ -264,6 +283,9 @@ impl SectionTree { KeyCode::Char('c') => { self.expanded.clear(); } + KeyCode::Char('v') => { + self.show_tokens = !self.show_tokens; + } _ => {} } self.scroll_to_selected(height); @@ -326,7 +348,12 @@ impl SectionTree { } } else if has_content { let content_indent = format!("{} │ ", " ".repeat(depth + 1)); - let content_lines: Vec<&str> = section.content.lines().collect(); + let body = if self.show_tokens && !section.token_ids.is_empty() { + format_token_ids_wrapped(§ion.token_ids) + } else { + section.content.clone() + }; + let content_lines: Vec<&str> = body.lines().collect(); let show = content_lines.len().min(50); for line in &content_lines[..show] { lines.push(Line::styled( @@ -344,3 +371,16 @@ impl SectionTree { } } } + +/// Format token IDs for the content panel: space-separated, wrapped +/// at 12 ids per line so they fit comfortably in a pane. +fn format_token_ids_wrapped(ids: &[u32]) -> String { + let mut out = String::new(); + for (i, id) in ids.iter().enumerate() { + if i > 0 { + if i % 12 == 0 { out.push('\n'); } else { out.push(' '); } + } + out.push_str(&id.to_string()); + } + out +}