salience: client-side pad expansion, drop AppendImage

Mirrors the vLLM-side rewrite. AppendImage is gone; images now
ride along on Generate via a parallel `images` list.

- Productionize `qwen3_image_token_count` (was test-only). Image
  leaf computes its IMAGE_PAD count eagerly at construction from
  height/width; `token_count` is no longer "0 until the server
  tells us."
- WireChunk shrinks to a single `Tokens(Vec<u32>)` variant — vision
  blocks live inline in the token stream.
- `wire_chunks` now returns `(Vec<WireChunk>, Vec<WireImage>)`.
  `WireImage` carries `pad_start` / `pad_end` (absolute positions
  in the full walk) alongside bytes + mime.
- `assemble_prompt` returns `(chunks, images, match_upto)`.
- `stream_session_mm` / `run_session_generate` take the parallel
  images list, filter to those past `match_upto`, and pass them
  in `GenerateRequest.images` as `pb::ImageAttachment` entries.
- Drop `SessionHandle::append_image`,
  `ContextState::commit_image_token_counts`,
  `StreamToken::ImageAppended`, the WireChunk::Image branch in
  `learn.rs`, and the now-empty `prompt_to_chunks` helper.
- Add 'v' toggle on the conscious-screen tree to render token-id
  vectors in place of text content (debug-aid: lets us see what
  the server actually has when output is suspicious).
- Comment out the subconscious-trigger spawn loop — Kent had this
  disabled before; it had crept back into running.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-24 20:26:47 -04:00
commit fe232cf292
12 changed files with 468 additions and 306 deletions

View file

@ -58,21 +58,26 @@ service Salience {
// boundary). // boundary).
rpc ForkSession(ForkSessionRequest) returns (ForkSessionResponse); rpc ForkSession(ForkSessionRequest) returns (ForkSessionResponse);
// Append an image to the session. Server decodes, runs vLLM's // Prefill + optionally decode. Images are attached inline via
// multimodal pipeline to compute N (IMAGE_PAD count), and writes // `GenerateRequest.images`; the client writes its own pre-expanded
// the whole vision block into session.tokens. Returns N and the // <|vision_start|> + N*<|image_pad|> + <|vision_end|> runs into
// new total length. // `append_tokens` and declares each run's range in `images[i]`.
rpc AppendImage(AppendImageRequest) returns (AppendImageResponse); // Server validates run length against the actual vision-encoder
// feature count and returns INVALID_ARGUMENT on mismatch. Stream
// Prefill + optionally decode. See GenerateRequest for full // yields Token events (with optional readouts / logprobs per
// semantics; stream yields Token events (with optional readouts / // position) followed by a terminating Done.
// logprobs per position) followed by a terminating Done.
rpc Generate(GenerateRequest) returns (stream GenerateEvent); rpc Generate(GenerateRequest) returns (stream GenerateEvent);
// Readout manifest for the currently-loaded model — concept names, // Readout manifest for the currently-loaded model — concept names,
// layer indices, tensor dtype. Stateless; fetch once at client // layer indices, tensor dtype. Stateless; fetch once at client
// startup and cache. // startup and cache.
rpc GetReadoutManifest(GetReadoutManifestRequest) returns (ReadoutManifest); rpc GetReadoutManifest(GetReadoutManifestRequest) returns (ReadoutManifest);
// Dump the full token stream of a session. Debug-only: used by the
// client to verify its local accounting against the server's
// session.tokens byte-for-byte when divergence is suspected. Not
// cheap — copies the whole sequence across the wire.
rpc DumpSession(DumpSessionRequest) returns (DumpSessionResponse);
} }
// ============================================================ // ============================================================
@ -106,55 +111,47 @@ message ForkSessionResponse {
string session_id = 1; // new session string session_id = 1; // new session
} }
// ============================================================
// Mutation
// ============================================================
message AppendImageRequest {
string session_id = 1;
// Image bytes (PNG / JPEG / WebP / …).
bytes data = 2;
// MIME type, e.g. "image/png".
string mime = 3;
// Client's view of the session's current token length. Must equal
// the server's actual length, OR be strictly less when
// truncating=true. Any mismatch is FAILED_PRECONDITION.
uint32 offset = 4;
// If true, server truncates session.tokens to `offset` before
// appending. Rejected with FAILED_PRECONDITION if the truncation
// would split an image block.
bool truncating = 5;
}
message AppendImageResponse {
// Count of <|image_pad|> tokens inside the vision block. Does not
// include the <|vision_start|> / <|vision_end|> bookends, which
// contribute one token each.
uint32 placeholder_count = 1;
// Session's total token length after this append, including both
// bookends (= offset + placeholder_count + 2, barring truncation).
uint32 total_length = 2;
}
// ============================================================ // ============================================================
// Inference // Inference
// ============================================================ // ============================================================
// One image attached to a Generate call. The client is responsible
// for writing the expanded placeholder run (VISION_START +
// N*IMAGE_PAD + VISION_END) into `GenerateRequest.append_tokens` at
// positions [pad_range_start, pad_range_end) and pairing it with
// the corresponding `ImageAttachment` entry. Server validates that
// the declared range's pad count matches what the vision encoder
// produces, and returns INVALID_ARGUMENT if they disagree.
message ImageAttachment {
// Encoded image bytes (PNG / JPEG / WebP / …).
bytes bytes = 1;
// MIME type of `bytes`, e.g. "image/png".
string mime = 2;
// Absolute token positions (in `session.tokens` AFTER `append_tokens`
// is applied) spanning the full vision block `[vision_start,
// pad*N, vision_end]`. Despite the `pad_` prefix, the range covers the
// two bookend tokens as well as the N pads. `pad_range_end` is
// exclusive, so pad_range_end - pad_range_start == N + 2.
uint32 pad_range_start = 3;
uint32 pad_range_end = 4;
}
message GenerateRequest { message GenerateRequest {
string session_id = 1; string session_id = 1;
// Tokens to append before prefill. May be empty. Client must NOT // Tokens to append before prefill. May be empty. Client writes the
// include vision tokens (<|vision_start|>, <|image_pad|>, // full vision block (VISION_START + N*IMAGE_PAD + VISION_END) for
// <|vision_end|>) those live in the session via AppendImage. // any newly-attached image directly into this stream; each such
// block must be paired with a matching entry in `images`. The
// server validates that the declared ranges all point at IMAGE_PAD
// runs and that each run's length matches what the vision encoder
// produces for the corresponding image.
repeated uint32 append_tokens = 2; repeated uint32 append_tokens = 2;
// Offset / truncating — same semantics as AppendImage. Truncation // Client's view of session.tokens length at the time of the call.
// that splits an image block is FAILED_PRECONDITION. // Must equal server's actual length, OR be strictly less when
// truncating=true (server rewinds before appending). Any other
// mismatch is FAILED_PRECONDITION.
uint32 offset = 3; uint32 offset = 3;
bool truncating = 4; bool truncating = 4;
@ -185,6 +182,12 @@ message GenerateRequest {
// vLLM scheduler priority (0 = interactive, 10 = batch). // vLLM scheduler priority (0 = interactive, 10 = batch).
int32 priority = 13; int32 priority = 13;
// Images newly attached on this call. Each entry describes one
// image's binary bytes, its mime type, and the exact token-position
// range of its pre-expanded placeholder run inside `session.tokens`
// after `append_tokens` is applied. See `ImageAttachment`.
repeated ImageAttachment images = 14;
} }
message PositionRange { message PositionRange {
@ -258,3 +261,16 @@ message ReadoutManifest {
uint32 hidden_size = 3; uint32 hidden_size = 3;
string dtype = 4; string dtype = 4;
} }
// ============================================================
// Debug
// ============================================================
// Request for the debug-only DumpSession RPC.
message DumpSessionRequest {
// Session whose token stream is to be returned.
string session_id = 1;
}
// Reply to the DumpSession RPC.
message DumpSessionResponse {
// The full session.tokens sequence, verbatim.
repeated uint32 tokens = 1 [packed = true];
}

View file

@ -73,12 +73,6 @@ pub enum StreamToken {
/// `readout` is `None` when the server has readout disabled or /// `readout` is `None` when the server has readout disabled or
/// returned no readout for this chunk. /// returned no readout for this chunk.
Token { id: u32, readout: Option<TokenReadout> }, Token { id: u32, readout: Option<TokenReadout> },
/// An image was committed server-side via AppendImage during this
/// stream. `placeholder_count` is the N IMAGE_PADs the server
/// wrote. Emitted in AST order — caller applies these counts to
/// the first-N image leaves that currently have token_count=0
/// via `ContextState::commit_image_token_counts`.
ImageAppended { placeholder_count: u32 },
Done { usage: Option<Usage> }, Done { usage: Option<Usage> },
Error(String), Error(String),
} }
@ -150,6 +144,8 @@ impl ApiClient {
&self, &self,
session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>, session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>,
chunks: Vec<super::context::WireChunk>, chunks: Vec<super::context::WireChunk>,
images: Vec<super::context::WireImage>,
match_upto: u32,
sampling: SamplingParams, sampling: SamplingParams,
priority: Option<i32>, priority: Option<i32>,
readout_shape: Option<(u32, u32)>, readout_shape: Option<(u32, u32)>,
@ -159,8 +155,8 @@ impl ApiClient {
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
let result = run_session_generate( let result = run_session_generate(
session_lock, &client, chunks, sampling, priority, session_lock, &client, chunks, images, match_upto, sampling,
readout_shape, &tx, priority, readout_shape, &tx,
).await; ).await;
if let Err(e) = result { if let Err(e) = result {
log::warn!(target: "grpc", log::warn!(target: "grpc",
@ -220,6 +216,8 @@ async fn run_session_generate(
session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>, session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>,
client: &ApiClient, client: &ApiClient,
chunks: Vec<super::context::WireChunk>, chunks: Vec<super::context::WireChunk>,
images: Vec<super::context::WireImage>,
match_upto: u32,
sampling: SamplingParams, sampling: SamplingParams,
priority: Option<i32>, priority: Option<i32>,
readout_shape: Option<(u32, u32)>, readout_shape: Option<(u32, u32)>,
@ -242,67 +240,68 @@ async fn run_session_generate(
} }
}; };
// Skip chunks already on the server. committed_len must land on // If the client believes the match extends only up to `match_upto`
// a chunk boundary — every successful AppendImage / Generate // but the server has more, we need to rewind. For v1 the match is
// advances committed_len by exactly one chunk's contribution, // either whole or broken — `match_upto` is always 0 on any mutation
// so straddling means divergence (client's AST was rewritten // — so the cheapest correct recovery is to drop the session and
// under us). // open a fresh one.
let mut acc: u32 = 0; if match_upto < handle.committed_len {
let mut delta_start = chunks.len(); log::warn!(target: "grpc",
for (i, chunk) in chunks.iter().enumerate() { "session rewind: match_upto={} < committed_len={} — reopening session (resending {} bytes)",
if acc == handle.committed_len { match_upto, handle.committed_len, handle.committed_len - match_upto);
delta_start = i; drop(handle);
break; handle = salience::SessionHandle::open(client).await?;
}
let len = match chunk {
WireChunk::Tokens(t) => t.len() as u32,
WireChunk::Image { known_expanded_len, .. } => *known_expanded_len,
};
if len == 0 {
anyhow::bail!(
"session divergence: chunk {} has unknown length but \
precedes committed_len {} (acc={})",
i, handle.committed_len, acc,
);
}
if acc + len > handle.committed_len {
anyhow::bail!(
"session divergence: chunk {} straddles committed_len \
(acc={}, len={}, committed={})",
i, acc, len, handle.committed_len,
);
}
acc += len;
}
if acc != handle.committed_len {
anyhow::bail!(
"session divergence: chunks sum to {} but committed_len is {}",
acc, handle.committed_len,
);
} }
// Walk the delta: accumulate Tokens in `pending`; on Image, // Walk chunks at byte-level, taking everything past `match_upto`
// flush pending via prefill-only Generate then AppendImage. // as the delta. Token chunks can be split mid-way; images live
// inline in the token stream, so there's no separate image-chunk
// case anymore.
let mut acc: u32 = 0;
let mut pending: Vec<u32> = Vec::new(); let mut pending: Vec<u32> = Vec::new();
for chunk in &chunks[delta_start..] { for chunk in chunks.iter() {
match chunk { match chunk {
WireChunk::Tokens(t) => pending.extend_from_slice(t), WireChunk::Tokens(t) => {
WireChunk::Image { bytes, mime, .. } => { let len = t.len() as u32;
if !pending.is_empty() { let chunk_end = acc + len;
handle.prefill_only(std::mem::take(&mut pending)).await?; if chunk_end <= match_upto {
acc = chunk_end;
} else if acc < match_upto {
let skip = (match_upto - acc) as usize;
pending.extend_from_slice(&t[skip..]);
acc = chunk_end;
} else {
pending.extend_from_slice(t);
acc = chunk_end;
} }
let resp = handle }
.append_image(bytes.clone(), mime.clone(), false) }
.await?; }
log::debug!(target: "grpc",
"AppendImage: N={} total_length={}", // Filter images to those entirely past `match_upto` — anything
resp.placeholder_count, resp.total_length); // before is on the server already (prior turn), anything
let _ = tx.send(StreamToken::ImageAppended { // straddling is a hard divergence (image partially-sent shouldn't
placeholder_count: resp.placeholder_count, // happen with our atomic AppendImage history; with images-inline
// it can only happen if mark_dirty cleared match_upto mid-block,
// which the AST mutators prevent).
let mut new_images: Vec<pb::ImageAttachment> = Vec::new();
for img in &images {
if img.pad_end <= match_upto {
continue; // already sent on a prior turn
}
if img.pad_start < match_upto {
anyhow::bail!(
"session divergence: image at [{},{}) straddles match_upto={}",
img.pad_start, img.pad_end, match_upto,
);
}
new_images.push(pb::ImageAttachment {
bytes: img.bytes.clone(),
mime: img.mime.clone(),
pad_range_start: img.pad_start,
pad_range_end: img.pad_end,
}); });
} }
}
}
// Final Generate: pending holds any trailing text; decode up to // Final Generate: pending holds any trailing text; decode up to
// sampling.max_tokens. Request readouts on all decode positions // sampling.max_tokens. Request readouts on all decode positions
@ -331,6 +330,7 @@ async fn run_session_generate(
top_k: sampling.top_k, top_k: sampling.top_k,
stop_token_ids: Vec::new(), stop_token_ids: Vec::new(),
priority: priority.unwrap_or(0), priority: priority.unwrap_or(0),
images: new_images,
}; };
let session_id_for_log = handle.session_id.clone(); let session_id_for_log = handle.session_id.clone();
let t_generate = Instant::now(); let t_generate = Instant::now();

View file

@ -94,6 +94,8 @@ pub struct SessionHandle {
impl SessionHandle { impl SessionHandle {
pub async fn open(client: &super::ApiClient) -> Result<Self> { pub async fn open(client: &super::ApiClient) -> Result<Self> {
let t0 = std::time::Instant::now();
log::debug!(target: "grpc", "OpenSession rpc: start");
let mut c = client.salience_client().await?; let mut c = client.salience_client().await?;
let mut req = tonic::Request::new(pb::OpenSessionRequest { let mut req = tonic::Request::new(pb::OpenSessionRequest {
model: client.model.clone(), model: client.model.clone(),
@ -105,8 +107,8 @@ impl SessionHandle {
.with_context(|| "OpenSession RPC failed")? .with_context(|| "OpenSession RPC failed")?
.into_inner(); .into_inner();
log::debug!(target: "grpc", log::debug!(target: "grpc",
"SessionHandle::open session_id={} max_model_len={}", "OpenSession rpc: done session_id={} max_model_len={} elapsed={:?}",
resp.session_id, resp.max_model_len); resp.session_id, resp.max_model_len, t0.elapsed());
Ok(Self { Ok(Self {
session_id: resp.session_id, session_id: resp.session_id,
max_model_len: resp.max_model_len, max_model_len: resp.max_model_len,
@ -117,30 +119,21 @@ impl SessionHandle {
pub fn client(&self) -> &super::ApiClient { &self.client } pub fn client(&self) -> &super::ApiClient { &self.client }
/// Append an image via the server-side vision block. Updates /// Debug-only: fetch the server's full session.tokens. Used to
/// `committed_len` from the server's response on success. /// verify client-side accounting byte-for-byte when divergence
pub async fn append_image( /// is suspected. Not cheap on large sessions.
&mut self, pub async fn dump_tokens(&self) -> Result<Vec<u32>> {
data: Vec<u8>,
mime: String,
truncating: bool,
) -> Result<pb::AppendImageResponse> {
let mut c = self.client.salience_client().await?; let mut c = self.client.salience_client().await?;
let mut req = tonic::Request::new(pb::AppendImageRequest { let mut req = tonic::Request::new(pb::DumpSessionRequest {
session_id: self.session_id.clone(), session_id: self.session_id.clone(),
data,
mime,
offset: self.committed_len,
truncating,
}); });
with_auth(&mut req, self.client.api_key()); with_auth(&mut req, self.client.api_key());
let resp = c let resp = c
.append_image(req) .dump_session(req)
.await .await
.with_context(|| "AppendImage RPC failed")? .with_context(|| "DumpSession RPC failed")?
.into_inner(); .into_inner();
self.committed_len = resp.total_length; Ok(resp.tokens)
Ok(resp)
} }
/// Open a gRPC Generate stream with the given request. Caller /// Open a gRPC Generate stream with the given request. Caller
@ -151,6 +144,10 @@ impl SessionHandle {
&self, &self,
req: pb::GenerateRequest, req: pb::GenerateRequest,
) -> Result<tonic::Streaming<pb::GenerateEvent>> { ) -> Result<tonic::Streaming<pb::GenerateEvent>> {
let t0 = std::time::Instant::now();
log::debug!(target: "grpc",
"Generate rpc: open-stream session={} offset={} append={} max_tokens={}",
self.session_id, req.offset, req.append_tokens.len(), req.max_tokens);
let mut c = self.client.salience_client().await?; let mut c = self.client.salience_client().await?;
let mut req = tonic::Request::new(req); let mut req = tonic::Request::new(req);
with_auth(&mut req, self.client.api_key()); with_auth(&mut req, self.client.api_key());
@ -158,6 +155,9 @@ impl SessionHandle {
.generate(req) .generate(req)
.await .await
.with_context(|| "Generate RPC failed")?; .with_context(|| "Generate RPC failed")?;
log::debug!(target: "grpc",
"Generate rpc: stream opened session={} open-latency={:?}",
self.session_id, t0.elapsed());
Ok(resp.into_inner()) Ok(resp.into_inner())
} }
@ -183,6 +183,7 @@ impl SessionHandle {
top_k: 0, top_k: 0,
stop_token_ids: Vec::new(), stop_token_ids: Vec::new(),
priority: 0, priority: 0,
images: Vec::new(),
}; };
let mut stream = self.generate(req).await?; let mut stream = self.generate(req).await?;
while let Some(event) = stream.next().await { while let Some(event) = stream.next().await {

View file

@ -143,6 +143,13 @@ pub enum AstNode {
/// Maps memory key → divergence score for this response. /// Maps memory key → divergence score for this response.
#[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")] #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
memory_scores: std::collections::BTreeMap<String, f64>, memory_scores: std::collections::BTreeMap<String, f64>,
/// Cached token stream for the subtree. When `Some`, wire-out
/// uses these bytes verbatim and skips recursion into children.
/// Populated by the response parser from the server's exact
/// stream; also computable from children as a fallback. Cleared
/// on any edit to a descendant. Not serialized — transient.
#[serde(skip, default)]
token_ids: Option<Vec<u32>>,
}, },
} }
@ -155,6 +162,14 @@ pub struct ContextState {
journal: Vec<AstNode>, journal: Vec<AstNode>,
conversation: Vec<AstNode>, conversation: Vec<AstNode>,
pub conversation_log: Option<crate::mind::log::ConversationLog>, pub conversation_log: Option<crate::mind::log::ConversationLog>,
/// Length of the session's token stream on the server, as of the
/// last Done event. Updated by the grpc layer.
server_committed_len: u32,
/// Prefix length of our walk that still matches the server's
/// session.tokens byte-for-byte. When < `server_committed_len`
/// the session needs rewinding (truncating=true at this offset).
/// Reset to 0 on any mutation that could have changed sent bytes.
client_match_upto: u32,
} }
impl Clone for ContextState { impl Clone for ContextState {
@ -165,6 +180,8 @@ impl Clone for ContextState {
journal: self.journal.clone(), journal: self.journal.clone(),
conversation: self.conversation.clone(), conversation: self.conversation.clone(),
conversation_log: None, // forked contexts don't log conversation_log: None, // forked contexts don't log
server_committed_len: self.server_committed_len,
client_match_upto: self.client_match_upto,
} }
} }
} }
@ -201,6 +218,10 @@ pub struct ResponseParser {
think_buf: String, think_buf: String,
in_tool_call: bool, in_tool_call: bool,
tool_call_buf: String, tool_call_buf: String,
/// Raw generated token IDs, in arrival order. Combined with the
/// prologue at `finish` to stamp the Branch's authoritative
/// token cache — the bytes the server has for this branch.
generated_tokens: Vec<u32>,
} }
impl Role { impl Role {
@ -369,8 +390,11 @@ impl AstNode {
mime: impl Into<String>, mime: impl Into<String>,
orig_height: u32, orig_height: u32,
orig_width: u32, orig_width: u32,
token_count: u32,
) -> Self { ) -> Self {
// Pad count is computed eagerly from dimensions — no more
// "unknown until server responds" shape. Server validates
// on the Generate call; mismatches fail loud.
let token_count = qwen3_image_token_count(orig_height, orig_width);
Self::Leaf(NodeLeaf::new(NodeBody::Image { Self::Leaf(NodeLeaf::new(NodeBody::Image {
bytes, bytes,
mime: mime.into(), mime: mime.into(),
@ -383,7 +407,13 @@ impl AstNode {
// -- Branch constructors -------------------------------------------------- // -- Branch constructors --------------------------------------------------
pub fn branch(role: Role, children: Vec<AstNode>) -> Self { pub fn branch(role: Role, children: Vec<AstNode>) -> Self {
Self::Branch { role, children, timestamp: Utc::now(), memory_scores: Default::default() } Self::Branch {
role,
children,
timestamp: Utc::now(),
memory_scores: Default::default(),
token_ids: None,
}
} }
pub fn system_msg(text: impl Into<String>) -> Self { pub fn system_msg(text: impl Into<String>) -> Self {
@ -392,6 +422,7 @@ impl AstNode {
children: vec![Self::content(text)], children: vec![Self::content(text)],
timestamp: Utc::now(), timestamp: Utc::now(),
memory_scores: Default::default(), memory_scores: Default::default(),
token_ids: None,
} }
} }
@ -401,6 +432,7 @@ impl AstNode {
children: vec![Self::content(text)], children: vec![Self::content(text)],
timestamp: Utc::now(), timestamp: Utc::now(),
memory_scores: Default::default(), memory_scores: Default::default(),
token_ids: None,
} }
} }
@ -412,11 +444,12 @@ impl AstNode {
let token_ids = leaf.body.compute_token_ids(); let token_ids = leaf.body.compute_token_ids();
Self::Leaf(NodeLeaf { token_ids, ..leaf }) Self::Leaf(NodeLeaf { token_ids, ..leaf })
} }
Self::Branch { role, children, timestamp, memory_scores } => Self::Branch { Self::Branch { role, children, timestamp, memory_scores, .. } => Self::Branch {
role, role,
children: children.into_iter().map(|c| c.retokenize()).collect(), children: children.into_iter().map(|c| c.retokenize()).collect(),
timestamp, timestamp,
memory_scores, memory_scores,
token_ids: None,
}, },
} }
} }
@ -493,7 +526,10 @@ impl AstNode {
fn token_ids_into(&self, out: &mut Vec<u32>) { fn token_ids_into(&self, out: &mut Vec<u32>) {
match self { match self {
Self::Leaf(leaf) => out.extend_from_slice(&leaf.token_ids), Self::Leaf(leaf) => out.extend_from_slice(&leaf.token_ids),
Self::Branch { role, children, .. } => { Self::Branch { token_ids: Some(cached), .. } => {
out.extend_from_slice(cached);
}
Self::Branch { role, children, token_ids: None, .. } => {
out.push(tokenizer::IM_START); out.push(tokenizer::IM_START);
out.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); out.extend(tokenizer::encode(&format!("{}\n", role.as_str())));
for child in children { for child in children {
@ -522,7 +558,8 @@ impl Ast for AstNode {
fn tokens(&self) -> usize { fn tokens(&self) -> usize {
match self { match self {
Self::Leaf(leaf) => leaf.tokens(), Self::Leaf(leaf) => leaf.tokens(),
Self::Branch { role, children, .. } => { Self::Branch { token_ids: Some(cached), .. } => cached.len(),
Self::Branch { role, children, token_ids: None, .. } => {
1 + role_header_tokens(*role) 1 + role_header_tokens(*role)
+ children.iter().map(|c| c.tokens()).sum::<usize>() + children.iter().map(|c| c.tokens()).sum::<usize>()
+ 1 + newline_tokens() + 1 + newline_tokens()
@ -676,6 +713,7 @@ impl ResponseParser {
think_buf: String::new(), think_buf: String::new(),
in_tool_call: false, in_tool_call: false,
tool_call_buf: String::new(), tool_call_buf: String::new(),
generated_tokens: Vec::new(),
} }
} }
@ -706,6 +744,7 @@ impl ResponseParser {
buf.push(id, r); buf.push(id, r);
} }
} }
parser.generated_tokens.push(id);
let text = super::tokenizer::decode(&[id]); let text = super::tokenizer::decode(&[id]);
full_text.push_str(&text); full_text.push_str(&text);
let mut ctx = agent.context.lock().await; let mut ctx = agent.context.lock().await;
@ -740,21 +779,15 @@ impl ResponseParser {
let _ = writeln!(f, " unparsed text: {}", &full_text[..end]); let _ = writeln!(f, " unparsed text: {}", &full_text[..end]);
} }
} }
if let Some(u) = usage { if let Some(ref u) = usage {
agent.state.lock().await.last_prompt_tokens = u.prompt_tokens; agent.state.lock().await.last_prompt_tokens = u.prompt_tokens;
} }
let mut ctx = agent.context.lock().await; let mut ctx = agent.context.lock().await;
parser.finish(&mut ctx); parser.finish(&mut ctx);
return Ok(()); if let Some(u) = usage {
ctx.note_session_synced(u.total_tokens);
} }
super::api::StreamToken::ImageAppended { placeholder_count } => { return Ok(());
// Commit the server-authoritative IMAGE_PAD
// count into the first zero-count image leaf
// in wire order. AppendImage always runs
// before the final Generate, so this fires
// before any Token events for this stream.
let mut ctx = agent.context.lock().await;
ctx.commit_image_token_counts(&[placeholder_count]);
} }
super::api::StreamToken::Error(e) => { super::api::StreamToken::Error(e) => {
return Err(anyhow::anyhow!("{}", e)); return Err(anyhow::anyhow!("{}", e));
@ -842,7 +875,7 @@ impl ResponseParser {
} }
fn push_child(&self, ctx: &mut ContextState, child: AstNode) { fn push_child(&self, ctx: &mut ContextState, child: AstNode) {
ctx.push_child(Section::Conversation, self.branch_idx, child); ctx.push_child_raw(Section::Conversation, self.branch_idx, child);
} }
fn flush_content(&mut self, ctx: &mut ContextState) { fn flush_content(&mut self, ctx: &mut ContextState) {
@ -860,6 +893,29 @@ impl ResponseParser {
self.content_parts.push(std::mem::take(&mut self.buf)); self.content_parts.push(std::mem::take(&mut self.buf));
} }
self.flush_content(ctx); self.flush_content(ctx);
// Stamp the authoritative token cache onto the branch.
// Layout mirrors the full chat-template rendering of a
// message block:
//
// IM_START + "assistant\n" [+ "<think>\n"] (prologue — what we sent)
// + generated_tokens (what the server generated, ends in IM_END)
// + "\n" (trailing newline — template-required)
//
// Server only has through the IM_END (model stops on it,
// doesn't emit "\n"). Match-upto lands inside the cache
// right after IM_END; the chunk-walk's straddle path picks
// up the trailing "\n" as the head of the next turn's delta.
// The "\n" between turns matters: without it Qwen sees
// `<|im_end|><|im_start|>` back-to-back (no newline) and
// responds with garbage.
let prologue_text = if self.in_think { "assistant\n<think>\n" } else { "assistant\n" };
let mut cache = Vec::with_capacity(1 + self.generated_tokens.len() + 8);
cache.push(tokenizer::IM_START);
cache.extend(tokenizer::encode(prologue_text));
cache.extend(self.generated_tokens);
cache.extend(tokenizer::encode("\n"));
ctx.set_branch_cache(Section::Conversation, self.branch_idx, cache);
} }
} }
@ -871,9 +927,39 @@ impl ContextState {
journal: Vec::new(), journal: Vec::new(),
conversation: Vec::new(), conversation: Vec::new(),
conversation_log: None, conversation_log: None,
server_committed_len: 0,
client_match_upto: 0,
} }
} }
// -- Server sync tracking -------------------------------------------------
/// Length of the session's token stream on the server. Updated by
/// the grpc layer from Generate Done events.
pub fn server_committed_len(&self) -> u32 { self.server_committed_len }
/// Prefix of our walk we still believe matches the server
/// byte-for-byte. If less than `server_committed_len`, the next
/// Generate must send `truncating=true` at this offset.
pub fn client_match_upto(&self) -> u32 { self.client_match_upto }
/// Record a completed round-trip with the server.
///
/// Invoked once a Generate stream has ended with a Done event that
/// carries usage. `total_tokens` becomes the server's session length,
/// and since everything up to that point was just sent, it is also
/// the prefix length on which client and server agree.
pub fn note_session_synced(&mut self, total_tokens: u32) {
    // Both counters coincide right after a sync; they only drift apart
    // once a later mutation lowers `client_match_upto`.
    let synced_len = total_tokens;
    self.client_match_upto = synced_len;
    self.server_committed_len = synced_len;
}
/// Reset match-upto to 0, i.e. stop treating any prefix of the local
/// walk as matching the server. Intended for every mutation that could
/// have touched a region the server already has. For now this
/// conservatively drops alignment entirely — finer-grained
/// tracking (match-upto at the mutated node's offset) is a
/// future optimization.
///
/// NOTE(review): the call sites are not visible in this hunk — confirm
/// that every AST mutator actually calls this.
fn mark_dirty(&mut self) {
self.client_match_upto = 0;
}
// -- Read access ---------------------------------------------------------- // -- Read access ----------------------------------------------------------
pub fn system(&self) -> &[AstNode] { &self.system } pub fn system(&self) -> &[AstNode] { &self.system }
@ -886,35 +972,6 @@ impl ContextState {
[&self.system, &self.identity, &self.journal, &self.conversation] [&self.system, &self.identity, &self.journal, &self.conversation]
} }
/// Walk image leaves across all sections in wire order and fill in
/// the first N leaves that have `token_count == 0` with successive
/// values from `counts`. Used after a gRPC session's stream of
/// AppendImage responses to commit the server's IMAGE_PAD counts
/// back into the AST so the next wire walk doesn't see zero-count
/// images in the already-committed prefix.
pub fn commit_image_token_counts(&mut self, counts: &[u32]) {
fn visit(node: &mut AstNode, counts: &[u32], idx: &mut usize) {
if *idx >= counts.len() { return; }
match node {
AstNode::Leaf(leaf) => {
if let NodeBody::Image { token_count, .. } = leaf.body() {
if *token_count == 0 {
leaf.set_image_token_count(counts[*idx]);
*idx += 1;
}
}
}
AstNode::Branch { children, .. } => {
for c in children { visit(c, counts, idx); }
}
}
}
let mut idx = 0usize;
for node in &mut self.system { visit(node, counts, &mut idx); }
for node in &mut self.identity { visit(node, counts, &mut idx); }
for node in &mut self.journal { visit(node, counts, &mut idx); }
for node in &mut self.conversation { visit(node, counts, &mut idx); }
}
} }
impl Ast for ContextState { impl Ast for ContextState {
@ -947,55 +1004,57 @@ impl Ast for ContextState {
} }
/// An image collected from the AST for a request body. The AST stores /// An image collected from the AST for a request body. The AST stores
/// the pre-expanded token form (`<|vision_start|> + <|image_pad|>×N + /// Image metadata collected during `wire_chunks` — the binary +
/// <|vision_end|>`), and the wire form mirrors that exactly so the /// mime plus the absolute token-position range of the image's
/// server's `session.tokens` length matches what vLLM's engine will /// pre-expanded placeholder run in the full wire stream. Sent
/// process. The authoritative N is obtained from the server via the /// alongside `append_tokens` in `GenerateRequest` so the server
/// CountImageTokens RPC before the Image leaf is constructed. /// can attach vision features to the declared positions. Positions
/// are absolute within the full wire walk starting at offset 0,
/// i.e. the same coordinate system as `session.tokens` on the
/// server once the walk has been applied.
#[derive(Clone)] #[derive(Clone)]
pub struct WireImage { pub struct WireImage {
pub bytes: Vec<u8>, pub bytes: Vec<u8>,
pub mime: String, pub mime: String,
pub pad_start: u32,
pub pad_end: u32,
} }
/// One piece of the wire stream for the gRPC session path. Runs of /// One piece of the wire stream for the gRPC session path. Since
/// text/tool/thinking tokens are batched into `Tokens`; each Image /// images now live inline in the token stream (pre-expanded at AST
/// leaf becomes its own `Image` chunk because the server writes the /// construction time), there's only one variant — a run of tokens.
/// full vision block on AppendImage — the client never sends vision /// The parallel `Vec<WireImage>` returned by `wire_chunks` gives the
/// tokens inline. Order matches the AST's depth-first wire order. /// binary + position metadata for each embedded image.
#[derive(Clone)] #[derive(Clone)]
pub enum WireChunk { pub enum WireChunk {
Tokens(Vec<u32>), Tokens(Vec<u32>),
Image {
bytes: Vec<u8>,
mime: String,
/// Client's current best guess at how many tokens the server
/// will expand this image to, including bookends. `0` means
/// the count is unknown (view_image just loaded the image and
/// AppendImage hasn't run yet). Callers use this only to know
/// this chunk's contribution to the server-visible length for
/// offset bookkeeping on chunks that were already appended on
/// a prior turn.
known_expanded_len: u32,
},
} }
fn wire_into(node: &AstNode, tokens: &mut Vec<u32>, images: &mut Vec<WireImage>) { fn wire_into(node: &AstNode, tokens: &mut Vec<u32>, images: &mut Vec<WireImage>) {
match node { match node {
AstNode::Leaf(leaf) => match leaf.body() { AstNode::Leaf(leaf) => match leaf.body() {
NodeBody::Image { bytes, mime, .. } => { NodeBody::Image { bytes, mime, .. } => {
// Send the pre-expanded token form (includes N // The Image leaf's token_ids is already
// <|image_pad|> tokens); engine's multi_modal // [VISION_START, IMAGE_PAD * N, VISION_END]. Inline
// pipeline pairs them with the binary data below. // those into the token stream and record the pad-run
// range so the server can attach features to the
// declared positions.
let pad_start = tokens.len() as u32;
tokens.extend_from_slice(leaf.token_ids()); tokens.extend_from_slice(leaf.token_ids());
let pad_end = tokens.len() as u32;
images.push(WireImage { images.push(WireImage {
bytes: bytes.clone(), bytes: bytes.clone(),
mime: mime.clone(), mime: mime.clone(),
pad_start,
pad_end,
}); });
} }
_ => tokens.extend_from_slice(leaf.token_ids()), _ => tokens.extend_from_slice(leaf.token_ids()),
}, },
AstNode::Branch { role, children, .. } => { AstNode::Branch { token_ids: Some(cached), .. } => {
tokens.extend_from_slice(cached);
}
AstNode::Branch { role, children, token_ids: None, .. } => {
tokens.push(tokenizer::IM_START); tokens.push(tokenizer::IM_START);
tokens.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); tokens.extend(tokenizer::encode(&format!("{}\n", role.as_str())));
for c in children { for c in children {
@ -1118,10 +1177,16 @@ impl ContextState {
} }
/// Build the wire stream as interleaved `WireChunk`s for the gRPC /// Build the wire stream as interleaved `WireChunk`s for the gRPC
/// session path. Unlike `wire_prompt`, this preserves the order /// session path. Returns a tuple of (chunks, images): the chunks
/// of text runs vs image blocks so the caller can drive the /// hold the full token stream (with vision blocks inlined as
/// append flow (AppendImage for each Image, Generate append for /// `VISION_START + IMAGE_PAD*N + VISION_END`), and the images
/// contiguous text runs). /// list carries each embedded image's binary + position range so
/// the gRPC layer can attach them via `GenerateRequest.images`.
///
/// Note: with images inlined into the token stream, the chunks
/// list is structurally a single `Tokens` chunk in the common
/// case — the multi-chunk shape persists only because some
/// callers may want the option of inserting breakpoints later.
/// ///
/// `conv_range` and `skip` mirror `wire_prompt` — select a /// `conv_range` and `skip` mirror `wire_prompt` — select a
/// conversation slice and drop identity / conversation nodes by /// conversation slice and drop identity / conversation nodes by
@ -1130,46 +1195,43 @@ impl ContextState {
&self, &self,
conv_range: std::ops::Range<usize>, conv_range: std::ops::Range<usize>,
mut skip: F, mut skip: F,
) -> Vec<WireChunk> ) -> (Vec<WireChunk>, Vec<WireImage>)
where F: FnMut(&AstNode) -> bool, where F: FnMut(&AstNode) -> bool,
{ {
let mut out: Vec<WireChunk> = Vec::new();
let mut buf: Vec<u32> = Vec::new(); let mut buf: Vec<u32> = Vec::new();
let mut images: Vec<WireImage> = Vec::new();
fn flush(buf: &mut Vec<u32>, out: &mut Vec<WireChunk>) { fn visit(
if !buf.is_empty() { node: &AstNode,
out.push(WireChunk::Tokens(std::mem::take(buf))); buf: &mut Vec<u32>,
} images: &mut Vec<WireImage>,
} ) {
fn visit(node: &AstNode, buf: &mut Vec<u32>, out: &mut Vec<WireChunk>) {
match node { match node {
AstNode::Leaf(leaf) => match leaf.body() { AstNode::Leaf(leaf) => match leaf.body() {
NodeBody::Image { bytes, mime, token_count, .. } => { NodeBody::Image { bytes, mime, .. } => {
flush(buf, out); // Pre-expanded vision block lives in
// Bookends (VISION_START + VISION_END) add 2 // leaf.token_ids: [VISION_START, IMAGE_PAD*N,
// to the expanded length; token_count is the // VISION_END]. Inline + record the range.
// IMAGE_PAD run. 0 means count is still let pad_start = buf.len() as u32;
// unknown (no AppendImage yet) — don't claim buf.extend_from_slice(leaf.token_ids());
// a length the server will disagree with. let pad_end = buf.len() as u32;
let expanded = if *token_count == 0 { images.push(WireImage {
0
} else {
*token_count + 2
};
out.push(WireChunk::Image {
bytes: bytes.clone(), bytes: bytes.clone(),
mime: mime.clone(), mime: mime.clone(),
known_expanded_len: expanded, pad_start,
pad_end,
}); });
} }
_ => buf.extend_from_slice(leaf.token_ids()), _ => buf.extend_from_slice(leaf.token_ids()),
}, },
AstNode::Branch { role, children, .. } => { AstNode::Branch { token_ids: Some(cached), .. } => {
buf.extend_from_slice(cached);
}
AstNode::Branch { role, children, token_ids: None, .. } => {
buf.push(tokenizer::IM_START); buf.push(tokenizer::IM_START);
buf.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); buf.extend(tokenizer::encode(&format!("{}\n", role.as_str())));
for c in children { for c in children {
visit(c, buf, out); visit(c, buf, images);
} }
buf.push(tokenizer::IM_END); buf.push(tokenizer::IM_END);
buf.extend(tokenizer::encode("\n")); buf.extend(tokenizer::encode("\n"));
@ -1177,18 +1239,22 @@ impl ContextState {
} }
} }
for node in self.system() { visit(node, &mut buf, &mut out); } for node in self.system() { visit(node, &mut buf, &mut images); }
for node in self.identity() { for node in self.identity() {
if skip(node) { continue; } if skip(node) { continue; }
visit(node, &mut buf, &mut out); visit(node, &mut buf, &mut images);
} }
for node in self.journal() { visit(node, &mut buf, &mut out); } for node in self.journal() { visit(node, &mut buf, &mut images); }
for node in &self.conversation()[conv_range] { for node in &self.conversation()[conv_range] {
if skip(node) { continue; } if skip(node) { continue; }
visit(node, &mut buf, &mut out); visit(node, &mut buf, &mut images);
} }
flush(&mut buf, &mut out); let chunks = if buf.is_empty() {
out Vec::new()
} else {
vec![WireChunk::Tokens(buf)]
};
(chunks, images)
} }
} }
@ -1209,17 +1275,27 @@ impl ContextState {
dbglog!("warning: log: {:#}", e); dbglog!("warning: log: {:#}", e);
} }
} }
// Conversation appends always go to the tail — past committed —
// so they don't break the match. Any other section mutates a
// region the server may already have, so drop alignment.
if section != Section::Conversation {
self.mark_dirty();
}
self.section_mut(section).push(node); self.section_mut(section).push(node);
} }
/// Push without logging. /// Push without logging.
pub fn push_no_log(&mut self, section: Section, node: AstNode) { pub fn push_no_log(&mut self, section: Section, node: AstNode) {
if section != Section::Conversation {
self.mark_dirty();
}
self.section_mut(section).push(node); self.section_mut(section).push(node);
} }
/// Replace the body of a leaf at `index` in `section`. /// Replace the body of a leaf at `index` in `section`.
/// Re-tokenizes to maintain the invariant. /// Re-tokenizes to maintain the invariant.
pub fn set_message(&mut self, section: Section, index: usize, body: NodeBody) { pub fn set_message(&mut self, section: Section, index: usize, body: NodeBody) {
self.mark_dirty();
let nodes = self.section_mut(section); let nodes = self.section_mut(section);
let node = &mut nodes[index]; let node = &mut nodes[index];
match node { match node {
@ -1245,10 +1321,12 @@ impl ContextState {
} }
pub fn del(&mut self, section: Section, index: usize) -> AstNode { pub fn del(&mut self, section: Section, index: usize) -> AstNode {
self.mark_dirty();
self.section_mut(section).remove(index) self.section_mut(section).remove(index)
} }
pub fn clear(&mut self, section: Section) { pub fn clear(&mut self, section: Section) {
self.mark_dirty();
self.section_mut(section).clear(); self.section_mut(section).clear();
} }
@ -1269,6 +1347,7 @@ impl ContextState {
/// are > 50% of conversation tokens) or oldest conversation entry. /// are > 50% of conversation tokens) or oldest conversation entry.
/// Phase 3: Snap to user message boundary at start. /// Phase 3: Snap to user message boundary at start.
pub fn trim_conversation(&mut self) { pub fn trim_conversation(&mut self) {
self.mark_dirty();
let max_tokens = context_budget_tokens(); let max_tokens = context_budget_tokens();
let fixed = self.system.iter().map(|n| n.tokens()).sum::<usize>() let fixed = self.system.iter().map(|n| n.tokens()).sum::<usize>()
+ self.identity.iter().map(|n| n.tokens()).sum::<usize>() + self.identity.iter().map(|n| n.tokens()).sum::<usize>()
@ -1345,11 +1424,49 @@ impl ContextState {
} }
/// Push a child node into a branch at `index` in `section`. /// Push a child node into a branch at `index` in `section`.
/// Clears the branch's cached token stream — wire-out will recompute
/// from children until the cache is repopulated. If the cache was
/// populated (server had these bytes), drops session alignment.
pub fn push_child(&mut self, section: Section, index: usize, child: AstNode) { pub fn push_child(&mut self, section: Section, index: usize, child: AstNode) {
let node = &mut self.section_mut(section)[index];
let was_cached = matches!(node, AstNode::Branch { token_ids: Some(_), .. });
match node {
AstNode::Branch { children, token_ids, .. } => {
children.push(child);
*token_ids = None;
}
AstNode::Leaf(_) => panic!("push_child on leaf node"),
}
if was_cached {
self.mark_dirty();
}
}
/// Like `push_child` but preserves the branch's cached token stream.
/// Used by the response parser, which is simultaneously populating
/// the cache from the authoritative server stream and pushing the
/// parsed-out children — the two stay consistent by construction.
/// Module-private: callers outside `context.rs` must go through
/// `push_child` so the invariant is maintained.
fn push_child_raw(&mut self, section: Section, index: usize, child: AstNode) {
let node = &mut self.section_mut(section)[index]; let node = &mut self.section_mut(section)[index];
match node { match node {
AstNode::Branch { children, .. } => children.push(child), AstNode::Branch { children, .. } => children.push(child),
AstNode::Leaf(_) => panic!("push_child on leaf node"), AstNode::Leaf(_) => panic!("push_child_raw on leaf node"),
}
}
/// Stamp a verbatim token cache onto the branch at `index` in
/// `section`. Used by the response parser to record the server's
/// authoritative token stream for the just-finished turn.
/// Module-private: the cache is an invariant-load-bearing piece
/// of state, populated only by code that holds the server's
/// ground truth.
fn set_branch_cache(&mut self, section: Section, index: usize, tokens: Vec<u32>) {
let node = &mut self.section_mut(section)[index];
match node {
AstNode::Branch { token_ids, .. } => *token_ids = Some(tokens),
AstNode::Leaf(_) => panic!("set_branch_cache on leaf node"),
} }
} }
@ -1373,20 +1490,19 @@ impl ContextState {
// to at request time. Constants come from Qwen3.5-27B's preprocessor_config. // to at request time. Constants come from Qwen3.5-27B's preprocessor_config.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Test-only client-side estimate of image token expansion. Production // Production client-side computation of image-token expansion. With
// callers obtain the authoritative count from the server via // the delta-session protocol, the client writes the pre-expanded
// CountImageTokens; these constants and helpers stay around only to // vision block (VISION_START + N*IMAGE_PAD + VISION_END) directly
// keep the context-shape unit tests self-contained. // into the token stream at Image-leaf construction time, and tells
#[cfg(test)] // the server where each image's pad run lives via
// GenerateRequest.images. Server validates that this N matches
// what the vision encoder actually produces and rejects on
// mismatch — so drift here fails loudly, not silently.
const QWEN3_PATCH_SIZE: u32 = 16; const QWEN3_PATCH_SIZE: u32 = 16;
#[cfg(test)]
const QWEN3_MERGE_SIZE: u32 = 2; const QWEN3_MERGE_SIZE: u32 = 2;
#[cfg(test)]
const QWEN3_MIN_PIXELS: u64 = 65_536; const QWEN3_MIN_PIXELS: u64 = 65_536;
#[cfg(test)]
const QWEN3_MAX_PIXELS: u64 = 16_777_216; const QWEN3_MAX_PIXELS: u64 = 16_777_216;
#[cfg(test)]
fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) -> (u32, u32) { fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) -> (u32, u32) {
let max_s = h.max(w) as f64; let max_s = h.max(w) as f64;
let min_s = h.min(w) as f64; let min_s = h.min(w) as f64;
@ -1415,11 +1531,10 @@ fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) -
} }
} }
/// Test-only: client-side estimate of how many `<|image_pad|>` tokens /// How many `<|image_pad|>` tokens the Qwen3-VL vision encoder will
/// vLLM will emit for an image of the given dimensions. Production /// produce for an image of the given dimensions. Server verifies
/// callers use `salience::count_image_tokens` (server-authoritative). /// this count against its own encoder run and rejects on mismatch.
#[cfg(test)] pub fn qwen3_image_token_count(orig_h: u32, orig_w: u32) -> u32 {
fn qwen3_image_token_count(orig_h: u32, orig_w: u32) -> u32 {
let factor = QWEN3_PATCH_SIZE * QWEN3_MERGE_SIZE; let factor = QWEN3_PATCH_SIZE * QWEN3_MERGE_SIZE;
let (rh, rw) = smart_resize(orig_h, orig_w, factor, QWEN3_MIN_PIXELS, QWEN3_MAX_PIXELS); let (rh, rw) = smart_resize(orig_h, orig_w, factor, QWEN3_MIN_PIXELS, QWEN3_MAX_PIXELS);
(rh / QWEN3_PATCH_SIZE) * (rw / QWEN3_PATCH_SIZE) / (QWEN3_MERGE_SIZE * QWEN3_MERGE_SIZE) (rh / QWEN3_PATCH_SIZE) * (rw / QWEN3_PATCH_SIZE) / (QWEN3_MERGE_SIZE * QWEN3_MERGE_SIZE)
@ -1854,7 +1969,7 @@ mod tests {
#[test] #[test]
fn test_image_render_and_token_ids() { fn test_image_render_and_token_ids() {
let node = AstNode::image(vec![0u8, 1, 2, 3], "image/png", 512, 512, qwen3_image_token_count(512, 512)); let node = AstNode::image(vec![0u8, 1, 2, 3], "image/png", 512, 512);
let leaf = node.leaf().unwrap(); let leaf = node.leaf().unwrap();
// 2 bookend tokens (vision_start + vision_end) + 256 image_pad tokens // 2 bookend tokens (vision_start + vision_end) + 256 image_pad tokens
assert_eq!(leaf.token_ids().len(), 258); assert_eq!(leaf.token_ids().len(), 258);
@ -1874,7 +1989,7 @@ mod tests {
let mut ctx = ContextState::new(); let mut ctx = ContextState::new();
ctx.push_no_log(Section::Conversation, AstNode::branch(Role::User, vec![ ctx.push_no_log(Section::Conversation, AstNode::branch(Role::User, vec![
AstNode::content("look:"), AstNode::content("look:"),
AstNode::image(vec![0xDE, 0xAD], "image/png", 512, 512, qwen3_image_token_count(512, 512)), AstNode::image(vec![0xDE, 0xAD], "image/png", 512, 512),
])); ]));
// AST side and wire side should both carry N image_pads + bookends — // AST side and wire side should both carry N image_pads + bookends —
@ -1904,7 +2019,7 @@ mod tests {
#[test] #[test]
fn test_image_serde_roundtrip() { fn test_image_serde_roundtrip() {
let node = AstNode::image(vec![0xDE, 0xAD, 0xBE, 0xEF], "image/png", 64, 64, qwen3_image_token_count(64, 64)); let node = AstNode::image(vec![0xDE, 0xAD, 0xBE, 0xEF], "image/png", 64, 64);
let json = serde_json::to_string(&node).unwrap(); let json = serde_json::to_string(&node).unwrap();
// bytes must be base64-encoded in the JSON form // bytes must be base64-encoded in the JSON form
assert!(json.contains("3q2+7w==")); assert!(json.contains("3q2+7w=="));

View file

@ -333,14 +333,16 @@ impl Agent {
/// becomes its own chunk. Also trims the conversation to budget /// becomes its own chunk. Also trims the conversation to budget
/// first so we don't build a prompt the server will reject for /// first so we don't build a prompt the server will reject for
/// length. /// length.
pub async fn assemble_prompt(&self) -> Vec<context::WireChunk> { pub async fn assemble_prompt(&self)
-> (Vec<context::WireChunk>, Vec<context::WireImage>, u32)
{
let mut ctx = self.context.lock().await; let mut ctx = self.context.lock().await;
if ctx.total_tokens() > context::context_budget_tokens() { if ctx.total_tokens() > context::context_budget_tokens() {
ctx.trim_conversation(); ctx.trim_conversation();
} }
let st = self.state.lock().await; let st = self.state.lock().await;
let conv_len = ctx.conversation().len(); let conv_len = ctx.conversation().len();
let mut chunks = ctx.wire_chunks(0..conv_len, |_| false); let (mut chunks, images) = ctx.wire_chunks(0..conv_len, |_| false);
// Assistant-turn prologue. Merge into the trailing Tokens // Assistant-turn prologue. Merge into the trailing Tokens
// chunk if there is one, else push as a new chunk. // chunk if there is one, else push as a new chunk.
let mut prologue = vec![tokenizer::IM_START]; let mut prologue = vec![tokenizer::IM_START];
@ -353,7 +355,8 @@ impl Agent {
Some(context::WireChunk::Tokens(last)) => last.extend(prologue), Some(context::WireChunk::Tokens(last)) => last.extend(prologue),
_ => chunks.push(context::WireChunk::Tokens(prologue)), _ => chunks.push(context::WireChunk::Tokens(prologue)),
} }
chunks let match_upto = ctx.client_match_upto();
(chunks, images, match_upto)
} }
/// Rebuild the tools section of the system prompt from the current tools list. /// Rebuild the tools section of the system prompt from the current tools list.
@ -413,7 +416,7 @@ impl Agent {
let _thinking = start_activity(&agent, "thinking...").await; let _thinking = start_activity(&agent, "thinking...").await;
let (rx, _stream_guard) = { let (rx, _stream_guard) = {
let chunks = agent.assemble_prompt().await; let (chunks, images, match_upto) = agent.assemble_prompt().await;
let st = agent.state.lock().await; let st = agent.state.lock().await;
let readout_shape = agent.readout.lock().ok().and_then(|buf| { let readout_shape = agent.readout.lock().ok().and_then(|buf| {
buf.manifest.as_ref().map(|m| { buf.manifest.as_ref().map(|m| {
@ -423,6 +426,8 @@ impl Agent {
agent.client.stream_session_mm( agent.client.stream_session_mm(
agent.grpc_session.clone(), agent.grpc_session.clone(),
chunks, chunks,
images,
match_upto,
st.sampling, st.sampling,
st.priority, st.priority,
readout_shape, readout_shape,

View file

@ -63,7 +63,7 @@ async fn view_image(
// AppendImage (the server is authoritative for the IMAGE_PAD // AppendImage (the server is authoritative for the IMAGE_PAD
// count). Placeholder of 0 here until AppendImage is wired; the // count). Placeholder of 0 here until AppendImage is wired; the
// leaf's count gets rewritten from the RPC response at send time. // leaf's count gets rewritten from the RPC response at send time.
let image_leaf = AstNode::image(bytes.clone(), mime, h, w, 0); let image_leaf = AstNode::image(bytes.clone(), mime, h, w);
let branch = AstNode::branch(Role::User, vec![image_leaf]); let branch = AstNode::branch(Role::User, vec![image_leaf]);
agent.context.lock().await.push_log(Section::Conversation, branch); agent.context.lock().await.push_log(Section::Conversation, branch);

View file

@ -693,7 +693,7 @@ impl Mind {
} }
}); });
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None; let _sub_handle: Option<tokio::task::JoinHandle<()>> = None;
// Start finetune scoring at startup (scores existing conversation) // Start finetune scoring at startup (scores existing conversation)
if !self.config.no_agents { if !self.config.no_agents {
@ -743,6 +743,7 @@ impl Mind {
_ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true, _ = tokio::time::sleep(timeout), if !has_input => _dmn_expired = true,
} }
/*
if !self.config.no_agents { if !self.config.no_agents {
if sub_handle.as_ref().map_or(true, |h| h.is_finished()) { if sub_handle.as_ref().map_or(true, |h| h.is_finished()) {
let sub = self.subconscious.clone(); let sub = self.subconscious.clone();
@ -754,6 +755,7 @@ impl Mind {
})); }));
} }
} }
*/
// Check for pending user input → push to agent context and start turn // Check for pending user input → push to agent context and start turn
let pending = self.shared.lock().unwrap().take_pending_input(); let pending = self.shared.lock().unwrap().take_pending_input();

View file

@ -26,7 +26,7 @@ pub async fn gen_continuation<F>(
) -> anyhow::Result<String> ) -> anyhow::Result<String>
where F: FnMut(&AstNode) -> bool, where F: FnMut(&AstNode) -> bool,
{ {
let mut chunks = context.wire_chunks(0..entry_idx, skip); let (mut chunks, images) = context.wire_chunks(0..entry_idx, skip);
// Assistant-turn prologue. // Assistant-turn prologue.
let prologue = { let prologue = {
@ -50,19 +50,13 @@ where F: FnMut(&AstNode) -> bool,
// `_guard` drops at function end. // `_guard` drops at function end.
let session_lock = Arc::new(crate::Mutex::new(None)); let session_lock = Arc::new(crate::Mutex::new(None));
let (mut rx, _guard) = client.stream_session_mm( let (mut rx, _guard) = client.stream_session_mm(
session_lock, chunks, sampling, Some(-5), None, session_lock, chunks, images, 0, sampling, Some(-5), None,
); );
let mut tokens = Vec::new(); let mut tokens = Vec::new();
while let Some(tok) = rx.recv().await { while let Some(tok) = rx.recv().await {
match tok { match tok {
StreamToken::Token { id, .. } => tokens.push(id), StreamToken::Token { id, .. } => tokens.push(id),
StreamToken::ImageAppended { .. } => {
// subconscious/generate uses wire_chunks over an AST
// slice that shouldn't have unsized images — but if
// it ever does, we just don't care about updating the
// ephemeral session's AST view.
}
StreamToken::Done { .. } => break, StreamToken::Done { .. } => break,
StreamToken::Error(e) => anyhow::bail!("generation error: {}", e), StreamToken::Error(e) => anyhow::bail!("generation error: {}", e),
} }

View file

@ -40,14 +40,15 @@ struct ScoreResult {
total_logprob: f64, total_logprob: f64,
} }
/// Convert a flat (prompt_tokens, images) pair into the interleaved /// Find each <|vision_start|>...<|vision_end|> run in the flat prompt
/// chunks the session protocol expects. Tokens up to the next /// and pair it with the matching entry in `images`. Returns a list
/// `<|vision_start|>` become a Tokens chunk; each /// of `ImageAttachment` with absolute pad-range positions, ready
/// `<|vision_start|>..<|vision_end|>` run collapses into one Image /// to drop into `GenerateRequest.images`.
/// chunk paired by position with the next entry in `images`. The fn pair_images_to_ranges(
/// server re-expands the IMAGE_PADs on AppendImage. prompt: &[u32],
fn prompt_to_chunks(prompt: &[u32], images: &[WireImage]) -> Vec<WireChunk> { images: &[WireImage],
let mut out: Vec<WireChunk> = Vec::new(); ) -> Vec<pb::ImageAttachment> {
let mut out: Vec<pb::ImageAttachment> = Vec::new();
let mut cur = 0; let mut cur = 0;
let mut img_idx = 0; let mut img_idx = 0;
while cur < prompt.len() { while cur < prompt.len() {
@ -60,22 +61,16 @@ fn prompt_to_chunks(prompt: &[u32], images: &[WireImage]) -> Vec<WireChunk> {
let img = images.get(img_idx) let img = images.get(img_idx)
.unwrap_or_else(|| panic!( .unwrap_or_else(|| panic!(
"image index {} out of range for {} images", img_idx, images.len())); "image index {} out of range for {} images", img_idx, images.len()));
out.push(WireChunk::Image { out.push(pb::ImageAttachment {
bytes: img.bytes.clone(), bytes: img.bytes.clone(),
mime: img.mime.clone(), mime: img.mime.clone(),
known_expanded_len: (end - cur) as u32, pad_range_start: cur as u32,
pad_range_end: end as u32,
}); });
img_idx += 1; img_idx += 1;
cur = end; cur = end;
} else { } else {
let next_vs = prompt[cur..].iter() cur += 1;
.position(|&t| t == tokenizer::VISION_START);
let end = match next_vs {
Some(o) => cur + o,
None => prompt.len(),
};
out.push(WireChunk::Tokens(prompt[cur..end].to_vec()));
cur = end;
} }
} }
out out
@ -95,36 +90,22 @@ async fn call_score(
return Ok(Vec::new()); return Ok(Vec::new());
} }
let chunks = prompt_to_chunks(prompt, images); let images_pb = pair_images_to_ranges(prompt, images);
let mut handle = SessionHandle::open(client).await?; let mut handle = SessionHandle::open(client).await?;
// Walk chunks: AppendImage for each image, prefill-only Generate
// for each text run between images. Accumulate any trailing text
// run into `pending` for the final logprob-generating Generate.
let mut pending: Vec<u32> = Vec::new();
for chunk in chunks {
match chunk {
WireChunk::Tokens(t) => pending.extend(t),
WireChunk::Image { bytes, mime, .. } => {
if !pending.is_empty() {
handle.prefill_only(std::mem::take(&mut pending)).await?;
}
handle.append_image(bytes, mime, false).await?;
}
}
}
// Final Generate: max_tokens=0 so the server runs prefill of the // Final Generate: max_tokens=0 so the server runs prefill of the
// trailing `pending` tokens and emits Token events for each // full prompt and emits Token events for each position covered
// position covered by logprobs_ranges, then Done. logprob_top_k=0 // by logprobs_ranges, then Done. logprob_top_k=0 means "just
// means "just the sampled (prompt) token's logprob" — no top-k // the sampled (prompt) token's logprob" — no top-k alternatives,
// alternatives, which is all call_score historically needed. // which is all call_score historically needed. Images attach
// inline via `images`; the prompt already contains their pre-
// expanded vision blocks at the declared ranges.
let logprobs_ranges: Vec<pb::PositionRange> = ranges.iter() let logprobs_ranges: Vec<pb::PositionRange> = ranges.iter()
.map(|(s, e)| pb::PositionRange { start: *s as u32, end: *e as u32 }) .map(|(s, e)| pb::PositionRange { start: *s as u32, end: *e as u32 })
.collect(); .collect();
let req = pb::GenerateRequest { let req = pb::GenerateRequest {
session_id: handle.session_id.clone(), session_id: handle.session_id.clone(),
append_tokens: pending, append_tokens: prompt.to_vec(),
offset: handle.committed_len, offset: handle.committed_len,
truncating: false, truncating: false,
max_tokens: 0, max_tokens: 0,
@ -136,6 +117,7 @@ async fn call_score(
top_k: 0, top_k: 0,
stop_token_ids: Vec::new(), stop_token_ids: Vec::new(),
priority: priority.unwrap_or(0), priority: priority.unwrap_or(0),
images: images_pb,
}; };
let mut stream = handle.generate(req).await?; let mut stream = handle.generate(req).await?;

View file

@ -43,6 +43,7 @@ impl ConsciousScreen {
name: format!("mem: {}", key), name: format!("mem: {}", key),
tokens: node.tokens(), tokens: node.tokens(),
content: text.clone(), content: text.clone(),
token_ids: leaf.token_ids().to_vec(),
children: Vec::new(), children: Vec::new(),
status: score.map(|s| format!("{:.2}", s)).unwrap_or_default(), status: score.map(|s| format!("{:.2}", s)).unwrap_or_default(),
}); });
@ -55,6 +56,7 @@ impl ConsciousScreen {
name: format!("Memory nodes ({})", mem_children.len()), name: format!("Memory nodes ({})", mem_children.len()),
tokens: mem_tokens, tokens: mem_tokens,
content: String::new(), content: String::new(),
token_ids: Vec::new(),
children: mem_children, children: mem_children,
status: format!("{} scored, {} unscored", scored, unscored), status: format!("{} scored, {} unscored", scored, unscored),
}); });
@ -70,11 +72,13 @@ impl ConsciousScreen {
AstNode::Leaf(leaf) => leaf.body().text().to_string(), AstNode::Leaf(leaf) => leaf.body().text().to_string(),
_ => String::new(), _ => String::new(),
}, },
token_ids: node.token_ids(),
children: match node { children: match node {
AstNode::Branch { children, .. } => children.iter() AstNode::Branch { children, .. } => children.iter()
.map(|c| SectionView { .map(|c| SectionView {
name: c.label(), tokens: c.tokens(), name: c.label(), tokens: c.tokens(),
content: match c { AstNode::Leaf(l) => l.body().text().to_string(), _ => String::new() }, content: match c { AstNode::Leaf(l) => l.body().text().to_string(), _ => String::new() },
token_ids: match c { AstNode::Leaf(l) => l.token_ids().to_vec(), _ => c.token_ids() },
children: Vec::new(), status: String::new(), children: Vec::new(), status: String::new(),
}).collect(), }).collect(),
_ => Vec::new(), _ => Vec::new(),
@ -101,6 +105,7 @@ impl ConsciousScreen {
name: format!("Conversation ({} entries)", conv_children.len()), name: format!("Conversation ({} entries)", conv_children.len()),
tokens: conv_tokens, tokens: conv_tokens,
content: String::new(), content: String::new(),
token_ids: Vec::new(),
children: conv_children, children: conv_children,
status: String::new(), status: String::new(),
}); });

View file

@ -207,6 +207,7 @@ impl SubconsciousScreen {
name: key.clone(), name: key.clone(),
tokens: 0, tokens: 0,
content: val.clone(), content: val.clone(),
token_ids: Vec::new(),
children: Vec::new(), children: Vec::new(),
status: String::new(), status: String::new(),
} }
@ -238,6 +239,7 @@ impl SubconsciousScreen {
name: format!("Conversation ({} entries)", conv_children.len()), name: format!("Conversation ({} entries)", conv_children.len()),
tokens: conv_children.iter().map(|c| c.tokens).sum(), tokens: conv_children.iter().map(|c| c.tokens).sum(),
content: String::new(), content: String::new(),
token_ids: Vec::new(),
children: conv_children, children: conv_children,
status: String::new(), status: String::new(),
}); });

View file

@ -8,11 +8,18 @@ use ratatui::{
}; };
use crate::agent::context::{AstNode, Ast, NodeBody}; use crate::agent::context::{AstNode, Ast, NodeBody};
#[derive(Debug, Clone)] #[derive(Debug, Clone, Default)]
pub struct SectionView { pub struct SectionView {
pub name: String, pub name: String,
pub tokens: usize, pub tokens: usize,
pub content: String, pub content: String,
/// Token-id stream for this subtree, displayed in place of
/// `content` when the tree's show-tokens mode is on. Populated
/// from `leaf.token_ids()` / `node.token_ids()` for views built
/// from the AST; empty for views that don't have a corresponding
/// AST node (subconscious entries, etc.), in which case the
/// token view falls back to the text content.
pub token_ids: Vec<u32>,
pub children: Vec<SectionView>, pub children: Vec<SectionView>,
/// Extra status text shown after the token count. /// Extra status text shown after the token count.
pub status: String, pub status: String,
@ -32,6 +39,7 @@ fn node_to_view(node: &AstNode) -> SectionView {
name, name,
tokens: node.tokens(), tokens: node.tokens(),
content: leaf.body().text().to_string(), content: leaf.body().text().to_string(),
token_ids: leaf.token_ids().to_vec(),
children: Vec::new(), children: Vec::new(),
status, status,
} }
@ -44,6 +52,7 @@ fn node_to_view(node: &AstNode) -> SectionView {
name: node.label(), name: node.label(),
tokens: node.tokens(), tokens: node.tokens(),
content: String::new(), content: String::new(),
token_ids: node.token_ids(),
children: child_views, children: child_views,
status: String::new(), status: String::new(),
} }
@ -54,10 +63,12 @@ fn node_to_view(node: &AstNode) -> SectionView {
pub fn section_to_view(name: &str, nodes: &[AstNode]) -> SectionView { pub fn section_to_view(name: &str, nodes: &[AstNode]) -> SectionView {
let children: Vec<SectionView> = nodes.iter().map(|n| node_to_view(n)).collect(); let children: Vec<SectionView> = nodes.iter().map(|n| node_to_view(n)).collect();
let total_tokens: usize = nodes.iter().map(|n| n.tokens()).sum(); let total_tokens: usize = nodes.iter().map(|n| n.tokens()).sum();
let token_ids: Vec<u32> = nodes.iter().flat_map(|n| n.token_ids()).collect();
SectionView { SectionView {
name: name.to_string(), name: name.to_string(),
tokens: total_tokens, tokens: total_tokens,
content: String::new(), content: String::new(),
token_ids,
children, children,
status: String::new(), status: String::new(),
} }
@ -104,7 +115,7 @@ pub fn format_ts_age(ts: i64) -> String {
/// Key legend for SectionTree panes. /// Key legend for SectionTree panes.
pub fn tree_legend() -> Line<'static> { pub fn tree_legend() -> Line<'static> {
Line::styled( Line::styled(
" ↑↓:nav →/Enter:expand ←:collapse e:expand all c:collapse all PgUp/Dn Home/End ", " ↑↓:nav →/Enter:expand ←:collapse e:expand c:collapse v:toggle tokens/text PgUp/Dn ",
Style::default().fg(Color::DarkGray), Style::default().fg(Color::DarkGray),
) )
} }
@ -185,11 +196,19 @@ pub struct SectionTree {
pub selected: Option<usize>, pub selected: Option<usize>,
pub expanded: std::collections::HashSet<usize>, pub expanded: std::collections::HashSet<usize>,
pub scroll: super::scroll_pane::ScrollPaneState, pub scroll: super::scroll_pane::ScrollPaneState,
/// When true, render `token_ids` as space-separated IDs in place
/// of `content` in expanded panels. Toggled with 'v'.
pub show_tokens: bool,
} }
impl SectionTree { impl SectionTree {
pub fn new() -> Self { pub fn new() -> Self {
Self { selected: None, expanded: std::collections::HashSet::new(), scroll: super::scroll_pane::ScrollPaneState::new() } Self {
selected: None,
expanded: std::collections::HashSet::new(),
scroll: super::scroll_pane::ScrollPaneState::new(),
show_tokens: false,
}
} }
fn total_nodes(&self, sections: &[SectionView]) -> usize { fn total_nodes(&self, sections: &[SectionView]) -> usize {
@ -264,6 +283,9 @@ impl SectionTree {
KeyCode::Char('c') => { KeyCode::Char('c') => {
self.expanded.clear(); self.expanded.clear();
} }
KeyCode::Char('v') => {
self.show_tokens = !self.show_tokens;
}
_ => {} _ => {}
} }
self.scroll_to_selected(height); self.scroll_to_selected(height);
@ -326,7 +348,12 @@ impl SectionTree {
} }
} else if has_content { } else if has_content {
let content_indent = format!("{}", " ".repeat(depth + 1)); let content_indent = format!("{}", " ".repeat(depth + 1));
let content_lines: Vec<&str> = section.content.lines().collect(); let body = if self.show_tokens && !section.token_ids.is_empty() {
format_token_ids_wrapped(&section.token_ids)
} else {
section.content.clone()
};
let content_lines: Vec<&str> = body.lines().collect();
let show = content_lines.len().min(50); let show = content_lines.len().min(50);
for line in &content_lines[..show] { for line in &content_lines[..show] {
lines.push(Line::styled( lines.push(Line::styled(
@ -344,3 +371,16 @@ impl SectionTree {
} }
} }
} }
/// Render a slice of token ids as text for the content panel.
///
/// Each id is written in decimal. Ids on the same row are separated
/// by a single space, and a new row starts after every 12 ids so the
/// output stays narrow enough for a pane. No trailing separator is
/// emitted, and an empty slice produces an empty string.
fn format_token_ids_wrapped(ids: &[u32]) -> String {
    ids.chunks(12)
        .map(|row| {
            // One row: up to 12 ids joined by single spaces.
            row.iter()
                .map(|id| id.to_string())
                .collect::<Vec<_>>()
                .join(" ")
        })
        .collect::<Vec<_>>()
        .join("\n")
}