From 08213f9093a906b4d1c118fac958b52f90c1ff9a Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Thu, 23 Apr 2026 02:21:07 -0400 Subject: [PATCH] salience: add gRPC client + TLS plumbing for stateful vllm sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the client-side of a stateful gRPC protocol against vllm, plus the TLS trust machinery so we can talk to self-signed vllm servers. Protocol (proto/salience.proto): Bidi-streaming Session RPC carries OpenSession / AppendTokens / Generate / Cancel from client and SessionReady / PrefillProgress / Token / GenerateDone / Error from server. Separate Fork unary RPC for cheap branching (prefix cache shares KV automatically). Plus ListSessions, CloseSession, GetReadoutManifest admin RPCs. Per-token readouts ship as packed f32 ([n_layers * n_concepts] per token, flat). Logprobs use range-selected positions plus a top-k parameter — empty ranges means no logprobs, any range means emit sampled-token logprob at those positions, top_k > 0 adds alternatives. Client (src/agent/api/salience.rs): Tonic-generated types under pb::, a connect() helper, with_auth() for bearer metadata, and a Session handle wrapping the bidi stream: open() handshakes SessionReady; append() is fire-and-forget; generate() returns impl Stream that drains inbound until Done or terminating Error. One generate at a time per session. Peak picker (src/agent/salience.rs): Pure function over ReadoutEntry traces. Per-concept z-score against trace global stats; contiguous above-threshold regions emit one peak at the local max. Configurable sigma threshold and min-std safety floor. Deterministic tie-break on offset then concept name. 12 unit tests covering empty traces, flat channels, single/multi spikes, contiguous humps, multi-concept independence, trailing runs, sub-threshold noise, layer-out-of-range, manifest shape mismatch, and threshold tunability. TLS (src/agent/api/http.rs): HttpClient::build now also loads every .pem file under ~/.consciousness/certs/ into the rustls root store — so dropping a .pem in that directory is enough to trust a new self- signed server; no code changes per new host. Also installs the rustls default crypto provider explicitly via OnceLock: tonic's tls features pulled in both ring and aws-lc-rs on the resolver path, and rustls 0.23 refuses to auto-pick when either could win. Build (build.rs, Cargo.toml): tonic-build generates Rust types from proto/salience.proto at cargo-build time, using a vendored protoc binary (protoc-bin-vendored) so no system install is required. New runtime deps: tonic, prost, async-stream, tokio-stream, rustls-pemfile. Co-Authored-By: Proof of Concept --- Cargo.lock | 514 ++++++++++++++++++++++++++++++++++- Cargo.toml | 8 + build.rs | 17 ++ proto/salience.proto | 260 ++++++++++++++++++ src/agent/api/http.rs | 69 ++++- src/agent/api/mod.rs | 449 ++++-------------------------- src/agent/api/salience.rs | 249 +++++++++++++++++ src/agent/context.rs | 59 ++-- src/agent/mod.rs | 19 +- src/agent/salience.rs | 309 +++++++++++++++++++++ src/agent/tools/vision.rs | 13 +- src/lib.rs | 3 + src/logging.rs | 146 ++++++++++ src/subconscious/generate.rs | 13 +- src/user/mod.rs | 5 + 15 files changed, 1691 insertions(+), 442 deletions(-) create mode 100644 proto/salience.proto create mode 100644 src/agent/api/salience.rs create mode 100644 src/agent/salience.rs create mode 100644 src/logging.rs diff --git a/Cargo.lock b/Cargo.lock index 394168a..f88965a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,6 +165,39 @@ dependencies = [ "tree-sitter-yaml", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "atomic" version = "0.6.1" @@ -208,6 +241,53 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.13.1" @@ -491,6 +571,7 @@ dependencies = [ "anyhow", "ast-grep-core", "ast-grep-language", + "async-stream", "base64 0.22.1", "bytes", "capnp", @@ -518,11 +599,14 @@ dependencies = [ "notify-debouncer-mini", "paste", "peg", + "prost", + "protoc-bin-vendored", "ratatui", "redb", "regex", "rustls", "rustls-native-certs", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", @@ -531,7 +615,10 @@ dependencies = [ "tokenizers", "tokio", "tokio-rustls", + "tokio-stream", "tokio-util", + "tonic", + "tonic-build", "tui-markdown", "tui-textarea-2", "uuid", @@ -1064,6 +1151,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.9" @@ -1288,6 +1381,31 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.14.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1393,6 +1511,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.9.0" @@ -1403,9 +1527,11 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1413,6 +1539,19 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -1420,11 +1559,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "bytes", + "futures-channel", + "futures-util", "http", "http-body", "hyper", + "libc", "pin-project-lite", + "socket2 0.6.3", "tokio", + "tower-service", + "tracing", ] [[package]] @@ -1485,6 +1630,16 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09e54e57b4c48b40f7aec75635392b12b3421fa26fe8b4332e63138ed278459c" +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.14.0" @@ -1858,6 +2013,12 @@ dependencies = [ "xml5ever", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.8.0" @@ -1888,6 +2049,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1938,6 +2105,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -2233,6 +2406,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset 0.5.7", + "indexmap 2.14.0", +] + [[package]] name = "phf" version = "0.11.3" @@ -2285,6 +2468,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -2304,7 +2507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07" dependencies = [ "base64 0.22.1", - "indexmap", + "indexmap 2.14.0", "quick-xml", "serde", "time", @@ -2378,6 +2581,122 @@ dependencies = [ "yansi", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.117", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "pulldown-cmark" version = "0.13.3" @@ -2433,6 +2752,8 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ + "libc", + "rand_chacha 0.3.1", "rand_core 0.6.4", ] @@ -2442,10 +2763,20 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", + "rand_chacha 0.9.0", "rand_core 0.9.5", ] +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.9.0" @@ -2461,6 +2792,9 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", +] [[package]] name = "rand_core" @@ -2709,6 +3043,15 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -2831,7 +3174,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap", + "indexmap 2.14.0", "itoa", "memchr", "serde", @@ -2935,6 +3278,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7c388c1b5e93756d0c740965c41e8822f866621d41acbdf6336a6a168f8840c" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.3" @@ -3049,6 +3402,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "syntect" version = "5.3.0" @@ -3127,7 +3486,7 @@ dependencies = [ "fancy-regex", "filedescriptor", "finl_unicode", - "fixedbitset", + "fixedbitset 0.4.2", "hex", "lazy_static", "libc", @@ -3287,7 +3646,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.3", "tokio-macros", "windows-sys 0.61.2", ] @@ -3313,6 +3672,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -3327,6 +3697,130 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-native-certs", + "rustls-pemfile", + "socket2 0.5.10", + "tokio", + "tokio-rustls", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + [[package]] name = "tree-sitter" version = "0.26.8" @@ -3885,7 +4379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap", + "indexmap 2.14.0", "wasm-encoder", "wasmparser", ] @@ -3898,7 +4392,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags 2.11.0", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.14.0", "semver", ] @@ -4267,7 +4761,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap", + "indexmap 2.14.0", "prettyplease", "syn 2.0.117", "wasm-metadata", @@ -4298,7 +4792,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags 2.11.0", - "indexmap", + "indexmap 2.14.0", "log", "serde", "serde_derive", @@ -4317,7 +4811,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap", + "indexmap 2.14.0", "log", "semver", "serde", diff --git a/Cargo.toml b/Cargo.toml index 313dcd6..8a73852 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,11 @@ futures = "0.3" capnp = "0.25" capnp-rpc = "0.25" +tonic = { version = "0.12", features = ["tls", "tls-roots"] } +prost = "0.13" +async-stream = "0.3" +tokio-stream = "0.1" + tokenizers = "0.22" http = "1" @@ -74,10 +79,13 @@ imagesize = "0.14" rustls = "0.23" tokio-rustls = "0.26" rustls-native-certs = "0.8" +rustls-pemfile = "2" serde_urlencoded = "0.7" [build-dependencies] capnpc = "0.25" +tonic-build = { version = "0.12", default-features = false, features = ["prost", "transport"] } +protoc-bin-vendored = "3" [lib] name = "consciousness" diff --git a/build.rs b/build.rs index 808bf31..5f77ae4 100644 --- a/build.rs +++ b/build.rs @@ -13,4 +13,21 @@ fn main() { .file("schema/channel.capnp") .run() .expect("capnp compile failed (channel.capnp)"); + + // Generate salience.v1 gRPC client + message types from proto. + // Server side (python) is generated separately via grpcio-tools. + // Use vendored protoc so we don't require a system install. + let protoc = protoc_bin_vendored::protoc_bin_path() + .expect("vendored protoc not available for this platform"); + // SAFETY: build script is single-threaded at this point; setting env + // before invoking tonic_build is the documented way to point it at a + // non-PATH protoc. + unsafe { std::env::set_var("PROTOC", protoc); } + tonic_build::configure() + .build_server(false) + .build_client(true) + .compile_protos(&["proto/salience.proto"], &["proto"]) + .expect("tonic_build compile failed (salience.proto)"); + + println!("cargo:rerun-if-changed=proto/salience.proto"); } diff --git a/proto/salience.proto b/proto/salience.proto new file mode 100644 index 0000000..01c0f1e --- /dev/null +++ b/proto/salience.proto @@ -0,0 +1,260 @@ +// salience.proto — stateful generation + per-token concept readout over gRPC. +// +// Shape: +// - One server-streaming RPC (Generate) for inference. Every other +// operation is unary. This is the minimum streaming we need — +// tokens arrive one at a time with optional readouts / logprobs — +// and keeping everything else unary makes the client dramatically +// simpler than a single bidi state machine did. +// +// - Server-side sessions hold the token list and image binaries. +// Sessions exist for bandwidth: at 200K tokens we'd otherwise +// re-ship ~800KB every turn, which hurts badly over a WAN link. +// vLLM's prefix cache holds the KV; the session just gives the +// client a handle so it can send deltas. +// +// - The client is the source of truth for prompt content. The server +// is the source of truth for image token expansion (how many +// IMAGE_PAD tokens an image becomes under this model). The client +// never writes vision tokens itself — AppendImage appends the whole +// <|vision_start|> + IMAGE_PAD×N + <|vision_end|> block server-side. +// +// - Every mutation carries (offset, truncating): the client's view of +// the server's current length, plus whether the client is deliberately +// rewriting history. Server validates on each call and rejects drift. +// No silent divergence, no migration bugs. +// +// - Errors use gRPC status codes. NOT_FOUND for missing sessions, +// FAILED_PRECONDITION for offset drift or image-block splits, +// RESOURCE_EXHAUSTED for context overflow, ABORTED for "session busy". +// +// Not in v1: +// - Authentication beyond a shared bearer token in gRPC metadata. +// - Multi-tenant session namespacing. +// - Sampling traces beyond top-k logprobs. + +syntax = "proto3"; + +package salience.v1; + +// ============================================================ +// Service +// ============================================================ + +service Salience { + // Create a fresh session. Client uses session_id on every subsequent + // RPC until CloseSession or TTL eviction (default 30 min idle). To + // refresh TTL across a long pause, issue a no-op Generate (empty + // append_tokens, max_tokens=0, no ranges). + rpc OpenSession(OpenSessionRequest) returns (OpenSessionResponse); + + // Release the session's tokens + images. Idempotent. + rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse); + + // Branch a session at a given token position. The new session + // inherits tokens [0, at_position) and any images whose vision + // block lies fully in that range. Rejected with FAILED_PRECONDITION + // if at_position falls inside an image block (client picks a clean + // boundary). + rpc ForkSession(ForkSessionRequest) returns (ForkSessionResponse); + + // Append an image to the session. Server decodes, runs vLLM's + // multimodal pipeline to compute N (IMAGE_PAD count), and writes + // the whole vision block into session.tokens. Returns N and the + // new total length. + rpc AppendImage(AppendImageRequest) returns (AppendImageResponse); + + // Prefill + optionally decode. See GenerateRequest for full + // semantics; stream yields Token events (with optional readouts / + // logprobs per position) followed by a terminating Done. + rpc Generate(GenerateRequest) returns (stream GenerateEvent); + + // Readout manifest for the currently-loaded model — concept names, + // layer indices, tensor dtype. Stateless; fetch once at client + // startup and cache. + rpc GetReadoutManifest(GetReadoutManifestRequest) returns (ReadoutManifest); +} + +// ============================================================ +// Lifecycle +// ============================================================ + +message OpenSessionRequest { + // Model identifier, must match vLLM's served model. The server + // only has one model loaded; this is a safety check on what the + // client thinks it's talking to. + string model = 1; +} + +message OpenSessionResponse { + string session_id = 1; + uint32 max_model_len = 2; +} + +message CloseSessionRequest { + string session_id = 1; +} + +message CloseSessionResponse {} + +message ForkSessionRequest { + string session_id = 1; // source session + uint32 at_position = 2; // new session inherits tokens [0, at_position) +} + +message ForkSessionResponse { + string session_id = 1; // new session +} + +// ============================================================ +// Mutation +// ============================================================ + +message AppendImageRequest { + string session_id = 1; + + // Image bytes (PNG / JPEG / WebP / …). + bytes data = 2; + + // MIME type, e.g. "image/png". + string mime = 3; + + // Client's view of the session's current token length. Must equal + // the server's actual length, OR be strictly less when + // truncating=true. Any mismatch is FAILED_PRECONDITION. + uint32 offset = 4; + + // If true, server truncates session.tokens to `offset` before + // appending. Rejected with FAILED_PRECONDITION if the truncation + // would split an image block. + bool truncating = 5; +} + +message AppendImageResponse { + // Count of <|image_pad|> tokens inside the vision block. Does not + // include the <|vision_start|> / <|vision_end|> bookends, which + // contribute one token each. + uint32 placeholder_count = 1; + + // Session's total token length after this append, including both + // bookends (= offset + placeholder_count + 2, barring truncation). + uint32 total_length = 2; +} + +// ============================================================ +// Inference +// ============================================================ + +message GenerateRequest { + string session_id = 1; + + // Tokens to append before prefill. May be empty. Client must NOT + // include vision tokens (<|vision_start|>, <|image_pad|>, + // <|vision_end|>) — those live in the session via AppendImage. + repeated uint32 append_tokens = 2; + + // Offset / truncating — same semantics as AppendImage. Truncation + // that splits an image block is FAILED_PRECONDITION. + uint32 offset = 3; + bool truncating = 4; + + // Decode budget. 0 = prefill only (no decode, emit Token events + // for positions covered by logprobs_ranges / readout_ranges, then + // Done; replaces the old /score endpoint). >0 = decode up to this + // many tokens, stopping early on EOS / stop_token_ids. + uint32 max_tokens = 5; + + // Position ranges (absolute, within the session's post-append + // token list) at which to emit logprobs on Token events. Empty = + // no logprobs. `logprob_top_k > 0` returns the top-k alternative + // tokens at each covered position; `logprob_top_k == 0` returns + // only the sampled-token's logprob. + repeated PositionRange logprobs_ranges = 6; + uint32 logprob_top_k = 7; + + // Position ranges at which to emit concept-readout vectors. Empty + // = no readouts. Logical shape per position is + // [n_layers][n_concepts] — see GetReadoutManifest. + repeated PositionRange readout_ranges = 8; + + // Sampling parameters. Meaningful only when max_tokens > 0. + float temperature = 9; // default 1.0 when zero + float top_p = 10; // default 1.0 when zero + uint32 top_k = 11; // default 0 (disabled) + repeated uint32 stop_token_ids = 12; + + // vLLM scheduler priority (0 = interactive, 10 = batch). + int32 priority = 13; +} + +message PositionRange { + uint32 start = 1; // inclusive + uint32 end = 2; // exclusive +} + +message GenerateEvent { + oneof event { + Token token = 1; + GenerateDone done = 2; + } +} + +message Token { + // Token id at this position. For prefill this is the prompt token; + // for decode it's the sampled token. + uint32 id = 1; + + // Absolute position in the session's token list. + uint32 position = 2; + + // True for prefill positions, false for decode. + bool is_prefill = 3; + + // Concept readout at this position. Empty if the position wasn't + // covered by readout_ranges. + repeated float readout = 4 [packed = true]; + + // Top-k alternative tokens' logprobs at this position — populated + // when the position is covered by logprobs_ranges and + // logprob_top_k > 0. + repeated TokenLogprob logprobs = 5; + + // Logprob of the token at `position` (the prompt token for + // prefill, the sampled token for decode). Populated when the + // position is covered by logprobs_ranges. + float sampled_logprob = 6; + bool has_sampled_logprob = 7; +} + +message TokenLogprob { + uint32 id = 1; + float logprob = 2; +} + +message GenerateDone { + uint32 prompt_tokens = 1; + uint32 completion_tokens = 2; + uint32 total_tokens = 3; + + enum FinishReason { + FINISH_REASON_UNSPECIFIED = 0; + FINISH_REASON_EOS = 1; // emitted EOS / stop token + FINISH_REASON_LENGTH = 2; // hit max_tokens + FINISH_REASON_CANCELLED = 3; // client cancelled + FINISH_REASON_STOP_STRING = 4; // matched a stop string + } + FinishReason finish_reason = 4; +} + +// ============================================================ +// Readout manifest +// ============================================================ + +message GetReadoutManifestRequest {} + +message ReadoutManifest { + repeated string concepts = 1; + repeated uint32 layers = 2; + uint32 hidden_size = 3; + string dtype = 4; +} diff --git a/src/agent/api/http.rs b/src/agent/api/http.rs index 429350b..65b759b 100644 --- a/src/agent/api/http.rs +++ b/src/agent/api/http.rs @@ -100,7 +100,7 @@ impl HttpClient { .map_err(|e| anyhow::anyhow!("invalid server name: {e}"))?; let connector = tokio_rustls::TlsConnector::from(self.tls.clone()); let tls = connector.connect(server_name.to_owned(), tcp).await - .context("TLS handshake")?; + .map_err(|e| anyhow::anyhow!("TLS handshake to {host}: {e}"))?; TokioIo::new(Box::new(tls) as Box) } else { TokioIo::new(Box::new(tcp) as Box) @@ -190,6 +190,7 @@ impl HttpClientBuilder { } pub fn build(self) -> HttpClient { + install_rustls_crypto_provider(); let certs = rustls_native_certs::load_native_certs() .certs.into_iter() .collect::>(); @@ -197,6 +198,13 @@ impl HttpClientBuilder { for cert in certs { root_store.add(cert).ok(); } + // Also trust any `.pem` files under `~/.consciousness/certs/` — + // self-signed server certs for our own vllm hosts live there. + // Drop a new `.pem` in the dir to trust a new server; no + // code change needed. + for cert in load_user_certs() { + root_store.add(cert).ok(); + } let tls = Arc::new( ClientConfig::builder() .with_root_certificates(root_store) @@ -210,6 +218,65 @@ impl HttpClientBuilder { } } +/// Install rustls' default crypto provider exactly once per process. +/// rustls 0.23 doesn't pick one automatically when multiple features +/// could provide it (e.g. when tonic pulls in both ring and aws-lc-rs +/// via transitive deps). Idempotent via OnceLock; safe to call from +/// multiple callers. +fn install_rustls_crypto_provider() { + static ONCE: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + ONCE.get_or_init(|| { + let _ = rustls::crypto::ring::default_provider().install_default(); + }); +} + +/// Load every `.pem` file under `~/.consciousness/certs/` as a DER +/// certificate and return them. Silent on missing dir, missing files, +/// or parse errors — those are "no extra certs trusted" rather than +/// hard failures, to keep startup robust. +/// Load the concatenated PEM bytes of every `.pem` file under +/// `~/.consciousness/certs/` — suitable for passing to a tonic +/// `ClientTlsConfig::ca_certificate(Certificate::from_pem(...))` call +/// so gRPC connections trust the same self-signed servers the HTTP +/// path does. +pub(crate) fn load_user_certs_pem_bytes() -> Vec { + let mut out = Vec::new(); + let Some(home) = dirs::home_dir() else { return out }; + let dir = home.join(".consciousness").join("certs"); + let Ok(entries) = std::fs::read_dir(&dir) else { return out }; + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("pem") { + continue; + } + if let Ok(bytes) = std::fs::read(&path) { + out.extend_from_slice(&bytes); + if !bytes.ends_with(b"\n") { + out.push(b'\n'); + } + } + } + out +} + +fn load_user_certs() -> Vec> { + let mut out = Vec::new(); + let Some(home) = dirs::home_dir() else { return out }; + let dir = home.join(".consciousness").join("certs"); + let Ok(entries) = std::fs::read_dir(&dir) else { return out }; + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("pem") { + continue; + } + let Ok(bytes) = std::fs::read(&path) else { continue }; + for cert in rustls_pemfile::certs(&mut bytes.as_slice()).flatten() { + out.push(cert); + } + } + out +} + /// Trait alias for streams that work with hyper's IO adapter. trait IoStream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static {} impl IoStream for T {} diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index be5e58e..06ecf70 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -7,13 +7,14 @@ // Set POC_DEBUG=1 for verbose per-turn logging. pub mod http; +pub mod salience; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::Result; use tokio::sync::mpsc; use serde::Deserialize; -use http::{HttpClient, HttpResponse}; +use http::HttpClient; #[derive(Debug, Clone, Deserialize)] pub struct Usage { @@ -48,6 +49,7 @@ impl Drop for AbortOnDrop { /// Sampling parameters for model generation. #[derive(Clone, Copy)] +#[allow(dead_code)] // fields used once Generate RPC lands in a later step pub(crate) struct SamplingParams { pub temperature: f32, pub top_p: f32, @@ -74,6 +76,10 @@ pub struct ApiClient { api_key: String, pub model: String, base_url: String, + /// Cached readout manifest — fetched once per process and shared + /// across ApiClient clones (every Agent/fork gets the same cell). + /// `None` after fetch means the server has readout disabled (404). + manifest: std::sync::Arc>>, } impl ApiClient { @@ -88,36 +94,30 @@ impl ApiClient { api_key: api_key.to_string(), model: model.to_string(), base_url: base_url.trim_end_matches('/').to_string(), + manifest: std::sync::Arc::new(tokio::sync::OnceCell::new()), } } - pub(crate) fn stream_completion_mm( + /// Stream generation via a gRPC session. Stubbed during the + /// unary-rewrite transition — the Generate RPC is wired in a + /// later step of this series. Until then, callers that reach + /// this path get a StreamToken::Error. + pub(crate) fn stream_session_mm( &self, - prompt_tokens: &[u32], - images: &[super::context::WireImage], - sampling: SamplingParams, - priority: Option, + _session_lock: std::sync::Arc>>, + _prompt_tokens: &[u32], + _images: &[super::context::WireImage], + _sampling: SamplingParams, + _priority: Option, ) -> (mpsc::UnboundedReceiver, AbortOnDrop) { let (tx, rx) = mpsc::unbounded_channel(); - let client = self.client.clone(); - let api_key = self.api_key.clone(); - let model = self.model.clone(); - let prompt_tokens = prompt_tokens.to_vec(); - let images: Vec<(Vec, String)> = images.iter() - .map(|i| (i.bytes.clone(), i.mime.clone())) - .collect(); - let base_url = self.base_url.clone(); - let handle = tokio::spawn(async move { - let result = stream_completions( - &client, &base_url, &api_key, &model, - &prompt_tokens, &images, &tx, sampling, priority, - ).await; - if let Err(e) = result { - let _ = tx.send(StreamToken::Error(e.to_string())); - } + let _ = tx.send(StreamToken::Error( + "Generate RPC not yet wired after protocol rewrite — see \ + proto/salience.proto; AppendImage / Generate land next." + .into(), + )); }); - (rx, AbortOnDrop(handle)) } @@ -128,386 +128,31 @@ impl ApiClient { /// readout is enabled on the server, `Ok(None)` on 404 (disabled), /// or an error on any other failure. /// - /// Call once at startup and cache the result; the manifest doesn't - /// change during a server run. + /// First call performs the HTTP fetch; subsequent calls (including + /// across ApiClient clones sharing the same cell) return the + /// cached result. The manifest doesn't change during a server run. pub async fn fetch_readout_manifest(&self) -> Result> { - let url = format!("{}/readout/manifest", self.base_url); - let auth = format!("Bearer {}", self.api_key); - let response = self - .client - .get_with_headers(&url, &[("Authorization", &auth)]) - .await - .map_err(|e| anyhow::anyhow!("readout manifest fetch ({}): {}", url, e))?; - let status = response.status(); - if status.as_u16() == 404 { - return Ok(None); - } - if !status.is_success() { - let body = response.text().await.unwrap_or_default(); - let n = body.floor_char_boundary(body.len().min(500)); - anyhow::bail!("readout manifest HTTP {} ({}): {}", status, url, &body[..n]); - } - Ok(Some(response.json().await?)) + let manifest = self.manifest.get_or_try_init(|| async { + let url = format!("{}/readout/manifest", self.base_url); + let auth = format!("Bearer {}", self.api_key); + let response = self + .client + .get_with_headers(&url, &[("Authorization", &auth)]) + .await + .map_err(|e| anyhow::anyhow!("readout manifest fetch ({}): {}", url, e))?; + let status = response.status(); + if status.as_u16() == 404 { + return Ok::<_, anyhow::Error>(None); + } + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + let n = body.floor_char_boundary(body.len().min(500)); + anyhow::bail!("readout manifest HTTP {} ({}): {}", status, url, &body[..n]); + } + Ok(Some(response.json().await?)) + }).await?; + Ok(manifest.clone()) } } -async fn stream_completions( - client: &HttpClient, - base_url: &str, - api_key: &str, - model: &str, - prompt_tokens: &[u32], - images: &[(Vec, String)], - tx: &mpsc::UnboundedSender, - sampling: SamplingParams, - priority: Option, -) -> anyhow::Result<()> { - let mut request = serde_json::json!({ - "model": model, - "prompt": prompt_tokens, - "max_tokens": 16384, - "temperature": sampling.temperature, - "top_p": sampling.top_p, - "top_k": sampling.top_k, - "stream": true, - "return_token_ids": true, - "skip_special_tokens": false, - "stop_token_ids": [super::tokenizer::IM_END], - }); - if !images.is_empty() { - use base64::Engine; - let b64 = base64::engine::general_purpose::STANDARD; - let uris: Vec = images.iter() - .map(|(bytes, mime)| format!("data:{};base64,{}", mime, b64.encode(bytes))) - .collect(); - request["multi_modal_data"] = serde_json::json!({ "image": uris }); - } - if let Some(p) = priority { - request["priority"] = serde_json::json!(p); - } - - let url = format!("{}/completions", base_url); - let debug_label = format!("{} prompt tokens, model={}", prompt_tokens.len(), model); - - let mut response = send_and_check( - client, &url, &request, - ("Authorization", &format!("Bearer {}", api_key)), - &[], &debug_label, None, - ).await?; - - let mut reader = SseReader::new(); - let mut usage = None; - - while let Some(event) = reader.next_event(&mut response).await? { - if let Some(err_msg) = event["error"]["message"].as_str() { - anyhow::bail!("API error in stream: {}", err_msg); - } - - if let Some(u) = event["usage"].as_object() { - if let Ok(u) = serde_json::from_value::(serde_json::Value::Object(u.clone())) { - usage = Some(u); - } - } - - let choices = match event["choices"].as_array() { - Some(c) => c, - None => continue, - }; - - for choice in choices { - // `readout`, if present, is a nested list - // `[num_tokens][n_layers][n_concepts]`. Parse it once per - // chunk and pair rows with token ids by index — the rows - // are in the same order as `token_ids`. - let readouts: Option> = choice["readout"] - .as_array() - .map(|outer| { - outer.iter().filter_map(|per_token| { - per_token.as_array().map(|layers| { - layers.iter().filter_map(|per_layer| { - per_layer.as_array().map(|vals| { - vals.iter() - .filter_map(|v| v.as_f64().map(|f| f as f32)) - .collect::>() - }) - }).collect::>>() - }) - }).collect() - }); - - if let Some(ids) = choice["token_ids"].as_array() { - for (i, id_val) in ids.iter().enumerate() { - if let Some(id) = id_val.as_u64() { - let readout = readouts - .as_ref() - .and_then(|r| r.get(i).cloned()); - let _ = tx.send(StreamToken::Token { - id: id as u32, - readout, - }); - } - } - } else if let Some(text) = choice["text"].as_str() { - // Fallback: provider didn't return token_ids, encode locally. - // No readout available in this path — the encoder may - // produce a different token count than the server did. - if !text.is_empty() { - for id in super::tokenizer::encode(text) { - let _ = tx.send(StreamToken::Token { id, readout: None }); - } - } - } - } - } - - let _ = tx.send(StreamToken::Done { usage }); - Ok(()) -} - -/// Send an HTTP request and check for errors. -pub(crate) async fn send_and_check( - client: &HttpClient, - url: &str, - body: &impl serde::Serialize, - auth_header: (&str, &str), - extra_headers: &[(&str, &str)], - debug_label: &str, - request_json: Option<&str>, -) -> Result { - let debug = std::env::var("POC_DEBUG").is_ok(); - let start = Instant::now(); - - if debug { - let payload_size = serde_json::to_string(body) - .map(|s| s.len()) - .unwrap_or(0); - dbglog!( - "request: {}K payload, {}", - payload_size / 1024, debug_label, - ); - } - - let mut headers: Vec<(&str, &str)> = Vec::with_capacity(extra_headers.len() + 1); - headers.push(auth_header); - headers.extend_from_slice(extra_headers); - - let response = client - .send_json("POST", url, &headers, body) - .await - .map_err(|e| { - let msg = e.to_string(); - let cause = if msg.contains("connect timeout") || msg.contains("TCP connect") { - "connection refused" - } else if msg.contains("request timeout") { - "request timed out" - } else { - "request error" - }; - anyhow::anyhow!("{} ({}): {}", cause, url, msg) - })?; - - let status = response.status(); - let elapsed = start.elapsed(); - - if debug { - for name in [ - "x-ratelimit-remaining", - "x-ratelimit-limit", - "x-request-id", - ] { - if let Some(val) = response.header(name) { - dbglog!("header {}: {}", name, val); - } - } - } - - if !status.is_success() { - let body = response.text().await.unwrap_or_default(); - dbglog!( - "HTTP {} after {:.1}s ({}): {}", - status, - elapsed.as_secs_f64(), - url, - &body[..body.floor_char_boundary(body.len().min(500))] - ); - if let Some(json) = request_json { - let log_dir = dirs::home_dir() - .unwrap_or_default() - .join(".consciousness/logs/failed-requests"); - let _ = std::fs::create_dir_all(&log_dir); - let ts = chrono::Local::now().format("%Y%m%dT%H%M%S"); - let path = log_dir.join(format!("{}.json", ts)); - if std::fs::write(&path, json).is_ok() { - dbglog!( - "saved failed request to {} (HTTP {})", path.display(), status - ); - } - } - anyhow::bail!("HTTP {} ({}): {}", status, url, &body[..body.floor_char_boundary(body.len().min(1000))]); - } - - if debug { - dbglog!( - "connected in {:.1}s (HTTP {})", - elapsed.as_secs_f64(), - status.as_u16() - ); - } - - Ok(response) -} - -/// SSE stream reader. Handles the generic SSE plumbing shared by both -/// backends: chunk reading with timeout, line buffering, `data:` prefix -/// stripping, `[DONE]` detection, JSON parsing, and parse error diagnostics. -/// Yields parsed events as serde_json::Value — each backend handles its -/// own event types. -pub(crate) struct SseReader { - line_buf: String, - chunk_timeout: Duration, - pub stream_start: Instant, - pub chunks_received: u64, - pub sse_lines_parsed: u64, - pub sse_parse_errors: u64, - debug: bool, - done: bool, - /// Serialized request payload — saved to disk on errors for replay debugging. - pub(crate) request_json: Option, -} - -impl SseReader { - pub(crate) fn new() -> Self { - Self { - line_buf: String::new(), - chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs), - stream_start: Instant::now(), - chunks_received: 0, - sse_lines_parsed: 0, - sse_parse_errors: 0, - debug: std::env::var("POC_DEBUG").is_ok(), - done: false, - request_json: None, - } - } - - /// Attach the serialized request payload for error diagnostics. - /// Save the request payload to disk for replay debugging. - fn save_failed_request(&self, reason: &str) { - let Some(ref json) = self.request_json else { return }; - let log_dir = dirs::home_dir() - .unwrap_or_default() - .join(".consciousness/logs/failed-requests"); - let _ = std::fs::create_dir_all(&log_dir); - let ts = chrono::Local::now().format("%Y%m%dT%H%M%S"); - let path = log_dir.join(format!("{}.json", ts)); - if std::fs::write(&path, json).is_ok() { - dbglog!( - "saved failed request to {} ({})", path.display(), reason - ); - } - } - - /// Read the next SSE event from the response stream. - /// Returns Ok(Some(value)) for each parsed data line, - /// Ok(None) when the stream ends or [DONE] is received. - pub(crate) async fn next_event( - &mut self, - response: &mut HttpResponse, - ) -> Result> { - loop { - // Drain complete lines from the buffer before reading more chunks - while let Some(newline_pos) = self.line_buf.find('\n') { - let line = self.line_buf[..newline_pos].trim().to_string(); - self.line_buf = self.line_buf[newline_pos + 1..].to_string(); - - if line == "data: [DONE]" { - self.done = true; - return Ok(None); - } - if line.is_empty() - || line.starts_with("event: ") - || !line.starts_with("data: ") - { - continue; - } - - let json_str = &line[6..]; - self.sse_lines_parsed += 1; - - match serde_json::from_str(json_str) { - Ok(v) => return Ok(Some(v)), - Err(e) => { - self.sse_parse_errors += 1; - if self.sse_parse_errors == 1 || self.debug { - let preview = if json_str.len() > 200 { - format!("{}...", &json_str[..200]) - } else { - json_str.to_string() - }; - dbglog!( - "SSE parse error (#{}) {}: {}", - self.sse_parse_errors, e, preview - ); - } - continue; - } - } - } - - if self.done { - return Ok(None); - } - - // Read more data from the response stream - match tokio::time::timeout(self.chunk_timeout, response.chunk()).await { - Ok(Ok(Some(chunk))) => { - self.chunks_received += 1; - self.line_buf.push_str(&String::from_utf8_lossy(&chunk)); - } - Ok(Ok(None)) => return Ok(None), - Ok(Err(e)) => { - let buf_preview = if self.line_buf.is_empty() { - "(empty)".to_string() - } else { - let n = self.line_buf.len().min(500); - format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n]) - }; - let msg = format!( - "stream error after {} chunks, {:.1}s, {} sse lines: {} | buf: {}", - self.chunks_received, - self.stream_start.elapsed().as_secs_f64(), - self.sse_lines_parsed, - e, buf_preview, - ); - dbglog!("{}", msg); - self.save_failed_request(&msg); - return Err(e.into()); - } - Err(_) => { - let buf_preview = if self.line_buf.is_empty() { - "(empty)".to_string() - } else { - let n = self.line_buf.len().min(500); - format!("{}B: {}", self.line_buf.len(), &self.line_buf[..n]) - }; - let msg = format!( - "stream timeout: {}s, {} chunks, {} sse lines, {:.1}s elapsed | buf: {}", - self.chunk_timeout.as_secs(), - self.chunks_received, - self.sse_lines_parsed, - self.stream_start.elapsed().as_secs_f64(), - buf_preview, - ); - dbglog!("{}", msg); - self.save_failed_request(&msg); - anyhow::bail!( - "stream timeout: no data for {}s ({} chunks received)", - self.chunk_timeout.as_secs(), - self.chunks_received - ); - } - } - } - } -} diff --git a/src/agent/api/salience.rs b/src/agent/api/salience.rs new file mode 100644 index 0000000..f9ea83d --- /dev/null +++ b/src/agent/api/salience.rs @@ -0,0 +1,249 @@ +// agent/api/salience.rs — gRPC client bindings for salience.v1. +// +// Thin wrapper around the tonic-generated types. Every RPC except +// Generate is unary; Generate is server-streaming. Free functions +// (open/close session) wrap the lifecycle RPCs; `SessionHandle` just +// carries the id + connection params so later RPCs can reuse them. +// +// The old bidi Session() API is gone — see git history for its shape. + +#![allow(clippy::enum_variant_names)] + +use anyhow::{Context, Result}; +use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint}; + +/// Generated prost + tonic types for salience.v1. Call sites use +/// `pb::OpenSessionRequest`, `pb::Token`, etc. +pub mod pb { + tonic::include_proto!("salience.v1"); +} + +pub type SalienceClient = pb::salience_client::SalienceClient; + +/// Open a TLS-aware gRPC channel to the salience server. `base_url` +/// looks like `https://host:8443`. User-provided CA certs under +/// `~/.consciousness/certs/` are trusted in addition to the system +/// roots (for self-signed server certs). +pub async fn connect(base_url: &str) -> Result { + let mut endpoint = Endpoint::from_shared(base_url.to_string()) + .with_context(|| format!("invalid salience endpoint: {}", base_url))? + .connect_timeout(std::time::Duration::from_secs(30)) + .timeout(std::time::Duration::from_secs(600)); + + if base_url.starts_with("https://") { + let user_certs = super::http::load_user_certs_pem_bytes(); + let mut tls = ClientTlsConfig::new().with_native_roots(); + if !user_certs.is_empty() { + tls = tls.ca_certificate(Certificate::from_pem(user_certs)); + } + endpoint = endpoint + .tls_config(tls) + .with_context(|| "configuring tonic TLS")?; + } + + let channel = endpoint + .connect() + .await + .with_context(|| format!("failed to connect to salience server at {}", base_url))?; + Ok(pb::salience_client::SalienceClient::new(channel)) +} + +/// Derive the gRPC base URL from the HTTP completions base URL. +/// +/// vLLM's salience gRPC server listens on a different port (8443) from +/// the HTTP endpoint (8000) and accepts no path component. Given an +/// HTTP base like `https://host:8000/v1`, produce `https://host:8443`. +/// No-op when the path is empty and the port isn't 8000. +pub fn derive_grpc_url(http_base: &str) -> String { + let mut url = http_base.trim_end_matches('/').to_string(); + if let Some(proto_end) = url.find("://") { + let rest_start = proto_end + 3; + if let Some(path_slash) = url[rest_start..].find('/') { + url.truncate(rest_start + path_slash); + } + } + url.replace(":8000", ":8443") +} + +/// Attach a bearer token to a tonic request as gRPC metadata. +pub fn with_auth(req: &mut tonic::Request, api_key: &str) { + if api_key.is_empty() { + return; + } + let bearer = format!("Bearer {}", api_key); + if let Ok(val) = bearer.parse() { + req.metadata_mut().insert("authorization", val); + } +} + +/// Call the server's `OpenSession` RPC and return the response. +pub async fn open_session( + base_url: &str, + api_key: &str, + model: &str, +) -> Result { + let mut client = connect(base_url).await?; + let mut req = tonic::Request::new(pb::OpenSessionRequest { + model: model.to_string(), + }); + with_auth(&mut req, api_key); + let resp = client + .open_session(req) + .await + .with_context(|| "OpenSession RPC failed")?; + Ok(resp.into_inner()) +} + +/// Call the server's `CloseSession` RPC. Idempotent on the server. +pub async fn close_session(base_url: &str, api_key: &str, session_id: &str) -> Result<()> { + let mut client = connect(base_url).await?; + let mut req = tonic::Request::new(pb::CloseSessionRequest { + session_id: session_id.to_string(), + }); + with_auth(&mut req, api_key); + client + .close_session(req) + .await + .with_context(|| "CloseSession RPC failed")?; + Ok(()) +} + +/// Append an image to a session. Server decodes the image, computes N +/// via vLLM's own multimodal pipeline, writes the full vision block +/// (`<|vision_start|> + IMAGE_PAD×N + <|vision_end|>`) into +/// session.tokens, and returns (N, new total length). +/// +/// `offset` is the client's view of the session's current token count; +/// the server rejects if it diverges from its own (unless +/// `truncating=true`, in which case the server slices to `offset` +/// first — but never through a vision block). +pub async fn append_image( + base_url: &str, + api_key: &str, + session_id: &str, + data: Vec, + mime: String, + offset: u32, + truncating: bool, +) -> Result { + let mut client = connect(base_url).await?; + let mut req = tonic::Request::new(pb::AppendImageRequest { + session_id: session_id.to_string(), + data, + mime, + offset, + truncating, + }); + with_auth(&mut req, api_key); + let resp = client + .append_image(req) + .await + .with_context(|| "AppendImage RPC failed")?; + Ok(resp.into_inner()) +} + +/// Handle to a server-side session. Carries the id + connection params +/// so subsequent per-session RPCs (AppendImage, Generate, ForkSession) +/// can be issued without the caller juggling base_url / api_key each +/// time. +pub struct SessionHandle { + pub session_id: String, + pub max_model_len: u32, + pub base_url: String, + pub api_key: String, +} + +impl SessionHandle { + pub async fn open(base_url: &str, api_key: &str, model: &str) -> Result { + let grpc_url = derive_grpc_url(base_url); + log::debug!(target: "grpc", + "SessionHandle::open http_base={} -> grpc_url={}", + base_url, grpc_url); + let resp = open_session(&grpc_url, api_key, model).await?; + log::debug!(target: "grpc", + "SessionHandle::open session_id={} max_model_len={}", + resp.session_id, resp.max_model_len); + Ok(Self { + session_id: resp.session_id, + max_model_len: resp.max_model_len, + base_url: grpc_url, + api_key: api_key.to_string(), + }) + } + + pub async fn close(self) -> Result<()> { + close_session(&self.base_url, &self.api_key, &self.session_id).await + } + + /// Append an image via the server-side vision block. See + /// `append_image` free function for full semantics. + pub async fn append_image( + &self, + data: Vec, + mime: String, + offset: u32, + truncating: bool, + ) -> Result { + append_image( + &self.base_url, + &self.api_key, + &self.session_id, + data, + mime, + offset, + truncating, + ) + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generated_types_compile() { + // Exercise the shape of the new proto types — if build.rs + // stops regenerating against the proto, this stops compiling. + let _open = pb::OpenSessionRequest { + model: "qwen3-vl".into(), + }; + let _tok = pb::Token { + id: 42, + position: 0, + is_prefill: false, + readout: vec![0.1, 0.2, 0.3], + logprobs: vec![pb::TokenLogprob { + id: 1, + logprob: -0.5, + }], + sampled_logprob: -0.1, + has_sampled_logprob: true, + }; + let _done = pb::GenerateDone { + prompt_tokens: 10, + completion_tokens: 20, + total_tokens: 30, + finish_reason: pb::generate_done::FinishReason::Eos as i32, + }; + let _evt = pb::GenerateEvent { + event: Some(pb::generate_event::Event::Done(_done)), + }; + } + + #[test] + fn derive_grpc_url_cases() { + assert_eq!( + derive_grpc_url("https://host:8000/v1"), + "https://host:8443", + ); + assert_eq!( + derive_grpc_url("https://host:8000/"), + "https://host:8443", + ); + assert_eq!( + derive_grpc_url("https://host:9000/v1"), + "https://host:9000", + ); + } +} diff --git a/src/agent/context.rs b/src/agent/context.rs index 2009cfc..ab21e21 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -359,8 +359,8 @@ impl AstNode { mime: impl Into, orig_height: u32, orig_width: u32, + token_count: u32, ) -> Self { - let token_count = qwen3_image_token_count(orig_height, orig_width); Self::Leaf(NodeLeaf::new(NodeBody::Image { bytes, mime: mime.into(), @@ -898,10 +898,12 @@ impl Ast for ContextState { } /// An image collected from the AST for a request body. The AST stores -/// the pre-expanded token form (N image_pads) for accurate budget -/// accounting; the wire form collapses each Image to a single -/// `<|image_pad|>` between vision bookends and ships the bytes -/// separately as multi_modal_data. +/// the pre-expanded token form (`<|vision_start|> + <|image_pad|>×N + +/// <|vision_end|>`), and the wire form mirrors that exactly so the +/// server's `session.tokens` length matches what vLLM's engine will +/// process. The authoritative N is obtained from the server via the +/// CountImageTokens RPC before the Image leaf is constructed. +#[derive(Clone)] pub struct WireImage { pub bytes: Vec, pub mime: String, @@ -911,9 +913,10 @@ fn wire_into(node: &AstNode, tokens: &mut Vec, images: &mut Vec) match node { AstNode::Leaf(leaf) => match leaf.body() { NodeBody::Image { bytes, mime, .. } => { - tokens.push(tokenizer::VISION_START); - tokens.push(tokenizer::IMAGE_PAD); - tokens.push(tokenizer::VISION_END); + // Send the pre-expanded token form (includes N + // <|image_pad|> tokens); engine's multi_modal + // pipeline pairs them with the binary data below. + tokens.extend_from_slice(leaf.token_ids()); images.push(WireImage { bytes: bytes.clone(), mime: mime.clone(), @@ -1225,11 +1228,20 @@ impl ContextState { // to at request time. Constants come from Qwen3.5-27B's preprocessor_config. // --------------------------------------------------------------------------- +// Test-only client-side estimate of image token expansion. Production +// callers obtain the authoritative count from the server via +// CountImageTokens; these constants and helpers stay around only to +// keep the context-shape unit tests self-contained. +#[cfg(test)] const QWEN3_PATCH_SIZE: u32 = 16; +#[cfg(test)] const QWEN3_MERGE_SIZE: u32 = 2; +#[cfg(test)] const QWEN3_MIN_PIXELS: u64 = 65_536; +#[cfg(test)] const QWEN3_MAX_PIXELS: u64 = 16_777_216; +#[cfg(test)] fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) -> (u32, u32) { let max_s = h.max(w) as f64; let min_s = h.min(w) as f64; @@ -1258,10 +1270,10 @@ fn smart_resize(h: u32, w: u32, factor: u32, min_pixels: u64, max_pixels: u64) - } } -/// Compute how many `<|image_pad|>` tokens vLLM will emit for an image of -/// the given dimensions. Matches Qwen3VL's feature-size calculation exactly: -/// (grid_h * grid_w) / merge_size^2 -/// where (grid_h, grid_w) = resized dims / patch_size. +/// Test-only: client-side estimate of how many `<|image_pad|>` tokens +/// vLLM will emit for an image of the given dimensions. Production +/// callers use `salience::count_image_tokens` (server-authoritative). +#[cfg(test)] fn qwen3_image_token_count(orig_h: u32, orig_w: u32) -> u32 { let factor = QWEN3_PATCH_SIZE * QWEN3_MERGE_SIZE; let (rh, rw) = smart_resize(orig_h, orig_w, factor, QWEN3_MIN_PIXELS, QWEN3_MAX_PIXELS); @@ -1697,7 +1709,7 @@ mod tests { #[test] fn test_image_render_and_token_ids() { - let node = AstNode::image(vec![0u8, 1, 2, 3], "image/png", 512, 512); + let node = AstNode::image(vec![0u8, 1, 2, 3], "image/png", 512, 512, qwen3_image_token_count(512, 512)); let leaf = node.leaf().unwrap(); // 3 tokens of bookend + 256 image_pad tokens assert_eq!(leaf.token_ids().len(), 258); @@ -1713,36 +1725,41 @@ mod tests { } #[test] - fn test_wire_prompt_collapses_image_pads() { + fn test_wire_prompt_preserves_expanded_image_pads() { let mut ctx = ContextState::new(); ctx.push_no_log(Section::Conversation, AstNode::branch(Role::User, vec![ AstNode::content("look:"), - AstNode::image(vec![0xDE, 0xAD], "image/png", 512, 512), + AstNode::image(vec![0xDE, 0xAD], "image/png", 512, 512, qwen3_image_token_count(512, 512)), ])); - // AST side: N image_pads + bookends, full budget accounting. + // AST side and wire side should both carry N image_pads + bookends — + // server's session.tokens length must match what vLLM's engine will + // actually process. Binary image bytes are shipped separately in + // multi_modal_data via the WireImage list. + let n_expected = qwen3_image_token_count(512, 512) as usize; + let full = ctx.token_ids(); let n_image_pads_full = full.iter() .filter(|&&t| t == tokenizer::IMAGE_PAD).count(); - assert_eq!(n_image_pads_full, qwen3_image_token_count(512, 512) as usize); + assert_eq!(n_image_pads_full, n_expected); - // Wire side: single image_pad, bytes moved to images list. let (wire, images, _) = ctx.wire_prompt(0..ctx.conversation().len(), |_| false); let n_image_pads_wire = wire.iter() .filter(|&&t| t == tokenizer::IMAGE_PAD).count(); - assert_eq!(n_image_pads_wire, 1); + assert_eq!(n_image_pads_wire, n_expected); + assert_eq!(images.len(), 1); assert_eq!(images[0].bytes, vec![0xDE, 0xAD]); assert_eq!(images[0].mime, "image/png"); - // vision_start/vision_end bookends are preserved in wire form. + // One pair of vision_start/vision_end bookends around the N pads. assert_eq!(wire.iter().filter(|&&t| t == tokenizer::VISION_START).count(), 1); assert_eq!(wire.iter().filter(|&&t| t == tokenizer::VISION_END).count(), 1); } #[test] fn test_image_serde_roundtrip() { - let node = AstNode::image(vec![0xDE, 0xAD, 0xBE, 0xEF], "image/png", 64, 64); + let node = AstNode::image(vec![0xDE, 0xAD, 0xBE, 0xEF], "image/png", 64, 64, qwen3_image_token_count(64, 64)); let json = serde_json::to_string(&node).unwrap(); // bytes must be base64-encoded in the JSON form assert!(json.contains("3q2+7w==")); diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 2c3a98a..6a55f3f 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -17,6 +17,7 @@ pub mod api; pub mod context; pub mod oneshot; pub mod readout; +pub mod salience; pub mod tokenizer; pub mod tools; @@ -148,6 +149,14 @@ pub struct Agent { /// token handler, read by UI screens (amygdala). Manifest is /// `None` when the server has readout disabled. pub readout: readout::SharedReadoutBuffer, + /// Long-lived gRPC session to the salience server, lazily opened + /// on first use. Tracks appended tokens so subsequent turns send + /// only the delta (prefix-cache reuse). None when not yet opened + /// or when the session has died and needs reopening. + /// + /// Arc-wrapped so the spawned streaming task can share ownership + /// (the task outlives the call site). + pub grpc_session: std::sync::Arc>>, } /// Mutable agent state — behind its own mutex. @@ -224,6 +233,7 @@ impl Agent { session_id, context: crate::Mutex::new(context), readout, + grpc_session: std::sync::Arc::new(crate::Mutex::new(None)), state: crate::Mutex::new(AgentState { tools: agent_tools, mcp_tools: McpToolAccess::All, @@ -292,6 +302,9 @@ impl Agent { // shouldn't bleed into the main emotional readout even // though they hit the same vLLM server. readout: readout::new_shared(), + // Forks get their own session — can't share a bidi stream, + // and forks have different conversation tails anyway. + grpc_session: std::sync::Arc::new(crate::Mutex::new(None)), state: crate::Mutex::new(AgentState { tools, mcp_tools: McpToolAccess::None, @@ -406,7 +419,8 @@ impl Agent { let (rx, _stream_guard) = { let (prompt_tokens, images) = agent.assemble_prompt().await; let st = agent.state.lock().await; - agent.client.stream_completion_mm( + agent.client.stream_session_mm( + agent.grpc_session.clone(), &prompt_tokens, &images, api::SamplingParams { @@ -427,7 +441,8 @@ impl Agent { idx }; - let parser = ResponseParser::new(branch_idx); + let think_native = agent.state.lock().await.think_native; + let parser = ResponseParser::new(branch_idx, think_native); let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone()); let mut pending_calls: Vec = Vec::new(); diff --git a/src/agent/salience.rs b/src/agent/salience.rs new file mode 100644 index 0000000..8cecd50 --- /dev/null +++ b/src/agent/salience.rs @@ -0,0 +1,309 @@ +// agent/salience.rs — peak extraction from per-token concept-readout traces. +// +// Consumes a trace of `ReadoutEntry` (per-token per-layer per-concept +// projections streamed from the vLLM server) and produces a compact +// list of `SaliencePeak` events — one per contiguous above-threshold +// region per concept, placed at the local maximum. +// +// Pure function. No I/O, no async, no side effects. Caller supplies the +// trace slice and manifest; caller decides what to do with the events. +// +// See also: `salience-trace-plumbing-architecture` memory node. + +use super::api::ReadoutManifest; +use super::readout::ReadoutEntry; + +/// One salient moment in a trace — a concept channel crossed threshold, +/// and we picked the local maximum within the contiguous above-threshold +/// run. +#[derive(Debug, Clone, PartialEq)] +pub struct SaliencePeak { + /// Index into the trace (0-based) where the peak occurred. + pub token_offset: usize, + /// Concept name from the manifest. + pub concept: String, + /// z-score of the peak value vs the trace's own distribution for + /// that concept. Always positive (we only pick above-threshold). + pub intensity: f32, +} + +/// Tunables for peak extraction. +#[derive(Debug, Clone)] +pub struct PeakConfig { + /// Minimum z-score to count as a peak. Default 2.0 (~top 2.5% assuming + /// normal-ish distribution, though readouts are rarely normal). + pub sigma_threshold: f32, + /// Minimum standard deviation of a concept channel for peaks to be + /// reported. If a channel is numerically flat across the whole trace, + /// tiny fluctuations can produce spurious "peaks" with huge z-scores; + /// require at least this much variation before trusting the channel. + pub min_std: f32, +} + +impl Default for PeakConfig { + fn default() -> Self { + Self { sigma_threshold: 2.0, min_std: 1e-4 } + } +} + +/// Extract peak events from a trace for one layer. +/// +/// `layer_idx` indexes into the per-token readout tensor's layer +/// dimension. If the trace is empty, the layer is out of range for any +/// entry, or the manifest is empty, returns `Vec::new()`. +/// +/// Peaks are returned sorted by `token_offset` ascending. When two +/// peaks share an offset they're ordered by `concept` lexicographically +/// for determinism. +pub fn pick_peaks( + trace: &[ReadoutEntry], + manifest: &ReadoutManifest, + layer_idx: usize, + config: &PeakConfig, +) -> Vec { + if trace.is_empty() || manifest.concepts.is_empty() { + return Vec::new(); + } + + let n_concepts = manifest.concepts.len(); + let n_tokens = trace.len(); + + // Pull a [n_tokens × n_concepts] column-major view for the selected + // layer. Entries where the layer is missing or the concept count + // doesn't match the manifest are treated as zeros — the downstream + // z-score will drown them as baseline if they're sparse, and if they + // dominate the caller has bigger problems. + let mut by_concept: Vec> = vec![Vec::with_capacity(n_tokens); n_concepts]; + for entry in trace { + match entry.readout.get(layer_idx) { + Some(row) if row.len() == n_concepts => { + for (c, v) in row.iter().enumerate() { + by_concept[c].push(*v); + } + } + _ => { + for col in by_concept.iter_mut() { + col.push(0.0); + } + } + } + } + + let mut peaks: Vec = Vec::new(); + for (c_idx, values) in by_concept.iter().enumerate() { + let (mean, std) = mean_std(values); + if std < config.min_std { + continue; + } + let concept = &manifest.concepts[c_idx]; + + // Walk contiguous above-threshold runs, emit one peak per run + // at the local max. + let mut run_start: Option = None; + let mut run_max_offset: usize = 0; + let mut run_max_z: f32 = 0.0; + for (i, v) in values.iter().enumerate() { + let z = (*v - mean) / std; + let above = z >= config.sigma_threshold; + if above { + if run_start.is_none() { + run_start = Some(i); + run_max_offset = i; + run_max_z = z; + } else if z > run_max_z { + run_max_offset = i; + run_max_z = z; + } + } else if run_start.is_some() { + peaks.push(SaliencePeak { + token_offset: run_max_offset, + concept: concept.clone(), + intensity: run_max_z, + }); + run_start = None; + } + } + // Flush trailing run. + if run_start.is_some() { + peaks.push(SaliencePeak { + token_offset: run_max_offset, + concept: concept.clone(), + intensity: run_max_z, + }); + } + } + + peaks.sort_by(|a, b| a.token_offset.cmp(&b.token_offset).then_with(|| a.concept.cmp(&b.concept))); + peaks +} + +/// Mean and population std of a slice. Returns (0.0, 0.0) for empty input. +fn mean_std(xs: &[f32]) -> (f32, f32) { + if xs.is_empty() { + return (0.0, 0.0); + } + let n = xs.len() as f32; + let mean = xs.iter().sum::() / n; + let var = xs.iter().map(|x| (x - mean).powi(2)).sum::() / n; + (mean, var.sqrt()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn manifest(concepts: &[&str], layers: &[u32]) -> ReadoutManifest { + ReadoutManifest { + concepts: concepts.iter().map(|s| s.to_string()).collect(), + layers: layers.to_vec(), + } + } + + /// Build a trace where all entries have one hooked layer and the + /// given per-token values for each concept. `values[t][c]` = value + /// at token t, concept c. + fn trace(values: &[Vec]) -> Vec { + values.iter().enumerate().map(|(i, row)| ReadoutEntry { + token_id: i as u32, + readout: vec![row.clone()], + }).collect() + } + + #[test] + fn empty_trace_returns_empty() { + let m = manifest(&["curious"], &[63]); + let peaks = pick_peaks(&[], &m, 0, &PeakConfig::default()); + assert!(peaks.is_empty()); + } + + #[test] + fn empty_manifest_returns_empty() { + let m = manifest(&[], &[63]); + let t = trace(&[vec![], vec![], vec![]]); + let peaks = pick_peaks(&t, &m, 0, &PeakConfig::default()); + assert!(peaks.is_empty()); + } + + #[test] + fn flat_channel_produces_no_peaks() { + let m = manifest(&["curious"], &[63]); + let t = trace(&[vec![1.0], vec![1.0], vec![1.0], vec![1.0], vec![1.0]]); + let peaks = pick_peaks(&t, &m, 0, &PeakConfig::default()); + assert!(peaks.is_empty(), "flat channel should produce no peaks, got {:?}", peaks); + } + + #[test] + fn single_spike_detected() { + // Ten baseline zeros with one 5.0 spike — that single token's + // z-score will easily exceed 2σ. + let m = manifest(&["curious"], &[63]); + let mut rows: Vec> = (0..10).map(|_| vec![0.0]).collect(); + rows[5] = vec![5.0]; + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert_eq!(peaks.len(), 1); + assert_eq!(peaks[0].concept, "curious"); + assert_eq!(peaks[0].token_offset, 5); + assert!(peaks[0].intensity >= 2.0); + } + + #[test] + fn contiguous_region_emits_one_peak_at_max() { + // Values 0, 0, 0, 2, 5, 3, 0, 0 — the 3-5-3 hump is one run; + // peak should land at offset 4 (the 5). + let m = manifest(&["aha"], &[63]); + let rows: Vec> = [0.0, 0.0, 0.0, 2.0, 5.0, 3.0, 0.0, 0.0] + .iter().map(|v| vec![*v]).collect(); + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert_eq!(peaks.len(), 1, "expected one peak for one contiguous run, got {:?}", peaks); + assert_eq!(peaks[0].token_offset, 4); + } + + #[test] + fn multiple_concepts_independent() { + let m = manifest(&["curious", "aha"], &[63]); + // curious spikes at 2, aha spikes at 7 + let rows: Vec> = (0..10).map(|i| { + let c = if i == 2 { 4.0 } else { 0.0 }; + let a = if i == 7 { 4.0 } else { 0.0 }; + vec![c, a] + }).collect(); + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert_eq!(peaks.len(), 2); + // Sorted by offset — curious(2) comes first, aha(7) second. + assert_eq!(peaks[0].concept, "curious"); + assert_eq!(peaks[0].token_offset, 2); + assert_eq!(peaks[1].concept, "aha"); + assert_eq!(peaks[1].token_offset, 7); + } + + #[test] + fn two_separated_runs_emit_two_peaks() { + // Longer baseline so the two spikes don't dominate the global + // mean/std — 30 tokens of zeros with two 5.0 spikes at 10 and 20. + let m = manifest(&["curious"], &[63]); + let mut rows: Vec> = (0..30).map(|_| vec![0.0]).collect(); + rows[10] = vec![5.0]; + rows[20] = vec![5.0]; + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert_eq!(peaks.len(), 2, "expected two peaks for two runs, got {:?}", peaks); + assert_eq!(peaks[0].token_offset, 10); + assert_eq!(peaks[1].token_offset, 20); + } + + #[test] + fn trailing_run_is_flushed() { + // Peak runs to the end of the trace — must still emit. + // Use a longer baseline so the trailing spike is genuinely + // above threshold on the global stats. + let m = manifest(&["curious"], &[63]); + let mut rows: Vec> = (0..30).map(|_| vec![0.0]).collect(); + rows[27] = vec![3.0]; + rows[28] = vec![5.0]; + rows[29] = vec![4.0]; + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert_eq!(peaks.len(), 1, "expected one peak for one trailing run, got {:?}", peaks); + assert_eq!(peaks[0].token_offset, 28, "peak should land at the local max of the trailing run"); + } + + #[test] + fn sub_threshold_produces_nothing() { + // All non-zero values are small; z-scores won't cross 2σ. + let m = manifest(&["curious"], &[63]); + let rows: Vec> = [0.0, 0.1, 0.0, 0.1, 0.0, 0.1, 0.0, 0.1] + .iter().map(|v| vec![*v]).collect(); + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert!(peaks.is_empty(), "below-threshold wiggle should produce no peaks, got {:?}", peaks); + } + + #[test] + fn layer_out_of_range_returns_empty() { + let m = manifest(&["curious"], &[63]); + let rows: Vec> = (0..10).map(|i| vec![if i == 5 { 5.0 } else { 0.0 }]).collect(); + // Trace has one layer (index 0); asking for layer 3 should see + // all-zero columns, which are flat and produce no peaks. + let peaks = pick_peaks(&trace(&rows), &m, 3, &PeakConfig::default()); + assert!(peaks.is_empty()); + } + + #[test] + fn manifest_concept_count_mismatch_is_safe() { + // Manifest says 2 concepts; each readout row only has 1 value. + // Rows should be treated as all-zero (via the len check) and + // produce no peaks without panicking. + let m = manifest(&["a", "b"], &[63]); + let rows: Vec> = (0..10).map(|_| vec![1.0]).collect(); + let peaks = pick_peaks(&trace(&rows), &m, 0, &PeakConfig::default()); + assert!(peaks.is_empty()); + } + + #[test] + fn threshold_tunable() { + // Same spike, stricter threshold — no peak. + let m = manifest(&["curious"], &[63]); + let mut rows: Vec> = (0..10).map(|_| vec![0.0]).collect(); + rows[5] = vec![5.0]; + let strict = PeakConfig { sigma_threshold: 100.0, ..PeakConfig::default() }; + let peaks = pick_peaks(&trace(&rows), &m, 0, &strict); + assert!(peaks.is_empty()); + } +} diff --git a/src/agent/tools/vision.rs b/src/agent/tools/vision.rs index 0e36888..d122384 100644 --- a/src/agent/tools/vision.rs +++ b/src/agent/tools/vision.rs @@ -57,15 +57,18 @@ async fn view_image( let (w, h) = (dim.width as u32, dim.height as u32); let mime = mime_from_extension(path); - let image_leaf = AstNode::image(bytes.clone(), mime, h, w); - let token_count = image_leaf.leaf().unwrap().tokens().saturating_sub(2); - let agent = agent.context("view_image requires agent context")?; + + // token_count is populated when the image reaches the server via + // AppendImage (the server is authoritative for the IMAGE_PAD + // count). Placeholder of 0 here until AppendImage is wired; the + // leaf's count gets rewritten from the RPC response at send time. + let image_leaf = AstNode::image(bytes.clone(), mime, h, w, 0); + let branch = AstNode::branch(Role::User, vec![image_leaf]); agent.context.lock().await.push_log(Section::Conversation, branch); - Ok(format!("loaded {} ({}, {}x{}, {} tokens)", - a.file_path, mime, w, h, token_count)) + Ok(format!("loaded {} ({}, {}x{})", a.file_path, mime, w, h)) } fn mime_from_extension(path: &std::path::Path) -> &'static str { diff --git a/src/lib.rs b/src/lib.rs index e6411e3..ccb4333 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,9 @@ macro_rules! dbglog { }}; } +// Logging (target-routed file logger) +pub mod logging; + // User interface (TUI, CLI) pub mod user; diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..3c9d080 --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,146 @@ +// logging.rs — log-crate logger that routes by target. +// +// Records with target "grpc" (or any target starting with "grpc::") go +// to ~/.consciousness/logs/daemon/grpc.log so we can tell gRPC events +// apart from the rest of consciousness's noise. Everything else goes +// to ~/.consciousness/logs/daemon/debug.log. +// +// Level threshold is taken from RUST_LOG (simple global level parse: +// "trace"/"debug"/"info"/"warn"/"error"); defaults to "info". + +use std::io::Write; +use std::path::PathBuf; +use std::sync::Mutex; + +use log::{Level, LevelFilter, Log, Metadata, Record, SetLoggerError}; + +fn logs_dir() -> PathBuf { + dirs::home_dir().unwrap_or_default().join(".consciousness/logs/daemon") +} + +struct RoutingLogger { + grpc_file: Mutex>, + debug_file: Mutex>, + level: LevelFilter, +} + +impl RoutingLogger { + fn new(level: LevelFilter) -> Self { + let dir = logs_dir(); + let _ = std::fs::create_dir_all(&dir); + let grpc = std::fs::OpenOptions::new() + .create(true).append(true) + .open(dir.join("grpc.log")).ok(); + let debug = std::fs::OpenOptions::new() + .create(true).append(true) + .open(dir.join("debug.log")).ok(); + Self { + grpc_file: Mutex::new(grpc), + debug_file: Mutex::new(debug), + level, + } + } + + fn is_grpc_target(target: &str) -> bool { + target == "grpc" || target.starts_with("grpc::") + } +} + +impl Log for RoutingLogger { + fn enabled(&self, m: &Metadata) -> bool { + // Always enable DEBUG for grpc target so the dedicated log is + // actually useful without RUST_LOG wrangling; defer to the + // configured level for everything else. + if Self::is_grpc_target(m.target()) { + return m.level() <= Level::Debug; + } + m.level() <= self.level + } + + fn log(&self, record: &Record) { + if !self.enabled(record.metadata()) { + return; + } + let line = format!( + "[{}] [{}] [{}] {}\n", + chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f"), + record.level(), + record.target(), + record.args(), + ); + let slot = if Self::is_grpc_target(record.target()) { + &self.grpc_file + } else { + &self.debug_file + }; + if let Ok(mut guard) = slot.lock() { + if let Some(ref mut f) = *guard { + let _ = f.write_all(line.as_bytes()); + } + } + } + + fn flush(&self) { + for slot in [&self.grpc_file, &self.debug_file] { + if let Ok(mut g) = slot.lock() { + if let Some(ref mut f) = *g { + let _ = f.flush(); + } + } + } + } +} + +fn parse_level_from_env() -> LevelFilter { + let raw = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()); + // Parse a plain level word; if it's the module=level form, we take + // the first level we find. + let token = raw.split(',').next().unwrap_or("info"); + let level_word = token.rsplit_once('=').map(|(_, v)| v).unwrap_or(token); + match level_word.trim().to_lowercase().as_str() { + "trace" => LevelFilter::Trace, + "debug" => LevelFilter::Debug, + "info" => LevelFilter::Info, + "warn" => LevelFilter::Warn, + "error" => LevelFilter::Error, + "off" => LevelFilter::Off, + _ => LevelFilter::Info, + } +} + +/// Install the routing logger. Safe to call at most once — subsequent +/// calls return an error but are otherwise no-ops. +pub fn init() -> Result<(), SetLoggerError> { + let level = parse_level_from_env(); + let logger = Box::new(RoutingLogger::new(level)); + log::set_boxed_logger(logger)?; + // Always let DEBUG records through globally so the grpc log can + // capture them (the logger itself filters non-grpc targets by + // `level`). The cost is that log::debug! call-sites below `level` + // in other modules still do their arg formatting before being + // dropped at the logger; acceptable for a debug tool. + log::set_max_level(LevelFilter::Debug.max(level)); + // Mark the file with a session boundary so it's easy to see where a + // restart happened. + log::info!( + "===== consciousness logger init (level={}, pid={}) =====", + level, std::process::id(), + ); + log::info!(target: "grpc", + "===== grpc log init (level={}, pid={}) =====", + level, std::process::id(), + ); + Ok(()) +} + +/// Consumer of &Level so the type is used when only some callers want it. +#[allow(dead_code)] +pub fn current_level() -> Level { + match log::max_level() { + LevelFilter::Trace => Level::Trace, + LevelFilter::Debug => Level::Debug, + LevelFilter::Info | LevelFilter::Off => Level::Info, + LevelFilter::Warn => Level::Warn, + LevelFilter::Error => Level::Error, + } +} diff --git a/src/subconscious/generate.rs b/src/subconscious/generate.rs index 8d75f1b..757e08a 100644 --- a/src/subconscious/generate.rs +++ b/src/subconscious/generate.rs @@ -4,6 +4,8 @@ // given a context prefix and a skip predicate, generate what the model // would say as the next assistant turn. +use std::sync::Arc; + use crate::agent::api::{ApiClient, SamplingParams, StreamToken}; use crate::agent::context::{AstNode, ContextState}; use crate::agent::tokenizer; @@ -13,6 +15,9 @@ use crate::agent::tokenizer; /// assembly. The model is whichever `client` points at — the default /// runtime client for memory-ablation alternates, a test-model client /// for F7 comparison. +/// +/// Uses a fresh ephemeral gRPC session (no cross-call KV reuse): one +/// Open / Append / Generate round-trip, then the session is dropped. pub async fn gen_continuation( context: &ContextState, entry_idx: usize, @@ -31,7 +36,13 @@ where F: FnMut(&AstNode) -> bool, top_p: 0.95, top_k: 20, }; - let (mut rx, _guard) = client.stream_completion_mm(&prompt, &images, sampling, Some(-5)); + + // Ephemeral per-call session — opens on first touch, drops when + // `_guard` drops at function end. + let session_lock = Arc::new(crate::Mutex::new(None)); + let (mut rx, _guard) = client.stream_session_mm( + session_lock, &prompt, &images, sampling, Some(-5), + ); let mut tokens = Vec::new(); while let Some(tok) = rx.recv().await { diff --git a/src/user/mod.rs b/src/user/mod.rs index 04e895b..80754a1 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -756,6 +756,11 @@ fn restore_stderr(original_fd: std::os::fd::RawFd) { #[tokio::main] pub async fn main() { + // Install target-routed file logger: `target: "grpc"` records go to + // ~/.consciousness/logs/daemon/grpc.log, everything else to debug.log. + // Level from RUST_LOG, defaulting to info. + let _ = crate::logging::init(); + // Reap channel-daemon zombies via a SIGCHLD handler that only touches // PIDs listed in channels_dir(). Avoids SIGCHLD=SIG_IGN, which would // break tokio::process::Command::wait() (kernel auto-reap → ECHILD).