vLLM priority scheduling for agents
Thread request priority through the API call chain to vLLM's priority scheduler. Lower value = higher priority, with preemption.

Priority is set per-agent in the .agent header:
- interactive (runner): 0 (default, highest)
- surface-observe: 1 (near-realtime, watches conversation)
- all other agents: 10 (batch, default if not specified)

Requires vLLM started with --scheduling-policy priority.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
503e2995c1
commit
c72eb4d528
8 changed files with 27 additions and 7 deletions
|
|
@ -103,6 +103,7 @@ impl ApiClient {
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
|
priority: Option<i32>,
|
||||||
) -> mpsc::UnboundedReceiver<StreamEvent> {
|
) -> mpsc::UnboundedReceiver<StreamEvent> {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
|
|
@ -123,7 +124,7 @@ impl ApiClient {
|
||||||
openai::stream_events(
|
openai::stream_events(
|
||||||
&client, base_url, &api_key, &model,
|
&client, base_url, &api_key, &model,
|
||||||
&messages, tools.as_deref(), &tx, &ui_tx,
|
&messages, tools.as_deref(), &tx, &ui_tx,
|
||||||
&reasoning_effort, temperature,
|
&reasoning_effort, temperature, priority,
|
||||||
).await
|
).await
|
||||||
}
|
}
|
||||||
Backend::Anthropic => {
|
Backend::Anthropic => {
|
||||||
|
|
@ -155,7 +156,7 @@ impl ApiClient {
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
) -> Result<(Message, Option<Usage>)> {
|
) -> Result<(Message, Option<Usage>)> {
|
||||||
self.chat_completion_stream_temp(messages, tools, ui_tx, reasoning_effort, None).await
|
self.chat_completion_stream_temp(messages, tools, ui_tx, reasoning_effort, None, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn chat_completion_stream_temp(
|
pub async fn chat_completion_stream_temp(
|
||||||
|
|
@ -165,9 +166,10 @@ impl ApiClient {
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
|
priority: Option<i32>,
|
||||||
) -> Result<(Message, Option<Usage>)> {
|
) -> Result<(Message, Option<Usage>)> {
|
||||||
// Use the event stream and accumulate into a message.
|
// Use the event stream and accumulate into a message.
|
||||||
let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature);
|
let mut rx = self.start_stream(messages, tools, ui_tx, reasoning_effort, temperature, priority);
|
||||||
let mut content = String::new();
|
let mut content = String::new();
|
||||||
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
||||||
let mut usage = None;
|
let mut usage = None;
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ pub async fn stream_events(
|
||||||
ui_tx: &UiSender,
|
ui_tx: &UiSender,
|
||||||
reasoning_effort: &str,
|
reasoning_effort: &str,
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
|
priority: Option<i32>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let request = ChatRequest {
|
let request = ChatRequest {
|
||||||
model: model.to_string(),
|
model: model.to_string(),
|
||||||
|
|
@ -44,6 +45,7 @@ pub async fn stream_events(
|
||||||
None
|
None
|
||||||
},
|
},
|
||||||
chat_template_kwargs: None,
|
chat_template_kwargs: None,
|
||||||
|
priority,
|
||||||
};
|
};
|
||||||
|
|
||||||
let url = format!("{}/chat/completions", base_url);
|
let url = format!("{}/chat/completions", base_url);
|
||||||
|
|
|
||||||
|
|
@ -261,6 +261,7 @@ impl Agent {
|
||||||
ui_tx,
|
ui_tx,
|
||||||
&self.reasoning_effort,
|
&self.reasoning_effort,
|
||||||
None,
|
None,
|
||||||
|
None, // priority: interactive
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut content = String::new();
|
let mut content = String::new();
|
||||||
|
|
|
||||||
|
|
@ -132,6 +132,10 @@ pub struct ChatRequest {
|
||||||
/// vllm chat template kwargs — used to disable thinking on Qwen 3.5
|
/// vllm chat template kwargs — used to disable thinking on Qwen 3.5
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub chat_template_kwargs: Option<serde_json::Value>,
|
pub chat_template_kwargs: Option<serde_json::Value>,
|
||||||
|
/// vllm request priority (lower = higher priority).
|
||||||
|
/// 0 = interactive, 1 = surface-observe, 10 = batch agents.
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub priority: Option<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
{"agent":"surface-observe","query":"","count":1,"bail":"bail-no-competing.sh"}
|
{"agent":"surface-observe","query":"","count":1,"priority":1,"bail":"bail-no-competing.sh"}
|
||||||
|
|
||||||
=== PROMPT phase:surface ===
|
=== PROMPT phase:surface ===
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ pub async fn call_api_with_tools(
|
||||||
prompts: &[String],
|
prompts: &[String],
|
||||||
phases: &[String],
|
phases: &[String],
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
|
priority: i32,
|
||||||
tools: &[String],
|
tools: &[String],
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
log: &dyn Fn(&str),
|
log: &dyn Fn(&str),
|
||||||
|
|
@ -82,6 +83,7 @@ pub async fn call_api_with_tools(
|
||||||
&ui_tx,
|
&ui_tx,
|
||||||
&reasoning,
|
&reasoning,
|
||||||
temperature,
|
temperature,
|
||||||
|
Some(priority),
|
||||||
).await {
|
).await {
|
||||||
Ok((msg, usage)) => {
|
Ok((msg, usage)) => {
|
||||||
msg_opt = Some(msg);
|
msg_opt = Some(msg);
|
||||||
|
|
@ -233,6 +235,7 @@ pub fn call_api_with_tools_sync(
|
||||||
prompts: &[String],
|
prompts: &[String],
|
||||||
phases: &[String],
|
phases: &[String],
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
|
priority: i32,
|
||||||
tools: &[String],
|
tools: &[String],
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
log: &(dyn Fn(&str) + Sync),
|
log: &(dyn Fn(&str) + Sync),
|
||||||
|
|
@ -244,7 +247,7 @@ pub fn call_api_with_tools_sync(
|
||||||
.build()
|
.build()
|
||||||
.map_err(|e| format!("tokio runtime: {}", e))?;
|
.map_err(|e| format!("tokio runtime: {}", e))?;
|
||||||
rt.block_on(
|
rt.block_on(
|
||||||
call_api_with_tools(agent, prompts, phases, temperature, tools, bail_fn, log)
|
call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn, log)
|
||||||
)
|
)
|
||||||
}).join().unwrap()
|
}).join().unwrap()
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,9 @@ pub struct AgentDef {
|
||||||
pub chunk_size: Option<usize>,
|
pub chunk_size: Option<usize>,
|
||||||
pub chunk_overlap: Option<usize>,
|
pub chunk_overlap: Option<usize>,
|
||||||
pub temperature: Option<f32>,
|
pub temperature: Option<f32>,
|
||||||
|
/// vLLM scheduling priority (lower = higher priority).
|
||||||
|
/// 0 = interactive, 1 = near-realtime, 10 = batch (default).
|
||||||
|
pub priority: i32,
|
||||||
/// Bail check command — run between steps with pid file path as $1,
|
/// Bail check command — run between steps with pid file path as $1,
|
||||||
/// cwd = state dir. Non-zero exit = stop the pipeline.
|
/// cwd = state dir. Non-zero exit = stop the pipeline.
|
||||||
pub bail: Option<String>,
|
pub bail: Option<String>,
|
||||||
|
|
@ -75,6 +78,9 @@ struct AgentHeader {
|
||||||
/// LLM temperature override
|
/// LLM temperature override
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
|
/// vLLM scheduling priority (lower = higher priority, default 10 = batch)
|
||||||
|
#[serde(default = "default_priority")]
|
||||||
|
priority: i32,
|
||||||
/// Bail check command — run between steps with pid file path as $1,
|
/// Bail check command — run between steps with pid file path as $1,
|
||||||
/// cwd = state dir. Non-zero exit = stop the pipeline.
|
/// cwd = state dir. Non-zero exit = stop the pipeline.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
|
@ -82,6 +88,7 @@ struct AgentHeader {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_model() -> String { "sonnet".into() }
|
fn default_model() -> String { "sonnet".into() }
|
||||||
|
fn default_priority() -> i32 { 10 }
|
||||||
|
|
||||||
/// Parse an agent file: first line is JSON config, rest is the prompt(s).
|
/// Parse an agent file: first line is JSON config, rest is the prompt(s).
|
||||||
/// Multiple prompts are separated by `=== PROMPT [phase:name] ===` lines.
|
/// Multiple prompts are separated by `=== PROMPT [phase:name] ===` lines.
|
||||||
|
|
@ -149,6 +156,7 @@ fn parse_agent_file(content: &str) -> Option<AgentDef> {
|
||||||
chunk_size: header.chunk_size,
|
chunk_size: header.chunk_size,
|
||||||
chunk_overlap: header.chunk_overlap,
|
chunk_overlap: header.chunk_overlap,
|
||||||
temperature: header.temperature,
|
temperature: header.temperature,
|
||||||
|
priority: header.priority,
|
||||||
bail: header.bail,
|
bail: header.bail,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String>
|
||||||
|
|
||||||
let prompts = vec![prompt.to_string()];
|
let prompts = vec![prompt.to_string()];
|
||||||
let phases = vec![];
|
let phases = vec![];
|
||||||
super::api::call_api_with_tools_sync(caller, &prompts, &phases, None, &[], None, &log)
|
super::api::call_api_with_tools_sync(caller, &prompts, &phases, None, 10, &[], None, &log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call a model using an agent definition's configuration (multi-step).
|
/// Call a model using an agent definition's configuration (multi-step).
|
||||||
|
|
@ -34,7 +34,7 @@ pub(crate) fn call_for_def_multi(
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
log: &(dyn Fn(&str) + Sync),
|
log: &(dyn Fn(&str) + Sync),
|
||||||
) -> Result<String, String> {
|
) -> Result<String, String> {
|
||||||
super::api::call_api_with_tools_sync(&def.agent, prompts, phases, def.temperature, &def.tools, bail_fn, log)
|
super::api::call_api_with_tools_sync(&def.agent, prompts, phases, def.temperature, def.priority, &def.tools, bail_fn, log)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue