Kill reqwest — minimal HTTP client on raw hyper + tokio-rustls
New src/agent/api/http.rs: ~240 lines, supports GET/POST, JSON/form bodies, SSE streaming via chunk(), TLS via rustls. No tracing dep. Removes reqwest from the main crate and telegram channel crate. Cargo.lock drops ~900 lines of transitive dependencies. tracing now only pulled in by tui-markdown. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
a421c3c9f3
commit
1cf4f504c0
9 changed files with 360 additions and 915 deletions
826
Cargo.lock
generated
826
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
38
Cargo.toml
38
Cargo.toml
|
|
@ -19,16 +19,23 @@ version.workspace = true
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
capnp = "0.25"
|
figment = { version = "0.10", features = ["env"] }
|
||||||
|
env_logger = "0.11"
|
||||||
uuid = { version = "1", features = ["v4"] }
|
uuid = { version = "1", features = ["v4"] }
|
||||||
|
clap = { version = "4", features = ["derive"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
|
crossterm = { version = "0.29", features = ["event-stream"] }
|
||||||
|
ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] }
|
||||||
|
tui-markdown = "0.3"
|
||||||
|
tui-textarea = { version = "0.10.2", package = "tui-textarea-2" }
|
||||||
json5 = "1.3"
|
json5 = "1.3"
|
||||||
bincode = "1"
|
bincode = "1"
|
||||||
regex = "1"
|
regex = "1"
|
||||||
|
glob = "0.3"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
clap = { version = "4", features = ["derive"] }
|
|
||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
|
redb = "2"
|
||||||
rkyv = { version = "0.7", features = ["validation", "std"] }
|
rkyv = { version = "0.7", features = ["validation", "std"] }
|
||||||
memchr = "2"
|
memchr = "2"
|
||||||
memmap2 = "0.9"
|
memmap2 = "0.9"
|
||||||
|
|
@ -37,25 +44,26 @@ peg = "0.8"
|
||||||
paste = "1"
|
paste = "1"
|
||||||
jobkit = { git = "https://evilpiepirate.org/git/jobkit.git", features = ["daemon"] }
|
jobkit = { git = "https://evilpiepirate.org/git/jobkit.git", features = ["daemon"] }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
reqwest = { version = "0.13", default-features = false, features = ["json", "rustls"] }
|
tokio-util = { version = "0.7", features = ["compat"] }
|
||||||
glob = "0.3"
|
tokio-scoped = "0.2.0"
|
||||||
|
futures = "0.3"
|
||||||
|
capnp = "0.25"
|
||||||
|
capnp-rpc = "0.25"
|
||||||
|
http = "1"
|
||||||
|
hyper = { version = "1", features = ["client", "http1"] }
|
||||||
|
hyper-util = { version = "0.1", features = ["tokio"], default-features = false }
|
||||||
|
http-body-util = "0.1"
|
||||||
|
rustls = "0.23"
|
||||||
|
tokio-rustls = "0.26"
|
||||||
|
rustls-native-certs = "0.8"
|
||||||
|
serde_urlencoded = "0.7"
|
||||||
|
bytes = "1"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
base64 = "0.22"
|
base64 = "0.22"
|
||||||
dirs = "6"
|
dirs = "6"
|
||||||
futures = "0.3"
|
|
||||||
tiktoken-rs = "0.9.1"
|
tiktoken-rs = "0.9.1"
|
||||||
figment = { version = "0.10", features = ["env"] }
|
|
||||||
tui-markdown = "0.3"
|
|
||||||
tui-textarea = { version = "0.10.2", package = "tui-textarea-2" }
|
|
||||||
redb = "2"
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] }
|
|
||||||
crossterm = { version = "0.29", features = ["event-stream"] }
|
|
||||||
skillratings = "0.28"
|
skillratings = "0.28"
|
||||||
capnp-rpc = "0.25"
|
|
||||||
tokio-util = { version = "0.7", features = ["compat"] }
|
|
||||||
env_logger = "0.11"
|
|
||||||
tokio-scoped = "0.2.0"
|
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
capnpc = "0.25"
|
capnpc = "0.25"
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ capnp-rpc = "0.25"
|
||||||
dirs = "6"
|
dirs = "6"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
poc-memory = { path = "../.." }
|
poc-memory = { path = "../.." }
|
||||||
reqwest = { version = "0.13", default-features = false, features = ["json", "form", "rustls"] }
|
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ struct State {
|
||||||
/// Telegram API offset
|
/// Telegram API offset
|
||||||
last_offset: i64,
|
last_offset: i64,
|
||||||
connected: bool,
|
connected: bool,
|
||||||
client: reqwest::Client,
|
client: poc_memory::agent::api::http::HttpClient,
|
||||||
/// Registered notification callbacks
|
/// Registered notification callbacks
|
||||||
subscribers: Vec<channel_client::Client>,
|
subscribers: Vec<channel_client::Client>,
|
||||||
}
|
}
|
||||||
|
|
@ -79,7 +79,7 @@ impl State {
|
||||||
channel_logs: std::collections::BTreeMap::new(),
|
channel_logs: std::collections::BTreeMap::new(),
|
||||||
last_offset,
|
last_offset,
|
||||||
connected: false,
|
connected: false,
|
||||||
client: reqwest::Client::new(),
|
client: poc_memory::agent::api::http::HttpClient::new(),
|
||||||
subscribers: Vec::new(),
|
subscribers: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -172,9 +172,7 @@ async fn poll_once(state: &SharedState) -> Result<(), Box<dyn std::error::Error>
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = state.borrow().client.clone();
|
let client = state.borrow().client.clone();
|
||||||
let resp: serde_json::Value = client.get(&url)
|
let resp: serde_json::Value = client.get(&url).await?.json().await?;
|
||||||
.timeout(std::time::Duration::from_secs(35))
|
|
||||||
.send().await?.json().await?;
|
|
||||||
|
|
||||||
if !state.borrow().connected {
|
if !state.borrow().connected {
|
||||||
state.borrow_mut().connected = true;
|
state.borrow_mut().connected = true;
|
||||||
|
|
@ -199,9 +197,10 @@ async fn poll_once(state: &SharedState) -> Result<(), Box<dyn std::error::Error>
|
||||||
let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
|
let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0);
|
||||||
if msg_chat_id != chat_id {
|
if msg_chat_id != chat_id {
|
||||||
let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage");
|
let reject_url = format!("https://api.telegram.org/bot{token}/sendMessage");
|
||||||
let _ = client.post(&reject_url)
|
let _ = client.post_form(&reject_url, &[
|
||||||
.form(&[("chat_id", msg_chat_id.to_string()), ("text", "This is a private bot.".to_string())])
|
("chat_id", &msg_chat_id.to_string()),
|
||||||
.send().await;
|
("text", "This is a private bot."),
|
||||||
|
]).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -273,9 +272,10 @@ impl channel_server::Server for ChannelServerImpl {
|
||||||
let s = state.borrow();
|
let s = state.borrow();
|
||||||
(s.api_url("sendMessage"), s.client.clone(), s.config.chat_id)
|
(s.api_url("sendMessage"), s.client.clone(), s.config.chat_id)
|
||||||
};
|
};
|
||||||
let _ = client.post(&url)
|
let _ = client.post_form(&url, &[
|
||||||
.form(&[("chat_id", &chat_id.to_string()), ("text", &message)])
|
("chat_id", &chat_id.to_string()),
|
||||||
.send().await;
|
("text", &message),
|
||||||
|
]).await;
|
||||||
|
|
||||||
let ts = now() as u64;
|
let ts = now() as u64;
|
||||||
append_history(&format!("{ts} [agent] {message}"));
|
append_history(&format!("{ts} [agent] {message}"));
|
||||||
|
|
|
||||||
239
src/agent/api/http.rs
Normal file
239
src/agent/api/http.rs
Normal file
|
|
@ -0,0 +1,239 @@
|
||||||
|
// http.rs — Minimal async HTTP client
|
||||||
|
//
|
||||||
|
// Replaces reqwest with direct hyper + rustls. No tracing dependency.
|
||||||
|
// Supports: GET/POST, JSON/form bodies, streaming responses, TLS.
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use http_body_util::{BodyExt, Full, Empty};
|
||||||
|
use hyper::body::Incoming;
|
||||||
|
use hyper::{Request, StatusCode};
|
||||||
|
use hyper_util::rt::TokioIo;
|
||||||
|
use rustls::ClientConfig;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
|
/// Lightweight async HTTP client with connection pooling via keep-alive.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct HttpClient {
|
||||||
|
tls: Arc<ClientConfig>,
|
||||||
|
connect_timeout: Duration,
|
||||||
|
request_timeout: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An in-flight response — provides status, headers, and body access.
|
||||||
|
pub struct HttpResponse {
|
||||||
|
parts: http::response::Parts,
|
||||||
|
body: Incoming,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpClient {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::builder().build()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn builder() -> HttpClientBuilder {
|
||||||
|
HttpClientBuilder {
|
||||||
|
connect_timeout: Duration::from_secs(30),
|
||||||
|
request_timeout: Duration::from_secs(600),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a GET request.
|
||||||
|
pub async fn get(&self, url: &str) -> Result<HttpResponse> {
|
||||||
|
self.get_with_headers(url, &[]).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a GET request with custom headers.
|
||||||
|
pub async fn get_with_headers(&self, url: &str, headers: &[(&str, &str)]) -> Result<HttpResponse> {
|
||||||
|
let mut builder = Request::get(url);
|
||||||
|
for &(k, v) in headers {
|
||||||
|
builder = builder.header(k, v);
|
||||||
|
}
|
||||||
|
let req = builder.body(Empty::<Bytes>::new())
|
||||||
|
.context("building GET request")?;
|
||||||
|
self.send_empty(req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a POST request with a JSON body.
|
||||||
|
pub async fn post_json(&self, url: &str, body: &impl serde::Serialize) -> Result<HttpResponse> {
|
||||||
|
let json = serde_json::to_vec(body).context("serializing JSON body")?;
|
||||||
|
let req = Request::post(url)
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Full::new(Bytes::from(json)))
|
||||||
|
.context("building POST request")?;
|
||||||
|
self.send_full(req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a POST request with URL-encoded form data.
|
||||||
|
pub async fn post_form(&self, url: &str, params: &[(&str, &str)]) -> Result<HttpResponse> {
|
||||||
|
let body = serde_urlencoded::to_string(params).context("encoding form")?;
|
||||||
|
let req = Request::post(url)
|
||||||
|
.header("content-type", "application/x-www-form-urlencoded")
|
||||||
|
.body(Full::new(Bytes::from(body)))
|
||||||
|
.context("building form POST")?;
|
||||||
|
self.send_full(req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a request with headers pre-set. JSON body.
|
||||||
|
pub async fn send_json(
|
||||||
|
&self,
|
||||||
|
method: &str,
|
||||||
|
url: &str,
|
||||||
|
headers: &[(&str, &str)],
|
||||||
|
body: &impl serde::Serialize,
|
||||||
|
) -> Result<HttpResponse> {
|
||||||
|
let json = serde_json::to_vec(body).context("serializing JSON body")?;
|
||||||
|
let mut builder = Request::builder()
|
||||||
|
.method(method)
|
||||||
|
.uri(url)
|
||||||
|
.header("content-type", "application/json");
|
||||||
|
for &(k, v) in headers {
|
||||||
|
builder = builder.header(k, v);
|
||||||
|
}
|
||||||
|
let req = builder.body(Full::new(Bytes::from(json)))
|
||||||
|
.context("building request")?;
|
||||||
|
self.send_full(req).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect(&self, url: &str) -> Result<(bool, TokioIo<Box<dyn IoStream>>)> {
|
||||||
|
let uri: http::Uri = url.parse().context("parsing URL")?;
|
||||||
|
let host = uri.host().context("URL has no host")?.to_string();
|
||||||
|
let is_https = uri.scheme_str() == Some("https");
|
||||||
|
let port = uri.port_u16().unwrap_or(if is_https { 443 } else { 80 });
|
||||||
|
|
||||||
|
let tcp = tokio::time::timeout(
|
||||||
|
self.connect_timeout,
|
||||||
|
TcpStream::connect(format!("{}:{}", host, port)),
|
||||||
|
).await
|
||||||
|
.context("connect timeout")?
|
||||||
|
.context("TCP connect")?;
|
||||||
|
|
||||||
|
if is_https {
|
||||||
|
let server_name = rustls::pki_types::ServerName::try_from(host.clone())
|
||||||
|
.map_err(|e| anyhow::anyhow!("invalid server name: {}", e))?;
|
||||||
|
let connector = tokio_rustls::TlsConnector::from(self.tls.clone());
|
||||||
|
let tls = connector.connect(server_name.to_owned(), tcp).await
|
||||||
|
.context("TLS handshake")?;
|
||||||
|
Ok((is_https, TokioIo::new(Box::new(tls) as Box<dyn IoStream>)))
|
||||||
|
} else {
|
||||||
|
Ok((is_https, TokioIo::new(Box::new(tcp) as Box<dyn IoStream>)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_full(&self, req: Request<Full<Bytes>>) -> Result<HttpResponse> {
|
||||||
|
let url = req.uri().to_string();
|
||||||
|
let (_is_https, io) = self.connect(&url).await?;
|
||||||
|
|
||||||
|
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
|
||||||
|
.context("HTTP handshake")?;
|
||||||
|
tokio::spawn(conn);
|
||||||
|
|
||||||
|
let resp = tokio::time::timeout(
|
||||||
|
self.request_timeout,
|
||||||
|
sender.send_request(req),
|
||||||
|
).await
|
||||||
|
.context("request timeout")?
|
||||||
|
.context("sending request")?;
|
||||||
|
|
||||||
|
let (parts, body) = resp.into_parts();
|
||||||
|
Ok(HttpResponse { parts, body })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_empty(&self, req: Request<Empty<Bytes>>) -> Result<HttpResponse> {
|
||||||
|
let url = req.uri().to_string();
|
||||||
|
let (_is_https, io) = self.connect(&url).await?;
|
||||||
|
|
||||||
|
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
|
||||||
|
.context("HTTP handshake")?;
|
||||||
|
tokio::spawn(conn);
|
||||||
|
|
||||||
|
let resp = tokio::time::timeout(
|
||||||
|
self.request_timeout,
|
||||||
|
sender.send_request(req),
|
||||||
|
).await
|
||||||
|
.context("request timeout")?
|
||||||
|
.context("sending request")?;
|
||||||
|
|
||||||
|
let (parts, body) = resp.into_parts();
|
||||||
|
Ok(HttpResponse { parts, body })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpResponse {
|
||||||
|
pub fn status(&self) -> StatusCode {
|
||||||
|
self.parts.status
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn header(&self, name: &str) -> Option<&str> {
|
||||||
|
self.parts.headers.get(name)?.to_str().ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the entire body as text.
|
||||||
|
pub async fn text(self) -> Result<String> {
|
||||||
|
let bytes = self.body.collect().await
|
||||||
|
.context("reading response body")?
|
||||||
|
.to_bytes();
|
||||||
|
Ok(String::from_utf8_lossy(&bytes).into_owned())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the entire body and deserialize as JSON.
|
||||||
|
pub async fn json<T: serde::de::DeserializeOwned>(self) -> Result<T> {
|
||||||
|
let bytes = self.body.collect().await
|
||||||
|
.context("reading response body")?
|
||||||
|
.to_bytes();
|
||||||
|
serde_json::from_slice(&bytes).context("deserializing JSON response")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the next chunk from the response body (for SSE streaming).
|
||||||
|
/// Returns None when the body is complete.
|
||||||
|
pub async fn chunk(&mut self) -> Result<Option<Bytes>> {
|
||||||
|
match self.body.frame().await {
|
||||||
|
Some(Ok(frame)) => Ok(frame.into_data().ok()),
|
||||||
|
Some(Err(e)) => Err(anyhow::anyhow!("body read error: {}", e)),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HttpClientBuilder {
|
||||||
|
connect_timeout: Duration,
|
||||||
|
request_timeout: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpClientBuilder {
|
||||||
|
pub fn connect_timeout(mut self, d: Duration) -> Self {
|
||||||
|
self.connect_timeout = d;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn timeout(mut self, d: Duration) -> Self {
|
||||||
|
self.request_timeout = d;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build(self) -> HttpClient {
|
||||||
|
let certs = rustls_native_certs::load_native_certs()
|
||||||
|
.certs.into_iter()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let mut root_store = rustls::RootCertStore::empty();
|
||||||
|
for cert in certs {
|
||||||
|
root_store.add(cert).ok();
|
||||||
|
}
|
||||||
|
let tls = Arc::new(
|
||||||
|
ClientConfig::builder()
|
||||||
|
.with_root_certificates(root_store)
|
||||||
|
.with_no_client_auth()
|
||||||
|
);
|
||||||
|
HttpClient {
|
||||||
|
tls,
|
||||||
|
connect_timeout: self.connect_timeout,
|
||||||
|
request_timeout: self.request_timeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trait alias for streams that work with hyper's IO adapter.
|
||||||
|
trait IoStream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static {}
|
||||||
|
impl<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static> IoStream for T {}
|
||||||
|
|
@ -6,6 +6,7 @@
|
||||||
// Diagnostics: anomalies always logged to debug panel.
|
// Diagnostics: anomalies always logged to debug panel.
|
||||||
// Set POC_DEBUG=1 for verbose per-turn logging.
|
// Set POC_DEBUG=1 for verbose per-turn logging.
|
||||||
|
|
||||||
|
pub mod http;
|
||||||
pub mod parsing;
|
pub mod parsing;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
mod openai;
|
mod openai;
|
||||||
|
|
@ -13,9 +14,10 @@ mod openai;
|
||||||
pub use types::*;
|
pub use types::*;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use reqwest::Client;
|
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use self::http::{HttpClient, HttpResponse};
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
|
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
|
||||||
|
|
@ -77,7 +79,7 @@ pub enum StreamEvent {
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ApiClient {
|
pub struct ApiClient {
|
||||||
client: Client,
|
client: HttpClient,
|
||||||
api_key: String,
|
api_key: String,
|
||||||
pub model: String,
|
pub model: String,
|
||||||
base_url: String,
|
base_url: String,
|
||||||
|
|
@ -85,11 +87,10 @@ pub struct ApiClient {
|
||||||
|
|
||||||
impl ApiClient {
|
impl ApiClient {
|
||||||
pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
|
pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
|
||||||
let client = Client::builder()
|
let client = HttpClient::builder()
|
||||||
.connect_timeout(Duration::from_secs(30))
|
.connect_timeout(Duration::from_secs(30))
|
||||||
.timeout(Duration::from_secs(600))
|
.timeout(Duration::from_secs(600))
|
||||||
.build()
|
.build();
|
||||||
.expect("failed to build HTTP client");
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
client,
|
client,
|
||||||
|
|
@ -198,14 +199,14 @@ impl ApiClient {
|
||||||
|
|
||||||
/// Send an HTTP request and check for errors. Shared by both backends.
|
/// Send an HTTP request and check for errors. Shared by both backends.
|
||||||
pub(crate) async fn send_and_check(
|
pub(crate) async fn send_and_check(
|
||||||
client: &Client,
|
client: &HttpClient,
|
||||||
url: &str,
|
url: &str,
|
||||||
body: &impl serde::Serialize,
|
body: &impl serde::Serialize,
|
||||||
auth_header: (&str, &str),
|
auth_header: (&str, &str),
|
||||||
extra_headers: &[(&str, &str)],
|
extra_headers: &[(&str, &str)],
|
||||||
debug_label: &str,
|
debug_label: &str,
|
||||||
request_json: Option<&str>,
|
request_json: Option<&str>,
|
||||||
) -> Result<reqwest::Response> {
|
) -> Result<HttpResponse> {
|
||||||
let debug = std::env::var("POC_DEBUG").is_ok();
|
let debug = std::env::var("POC_DEBUG").is_ok();
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
|
|
@ -219,49 +220,36 @@ pub(crate) async fn send_and_check(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut req = client
|
let mut headers: Vec<(&str, &str)> = Vec::with_capacity(extra_headers.len() + 1);
|
||||||
.post(url)
|
headers.push(auth_header);
|
||||||
.header(auth_header.0, auth_header.1)
|
headers.extend_from_slice(extra_headers);
|
||||||
.header("Content-Type", "application/json");
|
|
||||||
|
|
||||||
for (name, value) in extra_headers {
|
let response = client
|
||||||
req = req.header(*name, *value);
|
.send_json("POST", url, &headers, body)
|
||||||
}
|
|
||||||
|
|
||||||
let response = req
|
|
||||||
.json(body)
|
|
||||||
.send()
|
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
let cause = if e.is_connect() {
|
let msg = e.to_string();
|
||||||
|
let cause = if msg.contains("connect timeout") || msg.contains("TCP connect") {
|
||||||
"connection refused"
|
"connection refused"
|
||||||
} else if e.is_timeout() {
|
} else if msg.contains("request timeout") {
|
||||||
"request timed out"
|
"request timed out"
|
||||||
} else if e.is_request() {
|
|
||||||
"request error"
|
|
||||||
} else {
|
} else {
|
||||||
"unknown"
|
"request error"
|
||||||
};
|
};
|
||||||
anyhow::anyhow!("{} ({}): {:?}", cause, url, e.without_url())
|
anyhow::anyhow!("{} ({}): {}", cause, url, msg)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
let elapsed = start.elapsed();
|
let elapsed = start.elapsed();
|
||||||
|
|
||||||
if debug {
|
if debug {
|
||||||
// Log interesting response headers
|
|
||||||
let headers = response.headers();
|
|
||||||
for name in [
|
for name in [
|
||||||
"x-ratelimit-remaining",
|
"x-ratelimit-remaining",
|
||||||
"x-ratelimit-limit",
|
"x-ratelimit-limit",
|
||||||
"x-request-id",
|
"x-request-id",
|
||||||
] {
|
] {
|
||||||
if let Some(val) = headers.get(name) {
|
if let Some(val) = response.header(name) {
|
||||||
dbglog!(
|
dbglog!("header {}: {}", name, val);
|
||||||
"header {}: {}",
|
|
||||||
name,
|
|
||||||
val.to_str().unwrap_or("?")
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -357,7 +345,7 @@ impl SseReader {
|
||||||
/// Ok(None) when the stream ends or [DONE] is received.
|
/// Ok(None) when the stream ends or [DONE] is received.
|
||||||
pub(crate) async fn next_event(
|
pub(crate) async fn next_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
response: &mut reqwest::Response,
|
response: &mut HttpResponse,
|
||||||
) -> Result<Option<serde_json::Value>> {
|
) -> Result<Option<serde_json::Value>> {
|
||||||
loop {
|
loop {
|
||||||
// Drain complete lines from the buffer before reading more chunks
|
// Drain complete lines from the buffer before reading more chunks
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,9 @@
|
||||||
// Also used for local models (Qwen, llama) via compatible servers.
|
// Also used for local models (Qwen, llama) via compatible servers.
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use reqwest::Client;
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
use super::http::HttpClient;
|
||||||
use super::types::*;
|
use super::types::*;
|
||||||
use super::StreamEvent;
|
use super::StreamEvent;
|
||||||
|
|
||||||
|
|
@ -15,7 +15,7 @@ use super::StreamEvent;
|
||||||
/// parsed StreamEvents through the channel. The caller (runner)
|
/// parsed StreamEvents through the channel. The caller (runner)
|
||||||
/// handles routing to the UI.
|
/// handles routing to the UI.
|
||||||
pub(super) async fn stream_events(
|
pub(super) async fn stream_events(
|
||||||
client: &Client,
|
client: &HttpClient,
|
||||||
base_url: &str,
|
base_url: &str,
|
||||||
api_key: &str,
|
api_key: &str,
|
||||||
model: &str,
|
model: &str,
|
||||||
|
|
|
||||||
|
|
@ -27,11 +27,10 @@ async fn web_fetch(args: &serde_json::Value) -> Result<String> {
|
||||||
let a: FetchArgs = serde_json::from_value(args.clone())
|
let a: FetchArgs = serde_json::from_value(args.clone())
|
||||||
.context("invalid web_fetch arguments")?;
|
.context("invalid web_fetch arguments")?;
|
||||||
|
|
||||||
let client = http_client()?;
|
let client = http_client();
|
||||||
let response = client.get(&a.url)
|
let response = client.get_with_headers(&a.url, &[
|
||||||
.header("User-Agent", "consciousness/0.3")
|
("user-agent", "consciousness/0.3"),
|
||||||
.send()
|
]).await
|
||||||
.await
|
|
||||||
.with_context(|| format!("failed to fetch {}", a.url))?;
|
.with_context(|| format!("failed to fetch {}", a.url))?;
|
||||||
|
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
|
|
@ -61,7 +60,7 @@ async fn web_search(args: &serde_json::Value) -> Result<String> {
|
||||||
.context("invalid web_search arguments")?;
|
.context("invalid web_search arguments")?;
|
||||||
|
|
||||||
// Use DuckDuckGo HTML search — no API key needed
|
// Use DuckDuckGo HTML search — no API key needed
|
||||||
let client = http_client()?;
|
let client = http_client();
|
||||||
let encoded: String = a.query.chars().map(|c| {
|
let encoded: String = a.query.chars().map(|c| {
|
||||||
if c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.' {
|
if c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.' {
|
||||||
c.to_string()
|
c.to_string()
|
||||||
|
|
@ -72,10 +71,9 @@ async fn web_search(args: &serde_json::Value) -> Result<String> {
|
||||||
}
|
}
|
||||||
}).collect();
|
}).collect();
|
||||||
let url = format!("https://html.duckduckgo.com/html/?q={}", encoded);
|
let url = format!("https://html.duckduckgo.com/html/?q={}", encoded);
|
||||||
let response = client.get(&url)
|
let response = client.get_with_headers(&url, &[
|
||||||
.header("User-Agent", "consciousness/0.3")
|
("user-agent", "consciousness/0.3"),
|
||||||
.send()
|
]).await
|
||||||
.await
|
|
||||||
.context("search request failed")?;
|
.context("search request failed")?;
|
||||||
|
|
||||||
let body = response.text().await
|
let body = response.text().await
|
||||||
|
|
@ -86,20 +84,16 @@ async fn web_search(args: &serde_json::Value) -> Result<String> {
|
||||||
for chunk in body.split("class=\"result__body\"") {
|
for chunk in body.split("class=\"result__body\"") {
|
||||||
if results.len() >= a.num_results { break; }
|
if results.len() >= a.num_results { break; }
|
||||||
if results.is_empty() && !chunk.contains("result__title") {
|
if results.is_empty() && !chunk.contains("result__title") {
|
||||||
// Skip the first split (before any results)
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract title
|
|
||||||
let title = extract_between(chunk, "class=\"result__a\"", "</a>")
|
let title = extract_between(chunk, "class=\"result__a\"", "</a>")
|
||||||
.map(strip_tags)
|
.map(strip_tags)
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
// Extract URL
|
|
||||||
let href = extract_between(chunk, "href=\"", "\"")
|
let href = extract_between(chunk, "href=\"", "\"")
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
// Extract snippet
|
|
||||||
let snippet = extract_between(chunk, "class=\"result__snippet\"", "</a>")
|
let snippet = extract_between(chunk, "class=\"result__snippet\"", "</a>")
|
||||||
.map(strip_tags)
|
.map(strip_tags)
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
@ -118,30 +112,37 @@ async fn web_search(args: &serde_json::Value) -> Result<String> {
|
||||||
|
|
||||||
// ── Helpers ─────────────────────────────────────────────────────
|
// ── Helpers ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
fn http_client() -> Result<reqwest::Client> {
|
fn http_client() -> crate::agent::api::http::HttpClient {
|
||||||
reqwest::Client::builder()
|
crate::agent::api::http::HttpClient::builder()
|
||||||
.timeout(std::time::Duration::from_secs(30))
|
.timeout(std::time::Duration::from_secs(30))
|
||||||
.build()
|
.build()
|
||||||
.context("failed to build HTTP client")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn extract_between<'a>(text: &'a str, start: &str, end: &str) -> Option<&'a str> {
|
fn extract_between<'a>(text: &'a str, start: &str, end: &str) -> Option<&'a str> {
|
||||||
let start_idx = text.find(start)? + start.len();
|
let start_idx = text.find(start)? + start.len();
|
||||||
// Skip past the closing > of the start tag
|
// Skip past the closing > of the start tag
|
||||||
let rest = &text[start_idx..];
|
let rest = &text[start_idx..];
|
||||||
let tag_end = rest.find('>')?;
|
let gt = rest.find('>')?;
|
||||||
let rest = &rest[tag_end + 1..];
|
let content_start = start_idx + gt + 1;
|
||||||
let end_idx = rest.find(end)?;
|
let content = &text[content_start..];
|
||||||
Some(&rest[..end_idx])
|
let end_idx = content.find(end)?;
|
||||||
|
Some(&content[..end_idx])
|
||||||
}
|
}
|
||||||
|
|
||||||
fn strip_tags(s: &str) -> String {
|
fn strip_tags(html: &str) -> String {
|
||||||
let mut out = String::new();
|
let mut out = String::new();
|
||||||
let mut in_tag = false;
|
let mut in_tag = false;
|
||||||
for ch in s.chars() {
|
for ch in html.chars() {
|
||||||
if ch == '<' { in_tag = true; }
|
match ch {
|
||||||
else if ch == '>' { in_tag = false; }
|
'<' => in_tag = true,
|
||||||
else if !in_tag { out.push(ch); }
|
'>' => in_tag = false,
|
||||||
|
_ if !in_tag => out.push(ch),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
out
|
out.replace("&", "&")
|
||||||
|
.replace("<", "<")
|
||||||
|
.replace(">", ">")
|
||||||
|
.replace(""", "\"")
|
||||||
|
.replace("'", "'")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -76,35 +76,29 @@ struct ScoreResponse {
|
||||||
scores: Vec<ScoreResult>,
|
scores: Vec<ScoreResult>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn http_client() -> reqwest::Client {
|
fn http_client() -> crate::agent::api::http::HttpClient {
|
||||||
reqwest::Client::builder()
|
crate::agent::api::http::HttpClient::builder()
|
||||||
.timeout(SCORE_TIMEOUT)
|
.timeout(SCORE_TIMEOUT)
|
||||||
.pool_max_idle_per_host(2)
|
|
||||||
.build()
|
.build()
|
||||||
.unwrap_or_default()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call_score(
|
async fn call_score(
|
||||||
http: &reqwest::Client,
|
http: &crate::agent::api::http::HttpClient,
|
||||||
client: &ApiClient,
|
client: &ApiClient,
|
||||||
messages: &[serde_json::Value],
|
messages: &[serde_json::Value],
|
||||||
) -> anyhow::Result<Vec<ScoreResult>> {
|
) -> anyhow::Result<Vec<ScoreResult>> {
|
||||||
|
let url = format!("{}/score", client.base_url());
|
||||||
|
let auth = format!("Bearer {}", client.api_key());
|
||||||
|
let body = serde_json::json!({
|
||||||
|
"model": client.model,
|
||||||
|
"messages": messages,
|
||||||
|
"logprobs": 1,
|
||||||
|
});
|
||||||
let response = http
|
let response = http
|
||||||
.post(format!("{}/score", client.base_url()))
|
.send_json("POST", &url, &[
|
||||||
.header("Content-Type", "application/json")
|
("authorization", &auth),
|
||||||
.header("Authorization", format!("Bearer {}", client.api_key()))
|
], &body)
|
||||||
.json(&serde_json::json!({
|
.await?;
|
||||||
"model": client.model,
|
|
||||||
"messages": messages,
|
|
||||||
"logprobs": 1,
|
|
||||||
}))
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| if e.is_timeout() {
|
|
||||||
anyhow::anyhow!("score request timed out after {}s", SCORE_TIMEOUT.as_secs())
|
|
||||||
} else {
|
|
||||||
anyhow::anyhow!("score request failed: {}", e)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
let body: serde_json::Value = response.json().await?;
|
let body: serde_json::Value = response.json().await?;
|
||||||
|
|
@ -135,7 +129,7 @@ fn divergence(baseline: &[ScoreResult], without: &[ScoreResult]) -> Vec<f64> {
|
||||||
|
|
||||||
/// Score two message sets and return total divergence.
|
/// Score two message sets and return total divergence.
|
||||||
async fn score_divergence(
|
async fn score_divergence(
|
||||||
http: &reqwest::Client,
|
http: &crate::agent::api::http::HttpClient,
|
||||||
client: &ApiClient,
|
client: &ApiClient,
|
||||||
context: &ContextState,
|
context: &ContextState,
|
||||||
range: std::ops::Range<usize>,
|
range: std::ops::Range<usize>,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue