diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 06ecf70..a7a87f7 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -49,7 +49,6 @@ impl Drop for AbortOnDrop { /// Sampling parameters for model generation. #[derive(Clone, Copy)] -#[allow(dead_code)] // fields used once Generate RPC lands in a later step pub(crate) struct SamplingParams { pub temperature: f32, pub top_p: f32, @@ -66,6 +65,12 @@ pub enum StreamToken { /// `readout` is `None` when the server has readout disabled or /// returned no readout for this chunk. Token { id: u32, readout: Option }, + /// An image was committed server-side via AppendImage during this + /// stream. `placeholder_count` is the N IMAGE_PADs the server + /// wrote. Emitted in AST order — caller applies these counts to + /// the first-N image leaves that currently have token_count=0 + /// via `ContextState::commit_image_token_counts`. + ImageAppended { placeholder_count: u32 }, Done { usage: Option }, Error(String), } @@ -98,26 +103,41 @@ impl ApiClient { } } - /// Stream generation via a gRPC session. Stubbed during the - /// unary-rewrite transition — the Generate RPC is wired in a - /// later step of this series. Until then, callers that reach - /// this path get a StreamToken::Error. + /// Stream generation via a gRPC session. Walks the prompt chunks + /// comparing against the session's `committed_len`, sends the + /// delta as interleaved `AppendImage` + intermediate + /// `Generate(max_tokens=0)` (for text runs separating images) + + /// a final `Generate(max_tokens=sampling.max_tokens, ...)` whose + /// Token events stream back through the channel. + /// + /// On any gRPC error the session is dropped; the next call + /// reopens fresh. Happy-path ordering: Token* Done. Error paths + /// emit `StreamToken::Error` and close. pub(crate) fn stream_session_mm( &self, - _session_lock: std::sync::Arc>>, - _prompt_tokens: &[u32], - _images: &[super::context::WireImage], - _sampling: SamplingParams, - _priority: Option, + session_lock: std::sync::Arc>>, + chunks: Vec, + sampling: SamplingParams, + priority: Option, + readout_shape: Option<(u32, u32)>, ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { let (tx, rx) = mpsc::unbounded_channel(); + let base_url = self.base_url.clone(); + let api_key = self.api_key.clone(); + let model = self.model.clone(); + let handle = tokio::spawn(async move { - let _ = tx.send(StreamToken::Error( - "Generate RPC not yet wired after protocol rewrite — see \ - proto/salience.proto; AppendImage / Generate land next." - .into(), - )); + let result = run_session_generate( + session_lock, &base_url, &api_key, &model, + chunks, sampling, priority, readout_shape, &tx, + ).await; + if let Err(e) = result { + log::warn!(target: "grpc", + "stream_session_mm error, forwarding to UI: {:#}", e); + let _ = tx.send(StreamToken::Error(format!("{:#}", e))); + } }); + (rx, AbortOnDrop(handle)) } @@ -131,6 +151,8 @@ impl ApiClient { /// First call performs the HTTP fetch; subsequent calls (including /// across ApiClient clones sharing the same cell) return the /// cached result. The manifest doesn't change during a server run. + pub fn model_str(&self) -> &str { &self.model } + pub async fn fetch_readout_manifest(&self) -> Result> { let manifest = self.manifest.get_or_try_init(|| async { let url = format!("{}/readout/manifest", self.base_url); @@ -156,3 +178,254 @@ impl ApiClient { } +/// Body of the gRPC-path streaming task. Walks the wire chunks +/// against the session's `committed_len`, sends the delta via +/// AppendImage / intermediate prefill-only Generates / final decode +/// Generate, and translates the final Generate's Token events into +/// StreamTokens on `tx`. On success the session handle is returned +/// to `session_lock` with an updated `committed_len`; on error the +/// handle is dropped so the next call reopens. +async fn run_session_generate( + session_lock: std::sync::Arc>>, + base_url: &str, + api_key: &str, + model: &str, + chunks: Vec, + sampling: SamplingParams, + priority: Option, + readout_shape: Option<(u32, u32)>, + tx: &mpsc::UnboundedSender, +) -> Result<()> { + use std::time::Instant; + use futures::StreamExt; + use super::context::WireChunk; + use salience::pb; + + let mut handle: salience::SessionHandle = { + let mut guard = session_lock.lock().await; + match guard.take() { + Some(h) => h, + None => { + drop(guard); + log::debug!(target: "grpc", "run_session_generate: opening new session"); + salience::SessionHandle::open(base_url, api_key, model).await? + } + } + }; + + // Skip chunks already on the server. committed_len must land on + // a chunk boundary — every successful AppendImage / Generate + // advances committed_len by exactly one chunk's contribution, + // so straddling means divergence (client's AST was rewritten + // under us). + let mut acc: u32 = 0; + let mut delta_start = chunks.len(); + for (i, chunk) in chunks.iter().enumerate() { + if acc == handle.committed_len { + delta_start = i; + break; + } + let len = match chunk { + WireChunk::Tokens(t) => t.len() as u32, + WireChunk::Image { known_expanded_len, .. } => *known_expanded_len, + }; + if len == 0 { + anyhow::bail!( + "session divergence: chunk {} has unknown length but \ + precedes committed_len {} (acc={})", + i, handle.committed_len, acc, + ); + } + if acc + len > handle.committed_len { + anyhow::bail!( + "session divergence: chunk {} straddles committed_len \ + (acc={}, len={}, committed={})", + i, acc, len, handle.committed_len, + ); + } + acc += len; + } + if acc != handle.committed_len { + anyhow::bail!( + "session divergence: chunks sum to {} but committed_len is {}", + acc, handle.committed_len, + ); + } + + // Walk the delta: accumulate Tokens in `pending`; on Image, + // flush pending via prefill-only Generate then AppendImage. + let mut pending: Vec = Vec::new(); + for chunk in &chunks[delta_start..] { + match chunk { + WireChunk::Tokens(t) => pending.extend_from_slice(t), + WireChunk::Image { bytes, mime, .. } => { + if !pending.is_empty() { + flush_pending(&mut handle, std::mem::take(&mut pending)).await?; + } + let resp = handle + .append_image(bytes.clone(), mime.clone(), false) + .await?; + log::debug!(target: "grpc", + "AppendImage: N={} total_length={}", + resp.placeholder_count, resp.total_length); + let _ = tx.send(StreamToken::ImageAppended { + placeholder_count: resp.placeholder_count, + }); + } + } + } + + // Final Generate: pending holds any trailing text; decode up to + // sampling.max_tokens. Request readouts on all decode positions + // via a catch-all range ending at u32::MAX — decode never + // reaches it. + let prompt_len_after_append = handle.committed_len + pending.len() as u32; + let readout_ranges = if readout_shape.is_some() { + vec![pb::PositionRange { + start: prompt_len_after_append, + end: u32::MAX, + }] + } else { + Vec::new() + }; + let max_tokens = sampling_max_tokens(&sampling); + let req = pb::GenerateRequest { + session_id: handle.session_id.clone(), + append_tokens: pending, + offset: handle.committed_len, + truncating: false, + max_tokens, + logprobs_ranges: Vec::new(), + logprob_top_k: 0, + readout_ranges, + temperature: sampling.temperature, + top_p: sampling.top_p, + top_k: sampling.top_k, + stop_token_ids: Vec::new(), + priority: priority.unwrap_or(0), + }; + let session_id_for_log = handle.session_id.clone(); + let t_generate = Instant::now(); + log::debug!(target: "grpc", + "session {} Generate: offset={} append={} max_tokens={} priority={}", + session_id_for_log, req.offset, req.append_tokens.len(), + req.max_tokens, req.priority); + + let mut stream = handle.generate(req).await?; + let (n_layers, n_concepts) = readout_shape.unwrap_or((0, 0)); + let mut session_terminated = false; + let mut first_token_at: Option = None; + + while let Some(event) = stream.next().await { + let event = match event { + Ok(e) => e, + Err(status) => { + log::warn!(target: "grpc", + "session {} Generate stream error: {} — dropping session", + session_id_for_log, status); + session_terminated = true; + let _ = tx.send(StreamToken::Error(format!( + "Generate stream error: {}", status, + ))); + break; + } + }; + let Some(inner) = event.event else { continue }; + match inner { + pb::generate_event::Event::Token(t) => { + if t.is_prefill { continue; } + if first_token_at.is_none() { + log::debug!(target: "grpc", + "session {} first decode token at {:?}", + session_id_for_log, t_generate.elapsed()); + first_token_at = Some(Instant::now()); + } + let readout = if t.readout.is_empty() { + None + } else if n_layers == 0 || n_concepts == 0 { + None + } else { + let expected = (n_layers as usize) * (n_concepts as usize); + if t.readout.len() != expected { + log::warn!(target: "grpc", + "readout shape mismatch: expected {}*{}={}, got {}", + n_layers, n_concepts, expected, t.readout.len()); + None + } else { + let n = n_concepts as usize; + let mut layers: Vec> = Vec::with_capacity(n_layers as usize); + for l in 0..(n_layers as usize) { + layers.push(t.readout[l * n..(l + 1) * n].to_vec()); + } + Some(layers) + } + }; + if tx.send(StreamToken::Token { id: t.id, readout }).is_err() { + break; + } + } + pb::generate_event::Event::Done(d) => { + log::debug!(target: "grpc", + "session {} Done: prompt={} completion={} total={} reason={:?} elapsed={:?}", + session_id_for_log, d.prompt_tokens, d.completion_tokens, + d.total_tokens, d.finish_reason, t_generate.elapsed()); + handle.committed_len = d.total_tokens; + let usage = Some(Usage { + prompt_tokens: d.prompt_tokens, + completion_tokens: d.completion_tokens, + total_tokens: d.total_tokens, + }); + let _ = tx.send(StreamToken::Done { usage }); + } + } + } + + if !session_terminated { + let mut guard = session_lock.lock().await; + *guard = Some(handle); + } + Ok(()) +} + +/// Emit a prefill-only Generate for the pending token run. Used to +/// append text that separates two image blocks — the server needs +/// those tokens in its session before we AppendImage the next image, +/// but we don't want the cost or output of a decode step. +async fn flush_pending( + handle: &mut salience::SessionHandle, + tokens: Vec, +) -> Result<()> { + use futures::StreamExt; + use salience::pb; + let req = pb::GenerateRequest { + session_id: handle.session_id.clone(), + append_tokens: tokens, + offset: handle.committed_len, + truncating: false, + max_tokens: 0, + logprobs_ranges: Vec::new(), + logprob_top_k: 0, + readout_ranges: Vec::new(), + temperature: 0.0, + top_p: 0.0, + top_k: 0, + stop_token_ids: Vec::new(), + priority: 0, + }; + let mut stream = handle.generate(req).await?; + while let Some(event) = stream.next().await { + let event = event.map_err(|s| anyhow::anyhow!("flush Generate stream: {}", s))?; + if let Some(pb::generate_event::Event::Done(d)) = event.event { + handle.committed_len = d.total_tokens; + } + } + Ok(()) +} + +fn sampling_max_tokens(_sampling: &SamplingParams) -> u32 { + // SamplingParams doesn't carry max_tokens today; 4096 mirrors + // the old server-side default and is a sensible interactive cap. + // TODO: plumb from the caller if we need bigger budgets. + 4096 +} + diff --git a/src/agent/api/salience.rs b/src/agent/api/salience.rs index f9ea83d..18f0d7b 100644 --- a/src/agent/api/salience.rs +++ b/src/agent/api/salience.rs @@ -145,12 +145,14 @@ pub async fn append_image( /// Handle to a server-side session. Carries the id + connection params /// so subsequent per-session RPCs (AppendImage, Generate, ForkSession) /// can be issued without the caller juggling base_url / api_key each -/// time. +/// time. `committed_len` tracks the server's current session.tokens +/// length so the client can submit deltas with the right `offset`. pub struct SessionHandle { pub session_id: String, pub max_model_len: u32, pub base_url: String, pub api_key: String, + pub committed_len: u32, } impl SessionHandle { @@ -168,6 +170,7 @@ impl SessionHandle { max_model_len: resp.max_model_len, base_url: grpc_url, api_key: api_key.to_string(), + committed_len: 0, }) } @@ -175,25 +178,44 @@ impl SessionHandle { close_session(&self.base_url, &self.api_key, &self.session_id).await } - /// Append an image via the server-side vision block. See - /// `append_image` free function for full semantics. + /// Append an image via the server-side vision block. Updates + /// `committed_len` from the server's response on success. pub async fn append_image( - &self, + &mut self, data: Vec, mime: String, - offset: u32, truncating: bool, ) -> Result { - append_image( + let resp = append_image( &self.base_url, &self.api_key, &self.session_id, data, mime, - offset, + self.committed_len, truncating, ) - .await + .await?; + self.committed_len = resp.total_length; + Ok(resp) + } + + /// Open a gRPC Generate stream with the given request. Caller + /// iterates the returned stream of GenerateEvents; the handle's + /// `committed_len` is advanced on Done based on the Done event's + /// `total_tokens` field. + pub async fn generate( + &self, + req: pb::GenerateRequest, + ) -> Result> { + let mut client = connect(&self.base_url).await?; + let mut req = tonic::Request::new(req); + with_auth(&mut req, &self.api_key); + let resp = client + .generate(req) + .await + .with_context(|| "Generate RPC failed")?; + Ok(resp.into_inner()) } } diff --git a/src/agent/context.rs b/src/agent/context.rs index ab21e21..2982851 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -312,6 +312,16 @@ impl NodeLeaf { pub fn token_ids(&self) -> &[u32] { &self.token_ids } pub fn tokens(&self) -> usize { self.token_ids.len() } pub fn timestamp(&self) -> DateTime { self.timestamp } + + /// If this is an Image leaf, update its IMAGE_PAD count to `n` and + /// recompute cached `token_ids`. No-op on non-Image leaves — + /// callers know the body shape via `body()`. + pub fn set_image_token_count(&mut self, n: u32) { + if let NodeBody::Image { token_count, .. } = &mut self.body { + *token_count = n; + self.token_ids = self.body.compute_token_ids(); + } + } } impl AstNode { @@ -737,6 +747,15 @@ impl ResponseParser { parser.finish(&mut ctx); return Ok(()); } + super::api::StreamToken::ImageAppended { placeholder_count } => { + // Commit the server-authoritative IMAGE_PAD + // count into the first zero-count image leaf + // in wire order. AppendImage always runs + // before the final Generate, so this fires + // before any Token events for this stream. + let mut ctx = agent.context.lock().await; + ctx.commit_image_token_counts(&[placeholder_count]); + } super::api::StreamToken::Error(e) => { return Err(anyhow::anyhow!("{}", e)); } @@ -866,6 +885,36 @@ impl ContextState { pub fn sections(&self) -> [&Vec; 4] { [&self.system, &self.identity, &self.journal, &self.conversation] } + + /// Walk image leaves across all sections in wire order and fill in + /// the first N leaves that have `token_count == 0` with successive + /// values from `counts`. Used after a gRPC session's stream of + /// AppendImage responses to commit the server's IMAGE_PAD counts + /// back into the AST so the next wire walk doesn't see zero-count + /// images in the already-committed prefix. + pub fn commit_image_token_counts(&mut self, counts: &[u32]) { + fn visit(node: &mut AstNode, counts: &[u32], idx: &mut usize) { + if *idx >= counts.len() { return; } + match node { + AstNode::Leaf(leaf) => { + if let NodeBody::Image { token_count, .. } = leaf.body() { + if *token_count == 0 { + leaf.set_image_token_count(counts[*idx]); + *idx += 1; + } + } + } + AstNode::Branch { children, .. } => { + for c in children { visit(c, counts, idx); } + } + } + } + let mut idx = 0usize; + for node in &mut self.system { visit(node, counts, &mut idx); } + for node in &mut self.identity { visit(node, counts, &mut idx); } + for node in &mut self.journal { visit(node, counts, &mut idx); } + for node in &mut self.conversation { visit(node, counts, &mut idx); } + } } impl Ast for ContextState { @@ -909,6 +958,28 @@ pub struct WireImage { pub mime: String, } +/// One piece of the wire stream for the gRPC session path. Runs of +/// text/tool/thinking tokens are batched into `Tokens`; each Image +/// leaf becomes its own `Image` chunk because the server writes the +/// full vision block on AppendImage — the client never sends vision +/// tokens inline. Order matches the AST's depth-first wire order. +#[derive(Clone)] +pub enum WireChunk { + Tokens(Vec), + Image { + bytes: Vec, + mime: String, + /// Client's current best guess at how many tokens the server + /// will expand this image to, including bookends. `0` means + /// the count is unknown (view_image just loaded the image and + /// AppendImage hasn't run yet). Callers use this only to know + /// this chunk's contribution to the server-visible length for + /// offset bookkeeping on chunks that were already appended on + /// a prior turn. + known_expanded_len: u32, + }, +} + fn wire_into(node: &AstNode, tokens: &mut Vec, images: &mut Vec) { match node { AstNode::Leaf(leaf) => match leaf.body() { @@ -1045,6 +1116,80 @@ impl ContextState { } (tokens, images, assistant_ranges) } + + /// Build the wire stream as interleaved `WireChunk`s for the gRPC + /// session path. Unlike `wire_prompt`, this preserves the order + /// of text runs vs image blocks so the caller can drive the + /// append flow (AppendImage for each Image, Generate append for + /// contiguous text runs). + /// + /// `conv_range` and `skip` mirror `wire_prompt` — select a + /// conversation slice and drop identity / conversation nodes by + /// predicate. + pub fn wire_chunks( + &self, + conv_range: std::ops::Range, + mut skip: F, + ) -> Vec + where F: FnMut(&AstNode) -> bool, + { + let mut out: Vec = Vec::new(); + let mut buf: Vec = Vec::new(); + + fn flush(buf: &mut Vec, out: &mut Vec) { + if !buf.is_empty() { + out.push(WireChunk::Tokens(std::mem::take(buf))); + } + } + + fn visit(node: &AstNode, buf: &mut Vec, out: &mut Vec) { + match node { + AstNode::Leaf(leaf) => match leaf.body() { + NodeBody::Image { bytes, mime, token_count, .. } => { + flush(buf, out); + // Bookends (VISION_START + VISION_END) add 2 + // to the expanded length; token_count is the + // IMAGE_PAD run. 0 means count is still + // unknown (no AppendImage yet) — don't claim + // a length the server will disagree with. + let expanded = if *token_count == 0 { + 0 + } else { + *token_count + 2 + }; + out.push(WireChunk::Image { + bytes: bytes.clone(), + mime: mime.clone(), + known_expanded_len: expanded, + }); + } + _ => buf.extend_from_slice(leaf.token_ids()), + }, + AstNode::Branch { role, children, .. } => { + buf.push(tokenizer::IM_START); + buf.extend(tokenizer::encode(&format!("{}\n", role.as_str()))); + for c in children { + visit(c, buf, out); + } + buf.push(tokenizer::IM_END); + buf.extend(tokenizer::encode("\n")); + } + } + } + + for node in self.system() { visit(node, &mut buf, &mut out); } + for node in self.identity() { + if skip(node) { continue; } + visit(node, &mut buf, &mut out); + } + for node in self.journal() { visit(node, &mut buf, &mut out); } + for node in &self.conversation()[conv_range] { + if skip(node) { continue; } + visit(node, &mut buf, &mut out); + } + flush(&mut buf, &mut out); + out + } } impl ContextState { diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 6a55f3f..a8e7592 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -329,35 +329,32 @@ impl Agent { }) } - pub async fn assemble_prompt_tokens(&self) -> Vec { - self.assemble_prompt().await.0 - } - - /// Assemble a ready-to-send prompt: token stream in wire form (each - /// image collapsed to a single `<|image_pad|>`) paired with the - /// images to attach as multi_modal_data. - /// - /// Pre-send size check: if the context has grown past budget since the - /// last compact (accumulation between turns, a fork's context getting - /// bigger than expected, etc.), trim here rather than letting vLLM - /// reject the request. Client-side tokenization means we already know - /// the exact token count so there's no reason to round-trip an - /// oversize request. - pub async fn assemble_prompt(&self) -> (Vec, Vec) { + /// Assemble a ready-to-send prompt as interleaved wire chunks for + /// the gRPC session path. Text runs are batched; each Image leaf + /// becomes its own chunk. Also trims the conversation to budget + /// first so we don't build a prompt the server will reject for + /// length. + pub async fn assemble_prompt(&self) -> Vec { let mut ctx = self.context.lock().await; if ctx.total_tokens() > context::context_budget_tokens() { ctx.trim_conversation(); } let st = self.state.lock().await; - let (mut tokens, images, _) = - ctx.wire_prompt(0..ctx.conversation().len(), |_| false); - tokens.push(tokenizer::IM_START); + let conv_len = ctx.conversation().len(); + let mut chunks = ctx.wire_chunks(0..conv_len, |_| false); + // Assistant-turn prologue. Merge into the trailing Tokens + // chunk if there is one, else push as a new chunk. + let mut prologue = vec![tokenizer::IM_START]; if st.think_native { - tokens.extend(tokenizer::encode("assistant\n\n")); + prologue.extend(tokenizer::encode("assistant\n\n")); } else { - tokens.extend(tokenizer::encode("assistant\n")); + prologue.extend(tokenizer::encode("assistant\n")); } - (tokens, images) + match chunks.last_mut() { + Some(context::WireChunk::Tokens(last)) => last.extend(prologue), + _ => chunks.push(context::WireChunk::Tokens(prologue)), + } + chunks } /// Rebuild the tools section of the system prompt from the current tools list. @@ -417,18 +414,23 @@ impl Agent { let _thinking = start_activity(&agent, "thinking...").await; let (rx, _stream_guard) = { - let (prompt_tokens, images) = agent.assemble_prompt().await; + let chunks = agent.assemble_prompt().await; let st = agent.state.lock().await; + let readout_shape = agent.readout.lock().ok().and_then(|buf| { + buf.manifest.as_ref().map(|m| { + (m.layers.len() as u32, m.concepts.len() as u32) + }) + }); agent.client.stream_session_mm( agent.grpc_session.clone(), - &prompt_tokens, - &images, + chunks, api::SamplingParams { temperature: st.temperature, top_p: st.top_p, top_k: st.top_k, }, st.priority, + readout_shape, ) }; diff --git a/src/mind/mod.rs b/src/mind/mod.rs index f1ddb54..b2eb77a 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -419,7 +419,9 @@ impl Mind { let subconscious = Arc::new(crate::Mutex::new(Subconscious::new())); subconscious.lock().await.init_output_tool(subconscious.clone()); - let unconscious = Arc::new(crate::Mutex::new(Unconscious::new())); + let unconscious = Arc::new(crate::Mutex::new( + Unconscious::new(agent.client.clone()), + )); // Spawn the unconscious loop on its own task if !config.no_agents { @@ -467,8 +469,11 @@ impl Mind { }; // Spawn agents outside lock + let client = unc.lock().await.client.clone(); for (idx, name, auto) in to_spawn { - match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await { + match crate::mind::unconscious::prepare_spawn( + &name, auto, wake.clone(), client.clone(), + ).await { Ok(result) => unc.lock().await.complete_spawn(idx, result), Err(auto) => unc.lock().await.abort_spawn(idx, auto), } diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index 4f9a0ca..9c40e18 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -73,10 +73,15 @@ pub struct Unconscious { last_health_check: Option, /// Notified when agent state changes (finished, toggled) pub wake: std::sync::Arc, + /// Shared API client — cloned (cheap) into each spawned agent's + /// Agent::new call so they all share the manifest cache and + /// gRPC endpoint state. Override `.model` on the clone when a + /// per-agent backend differs from the default. + pub client: crate::agent::api::ApiClient, } impl Unconscious { - pub fn new() -> Self { + pub fn new(client: crate::agent::api::ApiClient) -> Self { let enabled_map = load_enabled_config(); // Scan all .agent files, exclude subconscious-* and surface-observe @@ -120,6 +125,7 @@ impl Unconscious { graph_health: None, last_health_check: None, wake: std::sync::Arc::new(tokio::sync::Notify::new()), + client, } } @@ -134,7 +140,8 @@ impl Unconscious { let agent_name = self.agents[idx].name.clone(); let auto = self.agents[idx].auto.take().unwrap(); let wake = self.wake.clone(); - match prepare_spawn(&agent_name, auto, wake).await { + let client = self.client.clone(); + match prepare_spawn(&agent_name, auto, wake, client).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } @@ -250,7 +257,12 @@ pub struct SpawnResult { /// Called outside the Unconscious lock. /// On success, auto is consumed (moved into spawned task). /// On failure, auto is returned so it can be restored. -pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc) -> Result { +pub async fn prepare_spawn( + name: &str, + mut auto: AutoAgent, + wake: std::sync::Arc, + base_client: crate::agent::api::ApiClient, +) -> Result { dbglog!("[unconscious] spawning {}", name); let def = match defs::get_def(name) { @@ -295,8 +307,10 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc }; // Unconscious agents have self-contained prompts — no standard context. - let client = crate::agent::api::ApiClient::new( - &resolved.api_base, &resolved.api_key, &resolved.model_id); + // Clone the shared client so we inherit the manifest cache and + // only override the model id per-agent. + let mut client = base_client; + client.model = resolved.model_id.clone(); let agent = crate::agent::Agent::new( client, Vec::new(), app, None, @@ -329,8 +343,9 @@ impl Unconscious { self.reap_finished(); let to_spawn = self.select_to_spawn(); let wake = self.wake.clone(); + let client = self.client.clone(); for (idx, name, auto) in to_spawn { - match prepare_spawn(&name, auto, wake.clone()).await { + match prepare_spawn(&name, auto, wake.clone(), client.clone()).await { Ok(result) => self.complete_spawn(idx, result), Err(auto) => self.abort_spawn(idx, auto), } diff --git a/src/subconscious/generate.rs b/src/subconscious/generate.rs index 757e08a..046911d 100644 --- a/src/subconscious/generate.rs +++ b/src/subconscious/generate.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use crate::agent::api::{ApiClient, SamplingParams, StreamToken}; -use crate::agent::context::{AstNode, ContextState}; +use crate::agent::context::{AstNode, ContextState, WireChunk}; use crate::agent::tokenizer; /// Generate an assistant continuation from the context up to `entry_idx`, @@ -26,10 +26,18 @@ pub async fn gen_continuation( ) -> anyhow::Result where F: FnMut(&AstNode) -> bool, { - let (mut prompt, images, _) = context.wire_prompt(0..entry_idx, skip); + let mut chunks = context.wire_chunks(0..entry_idx, skip); - prompt.push(tokenizer::IM_START); - prompt.extend(tokenizer::encode("assistant\n")); + // Assistant-turn prologue. + let prologue = { + let mut t = vec![tokenizer::IM_START]; + t.extend(tokenizer::encode("assistant\n")); + t + }; + match chunks.last_mut() { + Some(WireChunk::Tokens(last)) => last.extend(prologue), + _ => chunks.push(WireChunk::Tokens(prologue)), + } let sampling = SamplingParams { temperature: 0.6, @@ -41,13 +49,19 @@ where F: FnMut(&AstNode) -> bool, // `_guard` drops at function end. let session_lock = Arc::new(crate::Mutex::new(None)); let (mut rx, _guard) = client.stream_session_mm( - session_lock, &prompt, &images, sampling, Some(-5), + session_lock, chunks, sampling, Some(-5), None, ); let mut tokens = Vec::new(); while let Some(tok) = rx.recv().await { match tok { StreamToken::Token { id, .. } => tokens.push(id), + StreamToken::ImageAppended { .. } => { + // subconscious/generate uses wire_chunks over an AST + // slice that shouldn't have unsized images — but if + // it ever does, we just don't care about updating the + // ephemeral session's AST view. + } StreamToken::Done { .. } => break, StreamToken::Error(e) => anyhow::bail!("generation error: {}", e), }