From c72eb4d528560dc7382366690512ab7377f16f03 Mon Sep 17 00:00:00 2001
From: Kent Overstreet
Date: Wed, 1 Apr 2026 23:21:39 -0400
Subject: [PATCH] vLLM priority scheduling for agents

Thread request priority through the API call chain to vLLM's priority
scheduler. Lower value = higher priority, with preemption.

Priority is set per-agent in the .agent header:
- interactive (runner): 0 (default, highest)
- surface-observe: 1 (near-realtime, watches conversation)
- all other agents: 10 (batch, default if not specified)

Requires vLLM started with --scheduling-policy priority.

Co-Authored-By: Proof of Concept
---
 src/agent/api/mod.rs                          | 8 +++++---
 src/agent/api/openai.rs                       | 2 ++
 src/agent/runner.rs                           | 1 +
 src/agent/types.rs                            | 4 ++++
 src/subconscious/agents/surface-observe.agent | 2 +-
 src/subconscious/api.rs                       | 5 ++++-
 src/subconscious/defs.rs                      | 8 ++++++++
 src/subconscious/llm.rs                       | 4 ++--
 8 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs
index 2cb133a..528ea8b 100644
--- a/src/agent/api/mod.rs
+++ b/src/agent/api/mod.rs
@@ -103,6 +103,7 @@ impl ApiClient {
         ui_tx: &UiSender,
         reasoning_effort: &str,
         temperature: Option<f32>,
+        priority: Option<i32>,
     ) -> mpsc::UnboundedReceiver<StreamEvent> {
         let (tx, rx) = mpsc::unbounded_channel();
         let client = self.client.clone();
@@ -123,7 +124,7 @@ impl ApiClient {
                     openai::stream_events(
                         &client, base_url, &api_key, &model, &messages,
                         tools.as_deref(), &tx, &ui_tx,
-                        &reasoning_effort, temperature,
+                        &reasoning_effort, temperature, priority,
                     ).await
                 }
                 Backend::Anthropic => {
@@ -155,7 +156,7 @@ impl ApiClient {
         ui_tx: &UiSender,
         reasoning_effort: &str,
     ) -> Result<(Message, Option<Usage>)> {
-        self.chat_completion_stream_temp(messages, tools, ui_tx, reasoning_effort, None).await
+        self.chat_completion_stream_temp(messages, tools, ui_tx, reasoning_effort, None, None).await
     }

     pub async fn chat_completion_stream_temp(
@@ -165,9 +166,10 @@ impl ApiClient {
         ui_tx: &UiSender,
         reasoning_effort: &str,
         temperature: Option<f32>,
+        priority: Option<i32>,
     ) -> Result<(Message, Option<Usage>)> {
         // Use the event stream and accumulate into a message.
-        let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature);
+        let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority);
         let mut content = String::new();
         let mut tool_calls: Vec<ToolCall> = Vec::new();
         let mut usage = None;
diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs
index 814bec6..68fc5a8 100644
--- a/src/agent/api/openai.rs
+++ b/src/agent/api/openai.rs
@@ -26,6 +26,7 @@ pub async fn stream_events(
     ui_tx: &UiSender,
     reasoning_effort: &str,
     temperature: Option<f32>,
+    priority: Option<i32>,
 ) -> Result<()> {
     let request = ChatRequest {
         model: model.to_string(),
@@ -44,6 +45,7 @@ pub async fn stream_events(
             None
         },
         chat_template_kwargs: None,
+        priority,
     };

     let url = format!("{}/chat/completions", base_url);
diff --git a/src/agent/runner.rs b/src/agent/runner.rs
index b8a1e15..0becf81 100644
--- a/src/agent/runner.rs
+++ b/src/agent/runner.rs
@@ -261,6 +261,7 @@ impl Agent {
             ui_tx,
             &self.reasoning_effort,
             None,
+            None, // priority: interactive
         );

         let mut content = String::new();
diff --git a/src/agent/types.rs b/src/agent/types.rs
index be1c77e..ad1df93 100644
--- a/src/agent/types.rs
+++ b/src/agent/types.rs
@@ -132,6 +132,10 @@ pub struct ChatRequest {
     /// vllm chat template kwargs — used to disable thinking on Qwen 3.5
     #[serde(skip_serializing_if = "Option::is_none")]
     pub chat_template_kwargs: Option<serde_json::Value>,
+    /// vllm request priority (lower = higher priority).
+    /// 0 = interactive, 1 = surface-observe, 10 = batch agents.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub priority: Option<i32>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/src/subconscious/agents/surface-observe.agent b/src/subconscious/agents/surface-observe.agent
index ab658f4..067532a 100644
--- a/src/subconscious/agents/surface-observe.agent
+++ b/src/subconscious/agents/surface-observe.agent
@@ -1,4 +1,4 @@
-{"agent":"surface-observe","query":"","count":1,"bail":"bail-no-competing.sh"}
+{"agent":"surface-observe","query":"","count":1,"priority":1,"bail":"bail-no-competing.sh"}

 === PROMPT phase:surface ===

diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs
index c1c044f..888bb51 100644
--- a/src/subconscious/api.rs
+++ b/src/subconscious/api.rs
@@ -35,6 +35,7 @@ pub async fn call_api_with_tools(
     prompts: &[String],
     phases: &[String],
     temperature: Option<f32>,
+    priority: i32,
     tools: &[String],
     bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
     log: &dyn Fn(&str),
@@ -82,6 +83,7 @@ pub async fn call_api_with_tools(
         &ui_tx,
         &reasoning,
         temperature,
+        Some(priority),
     ).await {
         Ok((msg, usage)) => {
             msg_opt = Some(msg);
@@ -233,6 +235,7 @@ pub fn call_api_with_tools_sync(
     prompts: &[String],
     phases: &[String],
     temperature: Option<f32>,
+    priority: i32,
     tools: &[String],
     bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
     log: &(dyn Fn(&str) + Sync),
@@ -244,7 +247,7 @@ pub fn call_api_with_tools_sync(
             .build()
             .map_err(|e| format!("tokio runtime: {}", e))?;
         rt.block_on(
-            call_api_with_tools(agent, prompts, phases, temperature, tools, bail_fn, log)
+            call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn, log)
         )
     }).join().unwrap()
 })
diff --git a/src/subconscious/defs.rs b/src/subconscious/defs.rs
index cd80d10..68ab385 100644
--- a/src/subconscious/defs.rs
+++ b/src/subconscious/defs.rs
@@ -46,6 +46,9 @@ pub struct AgentDef {
     pub chunk_size: Option<usize>,
     pub chunk_overlap: Option<usize>,
     pub temperature: Option<f32>,
+    /// vLLM scheduling priority (lower = higher priority).
+    /// 0 = interactive, 1 = near-realtime, 10 = batch (default).
+    pub priority: i32,
     /// Bail check command — run between steps with pid file path as $1,
     /// cwd = state dir. Non-zero exit = stop the pipeline.
     pub bail: Option<String>,
@@ -75,6 +78,9 @@ struct AgentHeader {
     /// LLM temperature override
     #[serde(default)]
    temperature: Option<f32>,
+    /// vLLM scheduling priority (lower = higher priority, default 10 = batch)
+    #[serde(default = "default_priority")]
+    priority: i32,
     /// Bail check command — run between steps with pid file path as $1,
     /// cwd = state dir. Non-zero exit = stop the pipeline.
     #[serde(default)]
@@ -82,6 +88,7 @@ struct AgentHeader {
 }

 fn default_model() -> String { "sonnet".into() }
+fn default_priority() -> i32 { 10 }

 /// Parse an agent file: first line is JSON config, rest is the prompt(s).
 /// Multiple prompts are separated by `=== PROMPT [phase:name] ===` lines.
@@ -149,6 +156,7 @@ fn parse_agent_file(content: &str) -> Option<AgentDef> {
         chunk_size: header.chunk_size,
         chunk_overlap: header.chunk_overlap,
         temperature: header.temperature,
+        priority: header.priority,
         bail: header.bail,
     })
 }
diff --git a/src/subconscious/llm.rs b/src/subconscious/llm.rs
index 0643623..b8e552a 100644
--- a/src/subconscious/llm.rs
+++ b/src/subconscious/llm.rs
@@ -22,7 +22,7 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String>
     let prompts = vec![prompt.to_string()];
     let phases = vec![];

-    super::api::call_api_with_tools_sync(caller, &prompts, &phases, None, &[], None, &log)
+    super::api::call_api_with_tools_sync(caller, &prompts, &phases, None, 10, &[], None, &log)
 }

 /// Call a model using an agent definition's configuration (multi-step).
@@ -34,7 +34,7 @@ pub(crate) fn call_for_def_multi(
     bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
     log: &(dyn Fn(&str) + Sync),
 ) -> Result<String, String> {
-    super::api::call_api_with_tools_sync(&def.agent, prompts, phases, def.temperature, &def.tools, bail_fn, log)
+    super::api::call_api_with_tools_sync(&def.agent, prompts, phases, def.temperature, def.priority, &def.tools, bail_fn, log)
 }