provenance: pass directly through thought::dispatch, remove globals
Provenance now flows as a function parameter through the entire tool dispatch chain: thought::dispatch → memory::dispatch → store methods. Removed task_local (TASK_AGENT), thread_local (TASK_PHASE), and env var (POC_PROVENANCE) from the tool dispatch path. The env var remains only as a fallback for non-tool paths (CLI commands, digest). Phase names are passed from knowledge.rs → llm.rs → api.rs, and api.rs updates the provenance string between steps. No globals needed.
This commit is contained in:
parent
36bde60ba0
commit
92ca2bf2c8
7 changed files with 34 additions and 47 deletions
|
|
@ -39,8 +39,8 @@ pub async fn dispatch(
|
||||||
return result.unwrap_or_else(ToolOutput::error);
|
return result.unwrap_or_else(ToolOutput::error);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delegate to shared thought layer
|
// Delegate to shared thought layer (poc-agent uses default provenance)
|
||||||
if let Some(output) = crate::thought::dispatch(name, args, tracker).await {
|
if let Some(output) = crate::thought::dispatch(name, args, tracker, None).await {
|
||||||
return output;
|
return output;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ pub use parse::{MemoryUnit, parse_units};
|
||||||
pub use view::{StoreView, AnyView};
|
pub use view::{StoreView, AnyView};
|
||||||
pub use persist::fsck;
|
pub use persist::fsck;
|
||||||
pub use persist::strip_md_keys;
|
pub use persist::strip_md_keys;
|
||||||
pub use ops::{TASK_AGENT, set_phase};
|
pub use ops::current_provenance;
|
||||||
|
|
||||||
use crate::graph::{self, Graph};
|
use crate::graph::{self, Graph};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,34 +7,11 @@ use super::types::*;
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
tokio::task_local! {
|
/// Fallback provenance for non-tool-dispatch paths (CLI, digest, etc.).
|
||||||
/// Task-scoped agent name for provenance. Set before running an agent's
|
/// Tool dispatch passes provenance directly through thought::dispatch.
|
||||||
/// tool calls, so all writes within that task are attributed to the agent.
|
|
||||||
pub static TASK_AGENT: String;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_local! {
|
|
||||||
/// Current phase within a multi-step agent. Updated by the bail function
|
|
||||||
/// between steps. Combined with TASK_AGENT to form the full provenance.
|
|
||||||
static TASK_PHASE: std::cell::RefCell<Option<String>> = const { std::cell::RefCell::new(None) };
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the current phase (called from bail function between steps).
|
|
||||||
pub fn set_phase(phase: &str) {
|
|
||||||
TASK_PHASE.with(|p| *p.borrow_mut() = Some(phase.to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the full provenance string: "agent:phase" or "agent" or env var or "manual".
|
|
||||||
pub fn current_provenance() -> String {
|
pub fn current_provenance() -> String {
|
||||||
let agent = TASK_AGENT.try_with(|a| a.clone()).ok();
|
std::env::var("POC_PROVENANCE")
|
||||||
let phase = TASK_PHASE.with(|p| p.borrow().clone());
|
.unwrap_or_else(|_| "manual".to_string())
|
||||||
|
|
||||||
match (agent, phase) {
|
|
||||||
(Some(a), Some(p)) => format!("{}:{}", a, p),
|
|
||||||
(Some(a), None) => a,
|
|
||||||
_ => std::env::var("POC_PROVENANCE")
|
|
||||||
.unwrap_or_else(|_| "manual".to_string()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
|
|
|
||||||
|
|
@ -32,8 +32,9 @@ fn get_client() -> Result<&'static ApiClient, String> {
|
||||||
/// context carries forward naturally between steps.
|
/// context carries forward naturally between steps.
|
||||||
/// Returns the final text response after all steps complete.
|
/// Returns the final text response after all steps complete.
|
||||||
pub async fn call_api_with_tools(
|
pub async fn call_api_with_tools(
|
||||||
_agent: &str,
|
agent: &str,
|
||||||
prompts: &[String],
|
prompts: &[String],
|
||||||
|
phases: &[String],
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
log: &dyn Fn(&str),
|
log: &dyn Fn(&str),
|
||||||
|
|
@ -46,6 +47,13 @@ pub async fn call_api_with_tools(
|
||||||
// Build tool definitions — all shared tools (memory, files, bash, journal)
|
// Build tool definitions — all shared tools (memory, files, bash, journal)
|
||||||
let tool_defs = thought::all_definitions();
|
let tool_defs = thought::all_definitions();
|
||||||
let tracker = ProcessTracker::new();
|
let tracker = ProcessTracker::new();
|
||||||
|
// Provenance tracks which agent:phase is making writes.
|
||||||
|
// Updated between steps by the bail function via set_provenance().
|
||||||
|
let first_phase = phases.first().map(|s| s.as_str()).unwrap_or("");
|
||||||
|
let provenance = std::cell::RefCell::new(
|
||||||
|
if first_phase.is_empty() { format!("agent:{}", agent) }
|
||||||
|
else { format!("agent:{}:{}", agent, first_phase) }
|
||||||
|
);
|
||||||
|
|
||||||
// Start with the first prompt as a user message
|
// Start with the first prompt as a user message
|
||||||
let mut messages = vec![Message::user(&prompts[0])];
|
let mut messages = vec![Message::user(&prompts[0])];
|
||||||
|
|
@ -157,7 +165,8 @@ pub async fn call_api_with_tools(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let output = match thought::dispatch(&call.function.name, &args, &tracker).await {
|
let prov = provenance.borrow().clone();
|
||||||
|
let output = match thought::dispatch(&call.function.name, &args, &tracker, Some(&prov)).await {
|
||||||
Some(out) => out,
|
Some(out) => out,
|
||||||
None => thought::ToolOutput::error(format!("Unknown tool: {}", call.function.name)),
|
None => thought::ToolOutput::error(format!("Unknown tool: {}", call.function.name)),
|
||||||
};
|
};
|
||||||
|
|
@ -187,6 +196,10 @@ pub async fn call_api_with_tools(
|
||||||
if let Some(ref check) = bail_fn {
|
if let Some(ref check) = bail_fn {
|
||||||
check(next_prompt_idx)?;
|
check(next_prompt_idx)?;
|
||||||
}
|
}
|
||||||
|
// Update provenance for the new phase
|
||||||
|
if let Some(phase) = phases.get(next_prompt_idx) {
|
||||||
|
*provenance.borrow_mut() = format!("agent:{}:{}", agent, phase);
|
||||||
|
}
|
||||||
messages.push(Message::assistant(&text));
|
messages.push(Message::assistant(&text));
|
||||||
let next = &prompts[next_prompt_idx];
|
let next = &prompts[next_prompt_idx];
|
||||||
next_prompt_idx += 1;
|
next_prompt_idx += 1;
|
||||||
|
|
@ -206,6 +219,7 @@ pub async fn call_api_with_tools(
|
||||||
pub fn call_api_with_tools_sync(
|
pub fn call_api_with_tools_sync(
|
||||||
agent: &str,
|
agent: &str,
|
||||||
prompts: &[String],
|
prompts: &[String],
|
||||||
|
phases: &[String],
|
||||||
temperature: Option<f32>,
|
temperature: Option<f32>,
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
log: &(dyn Fn(&str) + Sync),
|
log: &(dyn Fn(&str) + Sync),
|
||||||
|
|
@ -216,10 +230,8 @@ pub fn call_api_with_tools_sync(
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()
|
.build()
|
||||||
.map_err(|e| format!("tokio runtime: {}", e))?;
|
.map_err(|e| format!("tokio runtime: {}", e))?;
|
||||||
let agent_name = format!("agent:{}", agent);
|
|
||||||
rt.block_on(
|
rt.block_on(
|
||||||
crate::store::TASK_AGENT.scope(agent_name,
|
call_api_with_tools(agent, prompts, phases, temperature, bail_fn, log)
|
||||||
call_api_with_tools(agent, prompts, temperature, bail_fn, log))
|
|
||||||
)
|
)
|
||||||
}).join().unwrap()
|
}).join().unwrap()
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -333,17 +333,13 @@ fn run_one_agent_inner(
|
||||||
.map(|s| s.prompt.clone()).collect();
|
.map(|s| s.prompt.clone()).collect();
|
||||||
let step_phases: Vec<String> = agent_batch.steps.iter()
|
let step_phases: Vec<String> = agent_batch.steps.iter()
|
||||||
.map(|s| s.phase.clone()).collect();
|
.map(|s| s.phase.clone()).collect();
|
||||||
|
let step_phases_for_bail = step_phases.clone();
|
||||||
|
|
||||||
for (i, s) in agent_batch.steps.iter().enumerate() {
|
for (i, s) in agent_batch.steps.iter().enumerate() {
|
||||||
log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt));
|
log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt));
|
||||||
}
|
}
|
||||||
log("\n=== CALLING LLM ===");
|
log("\n=== CALLING LLM ===");
|
||||||
|
|
||||||
// Set initial phase for provenance tracking
|
|
||||||
if let Some(first_phase) = step_phases.first() {
|
|
||||||
crate::store::set_phase(first_phase);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bail check: if the agent defines a bail script, run it between steps.
|
// Bail check: if the agent defines a bail script, run it between steps.
|
||||||
// The script receives the pid file path as $1, cwd = state dir.
|
// The script receives the pid file path as $1, cwd = state dir.
|
||||||
let bail_script = def.bail.as_ref().map(|name| {
|
let bail_script = def.bail.as_ref().map(|name| {
|
||||||
|
|
@ -355,9 +351,8 @@ fn run_one_agent_inner(
|
||||||
let pid_path_for_bail = pid_path.clone();
|
let pid_path_for_bail = pid_path.clone();
|
||||||
let bail_fn = move |step_idx: usize| -> Result<(), String> {
|
let bail_fn = move |step_idx: usize| -> Result<(), String> {
|
||||||
// Update phase in pid file and provenance tracking
|
// Update phase in pid file and provenance tracking
|
||||||
if step_idx < step_phases.len() {
|
if step_idx < step_phases_for_bail.len() {
|
||||||
write_pid(&step_phases[step_idx]);
|
write_pid(&step_phases_for_bail[step_idx]);
|
||||||
crate::store::set_phase(&step_phases[step_idx]);
|
|
||||||
}
|
}
|
||||||
// Run bail script if defined
|
// Run bail script if defined
|
||||||
if let Some(ref script) = bail_script {
|
if let Some(ref script) = bail_script {
|
||||||
|
|
@ -375,7 +370,7 @@ fn run_one_agent_inner(
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
let output = llm::call_for_def_multi(def, &prompts, Some(&bail_fn), log)?;
|
let output = llm::call_for_def_multi(def, &prompts, &step_phases, Some(&bail_fn), log)?;
|
||||||
|
|
||||||
Ok(AgentResult {
|
Ok(AgentResult {
|
||||||
output,
|
output,
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,8 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String>
|
||||||
};
|
};
|
||||||
|
|
||||||
let prompts = vec![prompt.to_string()];
|
let prompts = vec![prompt.to_string()];
|
||||||
super::api::call_api_with_tools_sync(caller, &prompts, None, None, &log)
|
let phases = vec![];
|
||||||
|
super::api::call_api_with_tools_sync(caller, &prompts, &phases, None, None, &log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call a model using an agent definition's configuration (multi-step).
|
/// Call a model using an agent definition's configuration (multi-step).
|
||||||
|
|
@ -28,10 +29,11 @@ pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String>
|
||||||
pub(crate) fn call_for_def_multi(
|
pub(crate) fn call_for_def_multi(
|
||||||
def: &super::defs::AgentDef,
|
def: &super::defs::AgentDef,
|
||||||
prompts: &[String],
|
prompts: &[String],
|
||||||
|
phases: &[String],
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
log: &(dyn Fn(&str) + Sync),
|
log: &(dyn Fn(&str) + Sync),
|
||||||
) -> Result<String, String> {
|
) -> Result<String, String> {
|
||||||
super::api::call_api_with_tools_sync(&def.agent, prompts, def.temperature, bail_fn, log)
|
super::api::call_api_with_tools_sync(&def.agent, prompts, phases, def.temperature, bail_fn, log)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -74,10 +74,11 @@ pub async fn dispatch(
|
||||||
name: &str,
|
name: &str,
|
||||||
args: &serde_json::Value,
|
args: &serde_json::Value,
|
||||||
tracker: &ProcessTracker,
|
tracker: &ProcessTracker,
|
||||||
|
provenance: Option<&str>,
|
||||||
) -> Option<ToolOutput> {
|
) -> Option<ToolOutput> {
|
||||||
// Memory and journal tools
|
// Memory and journal tools
|
||||||
if name.starts_with("memory_") || name.starts_with("journal_") || name == "output" {
|
if name.starts_with("memory_") || name.starts_with("journal_") || name == "output" {
|
||||||
let result = memory::dispatch(name, args, None);
|
let result = memory::dispatch(name, args, provenance);
|
||||||
return Some(match result {
|
return Some(match result {
|
||||||
Ok(s) => ToolOutput::text(s),
|
Ok(s) => ToolOutput::text(s),
|
||||||
Err(e) => ToolOutput::error(e),
|
Err(e) => ToolOutput::error(e),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue