feat(agent): wire intent-first pipeline into all entrypoints
Replace preprocessor+runAgentLoop with runPipeline in both device.ts (WebSocket) and goals.ts (REST). The pipeline orchestrates: deterministic parser (stage 1) -> LLM classifier (stage 2) -> lean UI agent (stage 3). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
293
server/src/agent/pipeline.ts
Normal file
293
server/src/agent/pipeline.ts
Normal file
@@ -0,0 +1,293 @@
|
||||
/**
|
||||
* Intent-First Pipeline Orchestrator for DroidClaw.
|
||||
*
|
||||
* Goal → Parser (0 LLM) → Classifier (1 LLM) → UI Agent (3-8 LLM)
|
||||
* handles ~30% handles ~20% handles ~50%
|
||||
*/
|
||||
|
||||
import type {
|
||||
InstalledApp,
|
||||
PipelineResult,
|
||||
} from "@droidclaw/shared";
|
||||
import { sessions } from "../ws/sessions.js";
|
||||
import { db } from "../db.js";
|
||||
import {
|
||||
device as deviceTable,
|
||||
agentSession,
|
||||
agentStep,
|
||||
} from "../schema.js";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { parseGoal, buildCapabilities } from "./parser.js";
|
||||
import { classifyGoal } from "./classifier.js";
|
||||
import { runAgentLoop, type AgentLoopOptions, type AgentResult } from "./loop.js";
|
||||
import type { LLMConfig } from "./llm.js";
|
||||
|
||||
// ─── Types ───────────────────────────────────────────────────
|
||||
|
||||
export interface PipelineOptions {
|
||||
deviceId: string;
|
||||
persistentDeviceId?: string;
|
||||
userId: string;
|
||||
goal: string;
|
||||
llmConfig: LLMConfig;
|
||||
maxSteps?: number;
|
||||
signal?: AbortSignal;
|
||||
onStep?: AgentLoopOptions["onStep"];
|
||||
onComplete?: AgentLoopOptions["onComplete"];
|
||||
}
|
||||
|
||||
export interface PipelineResultFinal {
|
||||
success: boolean;
|
||||
stepsUsed: number;
|
||||
sessionId: string;
|
||||
resolvedBy: "parser" | "classifier" | "ui_agent";
|
||||
}
|
||||
|
||||
// ─── Helpers ─────────────────────────────────────────────────
|
||||
|
||||
async function fetchInstalledApps(
|
||||
persistentDeviceId: string
|
||||
): Promise<InstalledApp[]> {
|
||||
try {
|
||||
const rows = await db
|
||||
.select({ info: deviceTable.deviceInfo })
|
||||
.from(deviceTable)
|
||||
.where(eq(deviceTable.id, persistentDeviceId))
|
||||
.limit(1);
|
||||
const info = rows[0]?.info as Record<string, unknown> | null;
|
||||
return (info?.installedApps as InstalledApp[]) ?? [];
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function executeResult(
|
||||
deviceId: string,
|
||||
result: PipelineResult
|
||||
): Promise<{ success: boolean; error?: string }> {
|
||||
try {
|
||||
switch (result.type) {
|
||||
case "intent": {
|
||||
const res = (await sessions.sendCommand(deviceId, {
|
||||
type: "intent",
|
||||
intentAction: result.intent.intentAction,
|
||||
intentUri: result.intent.uri,
|
||||
intentType: result.intent.intentType,
|
||||
intentExtras: result.intent.extras,
|
||||
packageName: result.intent.packageName,
|
||||
})) as { success?: boolean; error?: string };
|
||||
return { success: res.success !== false, error: res.error };
|
||||
}
|
||||
case "launch": {
|
||||
const res = (await sessions.sendCommand(deviceId, {
|
||||
type: "launch",
|
||||
packageName: result.packageName,
|
||||
})) as { success?: boolean; error?: string };
|
||||
return { success: res.success !== false, error: res.error };
|
||||
}
|
||||
case "open_url": {
|
||||
const res = (await sessions.sendCommand(deviceId, {
|
||||
type: "open_url",
|
||||
url: result.url,
|
||||
})) as { success?: boolean; error?: string };
|
||||
return { success: res.success !== false, error: res.error };
|
||||
}
|
||||
case "open_settings": {
|
||||
const res = (await sessions.sendCommand(deviceId, {
|
||||
type: "open_settings",
|
||||
setting: result.setting,
|
||||
})) as { success?: boolean; error?: string };
|
||||
return { success: res.success !== false, error: res.error };
|
||||
}
|
||||
default:
|
||||
return { success: false, error: "Unknown result type" };
|
||||
}
|
||||
} catch (err) {
|
||||
return { success: false, error: (err as Error).message };
|
||||
}
|
||||
}
|
||||
|
||||
async function persistQuickSession(
|
||||
userId: string,
|
||||
persistentDeviceId: string,
|
||||
goal: string,
|
||||
stage: string,
|
||||
action: Record<string, unknown>
|
||||
): Promise<string> {
|
||||
const sessionId = crypto.randomUUID();
|
||||
try {
|
||||
await db.insert(agentSession).values({
|
||||
id: sessionId,
|
||||
userId,
|
||||
deviceId: persistentDeviceId,
|
||||
goal,
|
||||
status: "completed",
|
||||
stepsUsed: 1,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
await db.insert(agentStep).values({
|
||||
id: crypto.randomUUID(),
|
||||
sessionId,
|
||||
stepNumber: 1,
|
||||
action,
|
||||
reasoning: `${stage}: direct action`,
|
||||
result: "OK",
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`[Pipeline] Failed to persist session: ${err}`);
|
||||
}
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
// ─── Main Pipeline ───────────────────────────────────────────
|
||||
|
||||
export async function runPipeline(
|
||||
options: PipelineOptions
|
||||
): Promise<PipelineResultFinal> {
|
||||
const {
|
||||
deviceId,
|
||||
persistentDeviceId,
|
||||
userId,
|
||||
goal,
|
||||
llmConfig,
|
||||
maxSteps,
|
||||
signal,
|
||||
onStep,
|
||||
onComplete,
|
||||
} = options;
|
||||
|
||||
// ── Load device capabilities ─────────────────────────────
|
||||
const apps = persistentDeviceId
|
||||
? await fetchInstalledApps(persistentDeviceId)
|
||||
: [];
|
||||
const caps = buildCapabilities(apps);
|
||||
|
||||
// ── Stage 1: Deterministic Parser ────────────────────────
|
||||
const parseResult = parseGoal(goal, caps);
|
||||
console.log(`[Pipeline] Stage 1 (Parser): ${parseResult.type}`);
|
||||
|
||||
if (parseResult.type !== "passthrough") {
|
||||
if (parseResult.type === "done") {
|
||||
const sessionId = persistentDeviceId
|
||||
? await persistQuickSession(userId, persistentDeviceId, goal, "parser", { done: true, reason: parseResult.reason })
|
||||
: crypto.randomUUID();
|
||||
onComplete?.({ success: true, stepsUsed: 0, sessionId });
|
||||
return { success: true, stepsUsed: 0, sessionId, resolvedBy: "parser" };
|
||||
}
|
||||
|
||||
const execResult = await executeResult(deviceId, parseResult);
|
||||
if (execResult.success) {
|
||||
await new Promise((r) => setTimeout(r, 1500));
|
||||
|
||||
const sessionId = persistentDeviceId
|
||||
? await persistQuickSession(userId, persistentDeviceId, goal, "parser", parseResult as unknown as Record<string, unknown>)
|
||||
: crypto.randomUUID();
|
||||
|
||||
onStep?.({
|
||||
stepNumber: 1,
|
||||
action: { action: parseResult.type, reason: `Parser: ${parseResult.type}` } as any,
|
||||
reasoning: `Parser: direct ${parseResult.type} action`,
|
||||
screenHash: "",
|
||||
});
|
||||
|
||||
sessions.notifyDashboard(userId, {
|
||||
type: "goal_completed",
|
||||
sessionId,
|
||||
success: true,
|
||||
stepsUsed: 1,
|
||||
});
|
||||
|
||||
onComplete?.({ success: true, stepsUsed: 1, sessionId });
|
||||
console.log(`[Pipeline] Goal resolved by parser: ${goal}`);
|
||||
return { success: true, stepsUsed: 1, sessionId, resolvedBy: "parser" };
|
||||
}
|
||||
|
||||
console.warn(`[Pipeline] Parser action failed: ${execResult.error}. Falling through to classifier.`);
|
||||
}
|
||||
|
||||
// ── Stage 2: LLM Classifier ──────────────────────────────
|
||||
const classResult = await classifyGoal(goal, caps, llmConfig);
|
||||
console.log(`[Pipeline] Stage 2 (Classifier): ${classResult.type}`);
|
||||
|
||||
if (classResult.type === "done") {
|
||||
const sessionId = persistentDeviceId
|
||||
? await persistQuickSession(userId, persistentDeviceId, goal, "classifier", { done: true, reason: classResult.reason })
|
||||
: crypto.randomUUID();
|
||||
onComplete?.({ success: true, stepsUsed: 1, sessionId });
|
||||
return { success: true, stepsUsed: 1, sessionId, resolvedBy: "classifier" };
|
||||
}
|
||||
|
||||
if (classResult.type === "intent") {
|
||||
const execResult = await executeResult(deviceId, classResult);
|
||||
if (execResult.success) {
|
||||
await new Promise((r) => setTimeout(r, 1500));
|
||||
|
||||
const sessionId = persistentDeviceId
|
||||
? await persistQuickSession(userId, persistentDeviceId, goal, "classifier", classResult.intent as unknown as Record<string, unknown>)
|
||||
: crypto.randomUUID();
|
||||
|
||||
onStep?.({
|
||||
stepNumber: 1,
|
||||
action: { action: "intent", reason: "Classifier: intent" } as any,
|
||||
reasoning: `Classifier: ${classResult.intent.intentAction}`,
|
||||
screenHash: "",
|
||||
});
|
||||
|
||||
sessions.notifyDashboard(userId, {
|
||||
type: "goal_completed",
|
||||
sessionId,
|
||||
success: true,
|
||||
stepsUsed: 1,
|
||||
});
|
||||
|
||||
onComplete?.({ success: true, stepsUsed: 1, sessionId });
|
||||
console.log(`[Pipeline] Goal resolved by classifier (intent): ${goal}`);
|
||||
return { success: true, stepsUsed: 1, sessionId, resolvedBy: "classifier" };
|
||||
}
|
||||
|
||||
console.warn(`[Pipeline] Classifier intent failed: ${execResult.error}. Falling through to UI agent.`);
|
||||
}
|
||||
|
||||
// ── Stage 3: Lean UI Agent ───────────────────────────────
|
||||
let effectiveGoal = goal;
|
||||
let appToLaunch: string | undefined;
|
||||
|
||||
if (classResult.type === "ui") {
|
||||
effectiveGoal = classResult.subGoal;
|
||||
appToLaunch = classResult.app;
|
||||
} else if (classResult.type === "launch") {
|
||||
appToLaunch = classResult.packageName;
|
||||
}
|
||||
|
||||
if (appToLaunch) {
|
||||
try {
|
||||
await sessions.sendCommand(deviceId, {
|
||||
type: "launch",
|
||||
packageName: appToLaunch,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 1500));
|
||||
console.log(`[Pipeline] Launched ${appToLaunch} for UI agent`);
|
||||
} catch (err) {
|
||||
console.warn(`[Pipeline] Failed to launch ${appToLaunch}: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
const loopResult = await runAgentLoop({
|
||||
deviceId,
|
||||
persistentDeviceId,
|
||||
userId,
|
||||
goal: effectiveGoal,
|
||||
originalGoal: effectiveGoal !== goal ? goal : undefined,
|
||||
llmConfig,
|
||||
maxSteps,
|
||||
signal,
|
||||
pipelineMode: true,
|
||||
onStep,
|
||||
onComplete,
|
||||
});
|
||||
|
||||
return {
|
||||
...loopResult,
|
||||
resolvedBy: "ui_agent",
|
||||
};
|
||||
}
|
||||
@@ -2,7 +2,7 @@ import { Hono } from "hono";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { sessionMiddleware, type AuthEnv } from "../middleware/auth.js";
|
||||
import { sessions } from "../ws/sessions.js";
|
||||
import { runAgentLoop, type AgentLoopOptions } from "../agent/loop.js";
|
||||
import { runPipeline, type PipelineOptions } from "../agent/pipeline.js";
|
||||
import type { LLMConfig } from "../agent/llm.js";
|
||||
import { db } from "../db.js";
|
||||
import { llmConfig as llmConfigTable } from "../schema.js";
|
||||
@@ -84,42 +84,35 @@ goals.post("/", async (c) => {
|
||||
}
|
||||
}
|
||||
|
||||
const options: AgentLoopOptions = {
|
||||
const abort = new AbortController();
|
||||
|
||||
const pipelineOpts: PipelineOptions = {
|
||||
deviceId: device.deviceId,
|
||||
persistentDeviceId: device.persistentDeviceId,
|
||||
userId: user.id,
|
||||
goal: body.goal,
|
||||
llmConfig: llmCfg,
|
||||
maxSteps: body.maxSteps,
|
||||
signal: abort.signal,
|
||||
};
|
||||
|
||||
// Create abort controller for this session
|
||||
const abort = new AbortController();
|
||||
options.signal = abort.signal;
|
||||
|
||||
// Start the agent loop in the background (fire-and-forget).
|
||||
// The client observes progress via the /ws/dashboard WebSocket.
|
||||
const loopPromise = runAgentLoop(options);
|
||||
|
||||
// Track as active until it completes
|
||||
const sessionPlaceholder = { sessionId: "pending", goal: body.goal, abort };
|
||||
activeSessions.set(trackingKey, sessionPlaceholder);
|
||||
|
||||
const loopPromise = runPipeline(pipelineOpts);
|
||||
|
||||
loopPromise
|
||||
.then((result) => {
|
||||
activeSessions.delete(trackingKey);
|
||||
console.log(
|
||||
`[Agent] Completed on ${device.deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps (session ${result.sessionId})`
|
||||
`[Pipeline] Completed on ${device.deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps (resolved by ${result.resolvedBy})`
|
||||
);
|
||||
})
|
||||
.catch((err) => {
|
||||
activeSessions.delete(trackingKey);
|
||||
console.error(`[Agent] Error on ${device.deviceId}: ${err}`);
|
||||
console.error(`[Pipeline] Error on ${device.deviceId}: ${err}`);
|
||||
});
|
||||
|
||||
// We need the sessionId from the loop, but it's created inside runAgentLoop.
|
||||
// For immediate response, generate one here and let the dashboard events carry the real one.
|
||||
// The loop will emit goal_started with its sessionId momentarily.
|
||||
return c.json({
|
||||
deviceId: body.deviceId,
|
||||
goal: body.goal,
|
||||
|
||||
@@ -3,10 +3,9 @@ import type { DeviceMessage } from "@droidclaw/shared";
|
||||
import { eq, and } from "drizzle-orm";
|
||||
import { auth } from "../auth.js";
|
||||
import { db } from "../db.js";
|
||||
import { llmConfig, device, agentSession, agentStep } from "../schema.js";
|
||||
import { llmConfig, device } from "../schema.js";
|
||||
import { sessions, type WebSocketData } from "./sessions.js";
|
||||
import { runAgentLoop } from "../agent/loop.js";
|
||||
import { preprocessGoal } from "../agent/preprocessor.js";
|
||||
import { runPipeline } from "../agent/pipeline.js";
|
||||
import type { LLMConfig } from "../agent/llm.js";
|
||||
|
||||
/** Track running agent sessions to prevent duplicates per device */
|
||||
@@ -224,77 +223,16 @@ export async function handleDeviceMessage(
|
||||
break;
|
||||
}
|
||||
|
||||
// Preprocess: handle simple goals directly, or extract "open X" prefix
|
||||
let effectiveGoal = goal;
|
||||
try {
|
||||
const preResult = await preprocessGoal(deviceId, goal, persistentDeviceId);
|
||||
if (preResult.handled) {
|
||||
await new Promise((r) => setTimeout(r, 1500));
|
||||
|
||||
if (preResult.refinedGoal) {
|
||||
effectiveGoal = preResult.refinedGoal;
|
||||
sendToDevice(ws, {
|
||||
type: "step",
|
||||
step: 0,
|
||||
action: preResult.command,
|
||||
reasoning: "Preprocessor: launched app directly",
|
||||
});
|
||||
} else {
|
||||
// Pure "open X" — fully handled. Persist to DB then return.
|
||||
const sessionId = crypto.randomUUID();
|
||||
try {
|
||||
await db.insert(agentSession).values({
|
||||
id: sessionId,
|
||||
userId,
|
||||
deviceId: persistentDeviceId,
|
||||
goal,
|
||||
status: "completed",
|
||||
stepsUsed: 1,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
await db.insert(agentStep).values({
|
||||
id: crypto.randomUUID(),
|
||||
sessionId,
|
||||
stepNumber: 1,
|
||||
action: preResult.command ?? null,
|
||||
reasoning: `Preprocessor: direct ${preResult.command?.type} action`,
|
||||
result: "OK",
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`[DB] Failed to save preprocessor session: ${err}`);
|
||||
}
|
||||
|
||||
sendToDevice(ws, { type: "goal_started", sessionId, goal });
|
||||
sendToDevice(ws, {
|
||||
type: "step",
|
||||
step: 1,
|
||||
action: preResult.command,
|
||||
reasoning: `Preprocessor: direct ${preResult.command?.type} action`,
|
||||
});
|
||||
sendToDevice(ws, { type: "goal_completed", success: true, stepsUsed: 1 });
|
||||
|
||||
sessions.notifyDashboard(userId, { type: "goal_completed", sessionId, success: true, stepsUsed: 1 });
|
||||
|
||||
console.log(`[Preprocessor] Goal handled directly: ${goal}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn(`[Preprocessor] Error (falling through to LLM): ${err}`);
|
||||
}
|
||||
|
||||
console.log(`[Agent] Starting goal for device ${deviceId}: ${effectiveGoal}${effectiveGoal !== goal ? ` (original: ${goal})` : ""}`);
|
||||
console.log(`[Pipeline] Starting goal for device ${deviceId}: ${goal}`);
|
||||
activeSessions.set(deviceId, goal);
|
||||
|
||||
sendToDevice(ws, { type: "goal_started", sessionId: deviceId, goal });
|
||||
|
||||
// Run agent loop in background (DB persistence happens inside the loop)
|
||||
runAgentLoop({
|
||||
runPipeline({
|
||||
deviceId,
|
||||
persistentDeviceId,
|
||||
userId,
|
||||
goal: effectiveGoal,
|
||||
originalGoal: goal !== effectiveGoal ? goal : undefined,
|
||||
goal,
|
||||
llmConfig: userLlmConfig,
|
||||
onStep(step) {
|
||||
sendToDevice(ws, {
|
||||
@@ -312,13 +250,13 @@ export async function handleDeviceMessage(
|
||||
stepsUsed: result.stepsUsed,
|
||||
});
|
||||
console.log(
|
||||
`[Agent] Completed on ${deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps`
|
||||
`[Pipeline] Completed on ${deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps`
|
||||
);
|
||||
},
|
||||
}).catch((err) => {
|
||||
activeSessions.delete(deviceId);
|
||||
sendToDevice(ws, { type: "goal_failed", message: String(err) });
|
||||
console.error(`[Agent] Error on ${deviceId}:`, err);
|
||||
console.error(`[Pipeline] Error on ${deviceId}:`, err);
|
||||
});
|
||||
|
||||
break;
|
||||
@@ -331,7 +269,7 @@ export async function handleDeviceMessage(
|
||||
case "apps": {
|
||||
const persistentDeviceId = ws.data.persistentDeviceId;
|
||||
if (persistentDeviceId) {
|
||||
const apps = (msg as unknown as { apps: Array<{ packageName: string; label: string }> }).apps;
|
||||
const apps = (msg as unknown as { apps: Array<{ packageName: string; label: string; intents?: string[] }> }).apps;
|
||||
// Merge apps into existing deviceInfo
|
||||
db.update(device)
|
||||
.set({
|
||||
|
||||
Reference in New Issue
Block a user