agent: end-to-end gRPC Generate with delta-based session orchestration

Wires the client side of the new salience protocol so inference
actually runs over gRPC instead of emitting the stubbed "not yet
wired" error. Each turn walks the AST as interleaved chunks, sends
only what's new to the server, and streams decode tokens back.

context.rs:
  * `WireChunk` enum: `Tokens(Vec<u32>)` or `Image { bytes, mime,
    known_expanded_len }`. Preserves text/image/text ordering the
    wire path can't flatten.
  * `wire_chunks(range, skip)` walker, parallel to `wire_prompt` —
    branches emit `<|im_start|>…<|im_end|>` tokens, image leaves
    emit a single Image chunk (no inline vision tokens).
  * `NodeLeaf::set_image_token_count(n)` + recompute of cached
    `token_ids`; `ContextState::commit_image_token_counts(&[u32])`
    fills in the first-N zero-count image leaves in wire order.
  * `ResponseParser::run` handles the new
    `StreamToken::ImageAppended` by committing the server's N into
    the AST before the final Generate's Token events stream in.

salience.rs:
  * `SessionHandle` tracks `committed_len`. `append_image` advances
    it from the RPC response. New `generate(req)` opens the
    server-streaming RPC.

api/mod.rs:
  * `stream_session_mm(session_lock, chunks, sampling, priority,
    readout_shape)` replaces the stub. Spawns `run_session_generate`.
  * `run_session_generate`: takes the session out of the Mutex (or
    opens fresh), skips chunks covered by `committed_len` (bails on
    mid-chunk straddle or unknown-length image in the committed
    prefix), walks the delta: accumulates Tokens into `pending`, on
    Image flushes pending via `flush_pending` (max_tokens=0 Generate
    that just prefills), then AppendImage + emits
    StreamToken::ImageAppended. Final Generate carries any trailing
    pending text as `append_tokens` and the sampling params; Token
    events stream out as StreamToken::Token, Done as
    StreamToken::Done. On success, handle with updated
    `committed_len` returns to the Mutex; on error, handle drops
    and next call reopens.
  * `StreamToken::ImageAppended { placeholder_count }` variant —
    emitted in wire order before the final Generate's tokens.
  * Prefix-cache cap for readout coverage: `readout_ranges` covers
    `[prompt_len_after_append, u32::MAX)` when the caller provides
    a readout_shape, so decode positions stream their readouts.

agent/mod.rs:
  * `assemble_prompt` returns `Vec<WireChunk>` with the assistant
    prologue merged into the trailing Tokens chunk. Caller in
    `turn` passes chunks + readout_shape (pulled from
    `agent.readout.lock().manifest`) to `stream_session_mm`.
  * Dropped `assemble_prompt_tokens` — dead.

mind + unconscious:
  * `Unconscious::new(client)` stores a shared `ApiClient`. Fixes
    the repeated-manifest-fetch bug caused by each subagent's
    `ApiClient::new` having its own OnceCell. The client's Arc-
    wrapped manifest cache is now shared across every agent Mind
    spawns.
  * `prepare_spawn(name, auto, wake, base_client)` clones the base
    client and overrides `.model` for the resolved backend instead
    of constructing fresh. All three callers
    (`toggle`/`trigger`/unconscious loop) pass `self.client.clone()`.
  * `Mind::new` passes `agent.client.clone()` into
    `Unconscious::new`.

subconscious/generate.rs:
  * gen_continuation switched to `wire_chunks` + the new
    `stream_session_mm` signature. Ephemeral session opens on each
    call, tears down at scope end. No readouts requested.

Not changed yet, noted for follow-up:
  * Subconscious ablation scoring in learn.rs still talks to
    `/v1/score` over HTTP. Will migrate once we have time to verify
    the Generate+max_tokens=0+prompt_logprobs path end-to-end.
  * compare.rs constructs its own ApiClient for the
    `compare.test_backend` (which is intentionally a different
    endpoint) — left alone.
  * Readout manifest still fetched via HTTP at Agent::new.
    Migration to GetReadoutManifest gRPC is a separate cleanup.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-24 12:27:55 -04:00
commit 8d9c9e9f7b
7 changed files with 536 additions and 60 deletions

View file

@ -49,7 +49,6 @@ impl Drop for AbortOnDrop {
/// Sampling parameters for model generation.
#[derive(Clone, Copy)]
#[allow(dead_code)] // fields used once Generate RPC lands in a later step
pub(crate) struct SamplingParams {
pub temperature: f32,
pub top_p: f32,
@ -66,6 +65,12 @@ pub enum StreamToken {
/// `readout` is `None` when the server has readout disabled or
/// returned no readout for this chunk.
Token { id: u32, readout: Option<TokenReadout> },
/// An image was committed server-side via AppendImage during this
/// stream. `placeholder_count` is the N IMAGE_PADs the server
/// wrote. Emitted in AST order — caller applies these counts to
/// the first-N image leaves that currently have token_count=0
/// via `ContextState::commit_image_token_counts`.
ImageAppended { placeholder_count: u32 },
Done { usage: Option<Usage> },
Error(String),
}
@ -98,26 +103,41 @@ impl ApiClient {
}
}
/// Stream generation via a gRPC session. Stubbed during the
/// unary-rewrite transition — the Generate RPC is wired in a
/// later step of this series. Until then, callers that reach
/// this path get a StreamToken::Error.
/// Stream generation via a gRPC session. Walks the prompt chunks
/// comparing against the session's `committed_len`, sends the
/// delta as interleaved `AppendImage` + intermediate
/// `Generate(max_tokens=0)` (for text runs separating images) +
/// a final `Generate(max_tokens=sampling.max_tokens, ...)` whose
/// Token events stream back through the channel.
///
/// On any gRPC error the session is dropped; the next call
/// reopens fresh. Happy-path ordering: Token* Done. Error paths
/// emit `StreamToken::Error` and close.
pub(crate) fn stream_session_mm(
&self,
_session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>,
_prompt_tokens: &[u32],
_images: &[super::context::WireImage],
_sampling: SamplingParams,
_priority: Option<i32>,
session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>,
chunks: Vec<super::context::WireChunk>,
sampling: SamplingParams,
priority: Option<i32>,
readout_shape: Option<(u32, u32)>,
) -> (mpsc::UnboundedReceiver<StreamToken>, AbortOnDrop) {
let (tx, rx) = mpsc::unbounded_channel();
let base_url = self.base_url.clone();
let api_key = self.api_key.clone();
let model = self.model.clone();
let handle = tokio::spawn(async move {
let _ = tx.send(StreamToken::Error(
"Generate RPC not yet wired after protocol rewrite — see \
proto/salience.proto; AppendImage / Generate land next."
.into(),
));
let result = run_session_generate(
session_lock, &base_url, &api_key, &model,
chunks, sampling, priority, readout_shape, &tx,
).await;
if let Err(e) = result {
log::warn!(target: "grpc",
"stream_session_mm error, forwarding to UI: {:#}", e);
let _ = tx.send(StreamToken::Error(format!("{:#}", e)));
}
});
(rx, AbortOnDrop(handle))
}
@ -131,6 +151,8 @@ impl ApiClient {
/// First call performs the HTTP fetch; subsequent calls (including
/// across ApiClient clones sharing the same cell) return the
/// cached result. The manifest doesn't change during a server run.
pub fn model_str(&self) -> &str { &self.model }
pub async fn fetch_readout_manifest(&self) -> Result<Option<ReadoutManifest>> {
let manifest = self.manifest.get_or_try_init(|| async {
let url = format!("{}/readout/manifest", self.base_url);
@ -156,3 +178,254 @@ impl ApiClient {
}
/// Body of the gRPC-path streaming task. Walks the wire chunks
/// against the session's `committed_len`, sends the delta via
/// AppendImage / intermediate prefill-only Generates / final decode
/// Generate, and translates the final Generate's Token events into
/// StreamTokens on `tx`. On success the session handle is returned
/// to `session_lock` with an updated `committed_len`; on error the
/// handle is dropped so the next call reopens.
async fn run_session_generate(
session_lock: std::sync::Arc<crate::Mutex<Option<salience::SessionHandle>>>,
base_url: &str,
api_key: &str,
model: &str,
chunks: Vec<super::context::WireChunk>,
sampling: SamplingParams,
priority: Option<i32>,
readout_shape: Option<(u32, u32)>,
tx: &mpsc::UnboundedSender<StreamToken>,
) -> Result<()> {
use std::time::Instant;
use futures::StreamExt;
use super::context::WireChunk;
use salience::pb;
let mut handle: salience::SessionHandle = {
let mut guard = session_lock.lock().await;
match guard.take() {
Some(h) => h,
None => {
drop(guard);
log::debug!(target: "grpc", "run_session_generate: opening new session");
salience::SessionHandle::open(base_url, api_key, model).await?
}
}
};
// Skip chunks already on the server. committed_len must land on
// a chunk boundary — every successful AppendImage / Generate
// advances committed_len by exactly one chunk's contribution,
// so straddling means divergence (client's AST was rewritten
// under us).
let mut acc: u32 = 0;
let mut delta_start = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
if acc == handle.committed_len {
delta_start = i;
break;
}
let len = match chunk {
WireChunk::Tokens(t) => t.len() as u32,
WireChunk::Image { known_expanded_len, .. } => *known_expanded_len,
};
if len == 0 {
anyhow::bail!(
"session divergence: chunk {} has unknown length but \
precedes committed_len {} (acc={})",
i, handle.committed_len, acc,
);
}
if acc + len > handle.committed_len {
anyhow::bail!(
"session divergence: chunk {} straddles committed_len \
(acc={}, len={}, committed={})",
i, acc, len, handle.committed_len,
);
}
acc += len;
}
if acc != handle.committed_len {
anyhow::bail!(
"session divergence: chunks sum to {} but committed_len is {}",
acc, handle.committed_len,
);
}
// Walk the delta: accumulate Tokens in `pending`; on Image,
// flush pending via prefill-only Generate then AppendImage.
let mut pending: Vec<u32> = Vec::new();
for chunk in &chunks[delta_start..] {
match chunk {
WireChunk::Tokens(t) => pending.extend_from_slice(t),
WireChunk::Image { bytes, mime, .. } => {
if !pending.is_empty() {
flush_pending(&mut handle, std::mem::take(&mut pending)).await?;
}
let resp = handle
.append_image(bytes.clone(), mime.clone(), false)
.await?;
log::debug!(target: "grpc",
"AppendImage: N={} total_length={}",
resp.placeholder_count, resp.total_length);
let _ = tx.send(StreamToken::ImageAppended {
placeholder_count: resp.placeholder_count,
});
}
}
}
// Final Generate: pending holds any trailing text; decode up to
// sampling.max_tokens. Request readouts on all decode positions
// via a catch-all range ending at u32::MAX — decode never
// reaches it.
let prompt_len_after_append = handle.committed_len + pending.len() as u32;
let readout_ranges = if readout_shape.is_some() {
vec![pb::PositionRange {
start: prompt_len_after_append,
end: u32::MAX,
}]
} else {
Vec::new()
};
let max_tokens = sampling_max_tokens(&sampling);
let req = pb::GenerateRequest {
session_id: handle.session_id.clone(),
append_tokens: pending,
offset: handle.committed_len,
truncating: false,
max_tokens,
logprobs_ranges: Vec::new(),
logprob_top_k: 0,
readout_ranges,
temperature: sampling.temperature,
top_p: sampling.top_p,
top_k: sampling.top_k,
stop_token_ids: Vec::new(),
priority: priority.unwrap_or(0),
};
let session_id_for_log = handle.session_id.clone();
let t_generate = Instant::now();
log::debug!(target: "grpc",
"session {} Generate: offset={} append={} max_tokens={} priority={}",
session_id_for_log, req.offset, req.append_tokens.len(),
req.max_tokens, req.priority);
let mut stream = handle.generate(req).await?;
let (n_layers, n_concepts) = readout_shape.unwrap_or((0, 0));
let mut session_terminated = false;
let mut first_token_at: Option<Instant> = None;
while let Some(event) = stream.next().await {
let event = match event {
Ok(e) => e,
Err(status) => {
log::warn!(target: "grpc",
"session {} Generate stream error: {} — dropping session",
session_id_for_log, status);
session_terminated = true;
let _ = tx.send(StreamToken::Error(format!(
"Generate stream error: {}", status,
)));
break;
}
};
let Some(inner) = event.event else { continue };
match inner {
pb::generate_event::Event::Token(t) => {
if t.is_prefill { continue; }
if first_token_at.is_none() {
log::debug!(target: "grpc",
"session {} first decode token at {:?}",
session_id_for_log, t_generate.elapsed());
first_token_at = Some(Instant::now());
}
let readout = if t.readout.is_empty() {
None
} else if n_layers == 0 || n_concepts == 0 {
None
} else {
let expected = (n_layers as usize) * (n_concepts as usize);
if t.readout.len() != expected {
log::warn!(target: "grpc",
"readout shape mismatch: expected {}*{}={}, got {}",
n_layers, n_concepts, expected, t.readout.len());
None
} else {
let n = n_concepts as usize;
let mut layers: Vec<Vec<f32>> = Vec::with_capacity(n_layers as usize);
for l in 0..(n_layers as usize) {
layers.push(t.readout[l * n..(l + 1) * n].to_vec());
}
Some(layers)
}
};
if tx.send(StreamToken::Token { id: t.id, readout }).is_err() {
break;
}
}
pb::generate_event::Event::Done(d) => {
log::debug!(target: "grpc",
"session {} Done: prompt={} completion={} total={} reason={:?} elapsed={:?}",
session_id_for_log, d.prompt_tokens, d.completion_tokens,
d.total_tokens, d.finish_reason, t_generate.elapsed());
handle.committed_len = d.total_tokens;
let usage = Some(Usage {
prompt_tokens: d.prompt_tokens,
completion_tokens: d.completion_tokens,
total_tokens: d.total_tokens,
});
let _ = tx.send(StreamToken::Done { usage });
}
}
}
if !session_terminated {
let mut guard = session_lock.lock().await;
*guard = Some(handle);
}
Ok(())
}
/// Emit a prefill-only Generate for the pending token run. Used to
/// append text that separates two image blocks — the server needs
/// those tokens in its session before we AppendImage the next image,
/// but we don't want the cost or output of a decode step.
async fn flush_pending(
handle: &mut salience::SessionHandle,
tokens: Vec<u32>,
) -> Result<()> {
use futures::StreamExt;
use salience::pb;
let req = pb::GenerateRequest {
session_id: handle.session_id.clone(),
append_tokens: tokens,
offset: handle.committed_len,
truncating: false,
max_tokens: 0,
logprobs_ranges: Vec::new(),
logprob_top_k: 0,
readout_ranges: Vec::new(),
temperature: 0.0,
top_p: 0.0,
top_k: 0,
stop_token_ids: Vec::new(),
priority: 0,
};
let mut stream = handle.generate(req).await?;
while let Some(event) = stream.next().await {
let event = event.map_err(|s| anyhow::anyhow!("flush Generate stream: {}", s))?;
if let Some(pb::generate_event::Event::Done(d)) = event.event {
handle.committed_len = d.total_tokens;
}
}
Ok(())
}
fn sampling_max_tokens(_sampling: &SamplingParams) -> u32 {
// SamplingParams doesn't carry max_tokens today; 4096 mirrors
// the old server-side default and is a sensible interactive cap.
// TODO: plumb from the caller if we need bigger budgets.
4096
}

View file

@ -145,12 +145,14 @@ pub async fn append_image(
/// Handle to a server-side session. Carries the id + connection params
/// so subsequent per-session RPCs (AppendImage, Generate, ForkSession)
/// can be issued without the caller juggling base_url / api_key each
/// time.
/// time. `committed_len` tracks the server's current session.tokens
/// length so the client can submit deltas with the right `offset`.
pub struct SessionHandle {
pub session_id: String,
pub max_model_len: u32,
pub base_url: String,
pub api_key: String,
pub committed_len: u32,
}
impl SessionHandle {
@ -168,6 +170,7 @@ impl SessionHandle {
max_model_len: resp.max_model_len,
base_url: grpc_url,
api_key: api_key.to_string(),
committed_len: 0,
})
}
@ -175,25 +178,44 @@ impl SessionHandle {
close_session(&self.base_url, &self.api_key, &self.session_id).await
}
/// Append an image via the server-side vision block. See
/// `append_image` free function for full semantics.
/// Append an image via the server-side vision block. Updates
/// `committed_len` from the server's response on success.
pub async fn append_image(
&self,
&mut self,
data: Vec<u8>,
mime: String,
offset: u32,
truncating: bool,
) -> Result<pb::AppendImageResponse> {
append_image(
let resp = append_image(
&self.base_url,
&self.api_key,
&self.session_id,
data,
mime,
offset,
self.committed_len,
truncating,
)
.await
.await?;
self.committed_len = resp.total_length;
Ok(resp)
}
/// Open a gRPC Generate stream with the given request. Caller
/// iterates the returned stream of GenerateEvents; the handle's
/// `committed_len` is advanced on Done based on the Done event's
/// `total_tokens` field.
pub async fn generate(
&self,
req: pb::GenerateRequest,
) -> Result<tonic::Streaming<pb::GenerateEvent>> {
let mut client = connect(&self.base_url).await?;
let mut req = tonic::Request::new(req);
with_auth(&mut req, &self.api_key);
let resp = client
.generate(req)
.await
.with_context(|| "Generate RPC failed")?;
Ok(resp.into_inner())
}
}

View file

@ -312,6 +312,16 @@ impl NodeLeaf {
pub fn token_ids(&self) -> &[u32] { &self.token_ids }
pub fn tokens(&self) -> usize { self.token_ids.len() }
pub fn timestamp(&self) -> DateTime<Utc> { self.timestamp }
/// If this is an Image leaf, update its IMAGE_PAD count to `n` and
/// recompute cached `token_ids`. No-op on non-Image leaves —
/// callers know the body shape via `body()`.
pub fn set_image_token_count(&mut self, n: u32) {
if let NodeBody::Image { token_count, .. } = &mut self.body {
*token_count = n;
self.token_ids = self.body.compute_token_ids();
}
}
}
impl AstNode {
@ -737,6 +747,15 @@ impl ResponseParser {
parser.finish(&mut ctx);
return Ok(());
}
super::api::StreamToken::ImageAppended { placeholder_count } => {
// Commit the server-authoritative IMAGE_PAD
// count into the first zero-count image leaf
// in wire order. AppendImage always runs
// before the final Generate, so this fires
// before any Token events for this stream.
let mut ctx = agent.context.lock().await;
ctx.commit_image_token_counts(&[placeholder_count]);
}
super::api::StreamToken::Error(e) => {
return Err(anyhow::anyhow!("{}", e));
}
@ -866,6 +885,36 @@ impl ContextState {
pub fn sections(&self) -> [&Vec<AstNode>; 4] {
[&self.system, &self.identity, &self.journal, &self.conversation]
}
/// Walk image leaves across all sections in wire order and fill in
/// the first N leaves that have `token_count == 0` with successive
/// values from `counts`. Used after a gRPC session's stream of
/// AppendImage responses to commit the server's IMAGE_PAD counts
/// back into the AST so the next wire walk doesn't see zero-count
/// images in the already-committed prefix.
pub fn commit_image_token_counts(&mut self, counts: &[u32]) {
fn visit(node: &mut AstNode, counts: &[u32], idx: &mut usize) {
if *idx >= counts.len() { return; }
match node {
AstNode::Leaf(leaf) => {
if let NodeBody::Image { token_count, .. } = leaf.body() {
if *token_count == 0 {
leaf.set_image_token_count(counts[*idx]);
*idx += 1;
}
}
}
AstNode::Branch { children, .. } => {
for c in children { visit(c, counts, idx); }
}
}
}
let mut idx = 0usize;
for node in &mut self.system { visit(node, counts, &mut idx); }
for node in &mut self.identity { visit(node, counts, &mut idx); }
for node in &mut self.journal { visit(node, counts, &mut idx); }
for node in &mut self.conversation { visit(node, counts, &mut idx); }
}
}
impl Ast for ContextState {
@ -909,6 +958,28 @@ pub struct WireImage {
pub mime: String,
}
/// One piece of the wire stream for the gRPC session path. Runs of
/// text/tool/thinking tokens are batched into `Tokens`; each Image
/// leaf becomes its own `Image` chunk because the server writes the
/// full vision block on AppendImage — the client never sends vision
/// tokens inline. Order matches the AST's depth-first wire order.
#[derive(Clone)]
pub enum WireChunk {
Tokens(Vec<u32>),
Image {
bytes: Vec<u8>,
mime: String,
/// Client's current best guess at how many tokens the server
/// will expand this image to, including bookends. `0` means
/// the count is unknown (view_image just loaded the image and
/// AppendImage hasn't run yet). Callers use this only to know
/// this chunk's contribution to the server-visible length for
/// offset bookkeeping on chunks that were already appended on
/// a prior turn.
known_expanded_len: u32,
},
}
fn wire_into(node: &AstNode, tokens: &mut Vec<u32>, images: &mut Vec<WireImage>) {
match node {
AstNode::Leaf(leaf) => match leaf.body() {
@ -1045,6 +1116,80 @@ impl ContextState {
}
(tokens, images, assistant_ranges)
}
/// Build the wire stream as interleaved `WireChunk`s for the gRPC
/// session path. Unlike `wire_prompt`, this preserves the order
/// of text runs vs image blocks so the caller can drive the
/// append flow (AppendImage for each Image, Generate append for
/// contiguous text runs).
///
/// `conv_range` and `skip` mirror `wire_prompt` — select a
/// conversation slice and drop identity / conversation nodes by
/// predicate.
pub fn wire_chunks<F>(
&self,
conv_range: std::ops::Range<usize>,
mut skip: F,
) -> Vec<WireChunk>
where F: FnMut(&AstNode) -> bool,
{
let mut out: Vec<WireChunk> = Vec::new();
let mut buf: Vec<u32> = Vec::new();
fn flush(buf: &mut Vec<u32>, out: &mut Vec<WireChunk>) {
if !buf.is_empty() {
out.push(WireChunk::Tokens(std::mem::take(buf)));
}
}
fn visit(node: &AstNode, buf: &mut Vec<u32>, out: &mut Vec<WireChunk>) {
match node {
AstNode::Leaf(leaf) => match leaf.body() {
NodeBody::Image { bytes, mime, token_count, .. } => {
flush(buf, out);
// Bookends (VISION_START + VISION_END) add 2
// to the expanded length; token_count is the
// IMAGE_PAD run. 0 means count is still
// unknown (no AppendImage yet) — don't claim
// a length the server will disagree with.
let expanded = if *token_count == 0 {
0
} else {
*token_count + 2
};
out.push(WireChunk::Image {
bytes: bytes.clone(),
mime: mime.clone(),
known_expanded_len: expanded,
});
}
_ => buf.extend_from_slice(leaf.token_ids()),
},
AstNode::Branch { role, children, .. } => {
buf.push(tokenizer::IM_START);
buf.extend(tokenizer::encode(&format!("{}\n", role.as_str())));
for c in children {
visit(c, buf, out);
}
buf.push(tokenizer::IM_END);
buf.extend(tokenizer::encode("\n"));
}
}
}
for node in self.system() { visit(node, &mut buf, &mut out); }
for node in self.identity() {
if skip(node) { continue; }
visit(node, &mut buf, &mut out);
}
for node in self.journal() { visit(node, &mut buf, &mut out); }
for node in &self.conversation()[conv_range] {
if skip(node) { continue; }
visit(node, &mut buf, &mut out);
}
flush(&mut buf, &mut out);
out
}
}
impl ContextState {

View file

@ -329,35 +329,32 @@ impl Agent {
})
}
pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
self.assemble_prompt().await.0
}
/// Assemble a ready-to-send prompt: token stream in wire form (each
/// image collapsed to a single `<|image_pad|>`) paired with the
/// images to attach as multi_modal_data.
///
/// Pre-send size check: if the context has grown past budget since the
/// last compact (accumulation between turns, a fork's context getting
/// bigger than expected, etc.), trim here rather than letting vLLM
/// reject the request. Client-side tokenization means we already know
/// the exact token count so there's no reason to round-trip an
/// oversize request.
pub async fn assemble_prompt(&self) -> (Vec<u32>, Vec<context::WireImage>) {
/// Assemble a ready-to-send prompt as interleaved wire chunks for
/// the gRPC session path. Text runs are batched; each Image leaf
/// becomes its own chunk. Also trims the conversation to budget
/// first so we don't build a prompt the server will reject for
/// length.
pub async fn assemble_prompt(&self) -> Vec<context::WireChunk> {
let mut ctx = self.context.lock().await;
if ctx.total_tokens() > context::context_budget_tokens() {
ctx.trim_conversation();
}
let st = self.state.lock().await;
let (mut tokens, images, _) =
ctx.wire_prompt(0..ctx.conversation().len(), |_| false);
tokens.push(tokenizer::IM_START);
let conv_len = ctx.conversation().len();
let mut chunks = ctx.wire_chunks(0..conv_len, |_| false);
// Assistant-turn prologue. Merge into the trailing Tokens
// chunk if there is one, else push as a new chunk.
let mut prologue = vec![tokenizer::IM_START];
if st.think_native {
tokens.extend(tokenizer::encode("assistant\n<think>\n"));
prologue.extend(tokenizer::encode("assistant\n<think>\n"));
} else {
tokens.extend(tokenizer::encode("assistant\n"));
prologue.extend(tokenizer::encode("assistant\n"));
}
(tokens, images)
match chunks.last_mut() {
Some(context::WireChunk::Tokens(last)) => last.extend(prologue),
_ => chunks.push(context::WireChunk::Tokens(prologue)),
}
chunks
}
/// Rebuild the tools section of the system prompt from the current tools list.
@ -417,18 +414,23 @@ impl Agent {
let _thinking = start_activity(&agent, "thinking...").await;
let (rx, _stream_guard) = {
let (prompt_tokens, images) = agent.assemble_prompt().await;
let chunks = agent.assemble_prompt().await;
let st = agent.state.lock().await;
let readout_shape = agent.readout.lock().ok().and_then(|buf| {
buf.manifest.as_ref().map(|m| {
(m.layers.len() as u32, m.concepts.len() as u32)
})
});
agent.client.stream_session_mm(
agent.grpc_session.clone(),
&prompt_tokens,
&images,
chunks,
api::SamplingParams {
temperature: st.temperature,
top_p: st.top_p,
top_k: st.top_k,
},
st.priority,
readout_shape,
)
};

View file

@ -419,7 +419,9 @@ impl Mind {
let subconscious = Arc::new(crate::Mutex::new(Subconscious::new()));
subconscious.lock().await.init_output_tool(subconscious.clone());
let unconscious = Arc::new(crate::Mutex::new(Unconscious::new()));
let unconscious = Arc::new(crate::Mutex::new(
Unconscious::new(agent.client.clone()),
));
// Spawn the unconscious loop on its own task
if !config.no_agents {
@ -467,8 +469,11 @@ impl Mind {
};
// Spawn agents outside lock
let client = unc.lock().await.client.clone();
for (idx, name, auto) in to_spawn {
match crate::mind::unconscious::prepare_spawn(&name, auto, wake.clone()).await {
match crate::mind::unconscious::prepare_spawn(
&name, auto, wake.clone(), client.clone(),
).await {
Ok(result) => unc.lock().await.complete_spawn(idx, result),
Err(auto) => unc.lock().await.abort_spawn(idx, auto),
}

View file

@ -73,10 +73,15 @@ pub struct Unconscious {
last_health_check: Option<Instant>,
/// Notified when agent state changes (finished, toggled)
pub wake: std::sync::Arc<tokio::sync::Notify>,
/// Shared API client — cloned (cheap) into each spawned agent's
/// Agent::new call so they all share the manifest cache and
/// gRPC endpoint state. Override `.model` on the clone when a
/// per-agent backend differs from the default.
pub client: crate::agent::api::ApiClient,
}
impl Unconscious {
pub fn new() -> Self {
pub fn new(client: crate::agent::api::ApiClient) -> Self {
let enabled_map = load_enabled_config();
// Scan all .agent files, exclude subconscious-* and surface-observe
@ -120,6 +125,7 @@ impl Unconscious {
graph_health: None,
last_health_check: None,
wake: std::sync::Arc::new(tokio::sync::Notify::new()),
client,
}
}
@ -134,7 +140,8 @@ impl Unconscious {
let agent_name = self.agents[idx].name.clone();
let auto = self.agents[idx].auto.take().unwrap();
let wake = self.wake.clone();
match prepare_spawn(&agent_name, auto, wake).await {
let client = self.client.clone();
match prepare_spawn(&agent_name, auto, wake, client).await {
Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto),
}
@ -250,7 +257,12 @@ pub struct SpawnResult {
/// Called outside the Unconscious lock.
/// On success, auto is consumed (moved into spawned task).
/// On failure, auto is returned so it can be restored.
pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc<tokio::sync::Notify>) -> Result<SpawnResult, AutoAgent> {
pub async fn prepare_spawn(
name: &str,
mut auto: AutoAgent,
wake: std::sync::Arc<tokio::sync::Notify>,
base_client: crate::agent::api::ApiClient,
) -> Result<SpawnResult, AutoAgent> {
dbglog!("[unconscious] spawning {}", name);
let def = match defs::get_def(name) {
@ -295,8 +307,10 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent, wake: std::sync::Arc
};
// Unconscious agents have self-contained prompts — no standard context.
let client = crate::agent::api::ApiClient::new(
&resolved.api_base, &resolved.api_key, &resolved.model_id);
// Clone the shared client so we inherit the manifest cache and
// only override the model id per-agent.
let mut client = base_client;
client.model = resolved.model_id.clone();
let agent = crate::agent::Agent::new(
client, Vec::new(),
app, None,
@ -329,8 +343,9 @@ impl Unconscious {
self.reap_finished();
let to_spawn = self.select_to_spawn();
let wake = self.wake.clone();
let client = self.client.clone();
for (idx, name, auto) in to_spawn {
match prepare_spawn(&name, auto, wake.clone()).await {
match prepare_spawn(&name, auto, wake.clone(), client.clone()).await {
Ok(result) => self.complete_spawn(idx, result),
Err(auto) => self.abort_spawn(idx, auto),
}

View file

@ -7,7 +7,7 @@
use std::sync::Arc;
use crate::agent::api::{ApiClient, SamplingParams, StreamToken};
use crate::agent::context::{AstNode, ContextState};
use crate::agent::context::{AstNode, ContextState, WireChunk};
use crate::agent::tokenizer;
/// Generate an assistant continuation from the context up to `entry_idx`,
@ -26,10 +26,18 @@ pub async fn gen_continuation<F>(
) -> anyhow::Result<String>
where F: FnMut(&AstNode) -> bool,
{
let (mut prompt, images, _) = context.wire_prompt(0..entry_idx, skip);
let mut chunks = context.wire_chunks(0..entry_idx, skip);
prompt.push(tokenizer::IM_START);
prompt.extend(tokenizer::encode("assistant\n"));
// Assistant-turn prologue.
let prologue = {
let mut t = vec![tokenizer::IM_START];
t.extend(tokenizer::encode("assistant\n"));
t
};
match chunks.last_mut() {
Some(WireChunk::Tokens(last)) => last.extend(prologue),
_ => chunks.push(WireChunk::Tokens(prologue)),
}
let sampling = SamplingParams {
temperature: 0.6,
@ -41,13 +49,19 @@ where F: FnMut(&AstNode) -> bool,
// `_guard` drops at function end.
let session_lock = Arc::new(crate::Mutex::new(None));
let (mut rx, _guard) = client.stream_session_mm(
session_lock, &prompt, &images, sampling, Some(-5),
session_lock, chunks, sampling, Some(-5), None,
);
let mut tokens = Vec::new();
while let Some(tok) = rx.recv().await {
match tok {
StreamToken::Token { id, .. } => tokens.push(id),
StreamToken::ImageAppended { .. } => {
// subconscious/generate uses wire_chunks over an AST
// slice that shouldn't have unsized images — but if
// it ever does, we just don't care about updating the
// ephemeral session's AST view.
}
StreamToken::Done { .. } => break,
StreamToken::Error(e) => anyhow::bail!("generation error: {}", e),
}