diff --git a/server/src/agent/pipeline.ts b/server/src/agent/pipeline.ts new file mode 100644 index 0000000..a74712f --- /dev/null +++ b/server/src/agent/pipeline.ts @@ -0,0 +1,293 @@ +/** + * Intent-First Pipeline Orchestrator for DroidClaw. + * + * Goal → Parser (0 LLM) → Classifier (1 LLM) → UI Agent (3-8 LLM) + * handles ~30% handles ~20% handles ~50% + */ + +import type { + InstalledApp, + PipelineResult, +} from "@droidclaw/shared"; +import { sessions } from "../ws/sessions.js"; +import { db } from "../db.js"; +import { + device as deviceTable, + agentSession, + agentStep, +} from "../schema.js"; +import { eq } from "drizzle-orm"; +import { parseGoal, buildCapabilities } from "./parser.js"; +import { classifyGoal } from "./classifier.js"; +import { runAgentLoop, type AgentLoopOptions, type AgentResult } from "./loop.js"; +import type { LLMConfig } from "./llm.js"; + +// ─── Types ─────────────────────────────────────────────────── + +export interface PipelineOptions { + deviceId: string; + persistentDeviceId?: string; + userId: string; + goal: string; + llmConfig: LLMConfig; + maxSteps?: number; + signal?: AbortSignal; + onStep?: AgentLoopOptions["onStep"]; + onComplete?: AgentLoopOptions["onComplete"]; +} + +export interface PipelineResultFinal { + success: boolean; + stepsUsed: number; + sessionId: string; + resolvedBy: "parser" | "classifier" | "ui_agent"; +} + +// ─── Helpers ───────────────────────────────────────────────── + +async function fetchInstalledApps( + persistentDeviceId: string +): Promise { + try { + const rows = await db + .select({ info: deviceTable.deviceInfo }) + .from(deviceTable) + .where(eq(deviceTable.id, persistentDeviceId)) + .limit(1); + const info = rows[0]?.info as Record | null; + return (info?.installedApps as InstalledApp[]) ?? []; + } catch { + return []; + } +} + +async function executeResult( + deviceId: string, + result: PipelineResult +): Promise<{ success: boolean; error?: string }> { + try { + switch (result.type) { + case "intent": { + const res = (await sessions.sendCommand(deviceId, { + type: "intent", + intentAction: result.intent.intentAction, + intentUri: result.intent.uri, + intentType: result.intent.intentType, + intentExtras: result.intent.extras, + packageName: result.intent.packageName, + })) as { success?: boolean; error?: string }; + return { success: res.success !== false, error: res.error }; + } + case "launch": { + const res = (await sessions.sendCommand(deviceId, { + type: "launch", + packageName: result.packageName, + })) as { success?: boolean; error?: string }; + return { success: res.success !== false, error: res.error }; + } + case "open_url": { + const res = (await sessions.sendCommand(deviceId, { + type: "open_url", + url: result.url, + })) as { success?: boolean; error?: string }; + return { success: res.success !== false, error: res.error }; + } + case "open_settings": { + const res = (await sessions.sendCommand(deviceId, { + type: "open_settings", + setting: result.setting, + })) as { success?: boolean; error?: string }; + return { success: res.success !== false, error: res.error }; + } + default: + return { success: false, error: "Unknown result type" }; + } + } catch (err) { + return { success: false, error: (err as Error).message }; + } +} + +async function persistQuickSession( + userId: string, + persistentDeviceId: string, + goal: string, + stage: string, + action: Record +): Promise { + const sessionId = crypto.randomUUID(); + try { + await db.insert(agentSession).values({ + id: sessionId, + userId, + deviceId: persistentDeviceId, + goal, + status: "completed", + stepsUsed: 1, + completedAt: new Date(), + }); + await db.insert(agentStep).values({ + id: crypto.randomUUID(), + sessionId, + stepNumber: 1, + action, + reasoning: `${stage}: direct action`, + result: "OK", + }); + } catch (err) { + console.error(`[Pipeline] Failed to persist session: ${err}`); + } + return sessionId; +} + +// ─── Main Pipeline ─────────────────────────────────────────── + +export async function runPipeline( + options: PipelineOptions +): Promise { + const { + deviceId, + persistentDeviceId, + userId, + goal, + llmConfig, + maxSteps, + signal, + onStep, + onComplete, + } = options; + + // ── Load device capabilities ───────────────────────────── + const apps = persistentDeviceId + ? await fetchInstalledApps(persistentDeviceId) + : []; + const caps = buildCapabilities(apps); + + // ── Stage 1: Deterministic Parser ──────────────────────── + const parseResult = parseGoal(goal, caps); + console.log(`[Pipeline] Stage 1 (Parser): ${parseResult.type}`); + + if (parseResult.type !== "passthrough") { + if (parseResult.type === "done") { + const sessionId = persistentDeviceId + ? await persistQuickSession(userId, persistentDeviceId, goal, "parser", { done: true, reason: parseResult.reason }) + : crypto.randomUUID(); + onComplete?.({ success: true, stepsUsed: 0, sessionId }); + return { success: true, stepsUsed: 0, sessionId, resolvedBy: "parser" }; + } + + const execResult = await executeResult(deviceId, parseResult); + if (execResult.success) { + await new Promise((r) => setTimeout(r, 1500)); + + const sessionId = persistentDeviceId + ? await persistQuickSession(userId, persistentDeviceId, goal, "parser", parseResult as unknown as Record) + : crypto.randomUUID(); + + onStep?.({ + stepNumber: 1, + action: { action: parseResult.type, reason: `Parser: ${parseResult.type}` } as any, + reasoning: `Parser: direct ${parseResult.type} action`, + screenHash: "", + }); + + sessions.notifyDashboard(userId, { + type: "goal_completed", + sessionId, + success: true, + stepsUsed: 1, + }); + + onComplete?.({ success: true, stepsUsed: 1, sessionId }); + console.log(`[Pipeline] Goal resolved by parser: ${goal}`); + return { success: true, stepsUsed: 1, sessionId, resolvedBy: "parser" }; + } + + console.warn(`[Pipeline] Parser action failed: ${execResult.error}. Falling through to classifier.`); + } + + // ── Stage 2: LLM Classifier ────────────────────────────── + const classResult = await classifyGoal(goal, caps, llmConfig); + console.log(`[Pipeline] Stage 2 (Classifier): ${classResult.type}`); + + if (classResult.type === "done") { + const sessionId = persistentDeviceId + ? await persistQuickSession(userId, persistentDeviceId, goal, "classifier", { done: true, reason: classResult.reason }) + : crypto.randomUUID(); + onComplete?.({ success: true, stepsUsed: 1, sessionId }); + return { success: true, stepsUsed: 1, sessionId, resolvedBy: "classifier" }; + } + + if (classResult.type === "intent") { + const execResult = await executeResult(deviceId, classResult); + if (execResult.success) { + await new Promise((r) => setTimeout(r, 1500)); + + const sessionId = persistentDeviceId + ? await persistQuickSession(userId, persistentDeviceId, goal, "classifier", classResult.intent as unknown as Record) + : crypto.randomUUID(); + + onStep?.({ + stepNumber: 1, + action: { action: "intent", reason: "Classifier: intent" } as any, + reasoning: `Classifier: ${classResult.intent.intentAction}`, + screenHash: "", + }); + + sessions.notifyDashboard(userId, { + type: "goal_completed", + sessionId, + success: true, + stepsUsed: 1, + }); + + onComplete?.({ success: true, stepsUsed: 1, sessionId }); + console.log(`[Pipeline] Goal resolved by classifier (intent): ${goal}`); + return { success: true, stepsUsed: 1, sessionId, resolvedBy: "classifier" }; + } + + console.warn(`[Pipeline] Classifier intent failed: ${execResult.error}. Falling through to UI agent.`); + } + + // ── Stage 3: Lean UI Agent ─────────────────────────────── + let effectiveGoal = goal; + let appToLaunch: string | undefined; + + if (classResult.type === "ui") { + effectiveGoal = classResult.subGoal; + appToLaunch = classResult.app; + } else if (classResult.type === "launch") { + appToLaunch = classResult.packageName; + } + + if (appToLaunch) { + try { + await sessions.sendCommand(deviceId, { + type: "launch", + packageName: appToLaunch, + }); + await new Promise((r) => setTimeout(r, 1500)); + console.log(`[Pipeline] Launched ${appToLaunch} for UI agent`); + } catch (err) { + console.warn(`[Pipeline] Failed to launch ${appToLaunch}: ${err}`); + } + } + + const loopResult = await runAgentLoop({ + deviceId, + persistentDeviceId, + userId, + goal: effectiveGoal, + originalGoal: effectiveGoal !== goal ? goal : undefined, + llmConfig, + maxSteps, + signal, + pipelineMode: true, + onStep, + onComplete, + }); + + return { + ...loopResult, + resolvedBy: "ui_agent", + }; +} diff --git a/server/src/routes/goals.ts b/server/src/routes/goals.ts index cbbf9d4..686fc92 100644 --- a/server/src/routes/goals.ts +++ b/server/src/routes/goals.ts @@ -2,7 +2,7 @@ import { Hono } from "hono"; import { eq } from "drizzle-orm"; import { sessionMiddleware, type AuthEnv } from "../middleware/auth.js"; import { sessions } from "../ws/sessions.js"; -import { runAgentLoop, type AgentLoopOptions } from "../agent/loop.js"; +import { runPipeline, type PipelineOptions } from "../agent/pipeline.js"; import type { LLMConfig } from "../agent/llm.js"; import { db } from "../db.js"; import { llmConfig as llmConfigTable } from "../schema.js"; @@ -84,42 +84,35 @@ goals.post("/", async (c) => { } } - const options: AgentLoopOptions = { + const abort = new AbortController(); + + const pipelineOpts: PipelineOptions = { deviceId: device.deviceId, persistentDeviceId: device.persistentDeviceId, userId: user.id, goal: body.goal, llmConfig: llmCfg, maxSteps: body.maxSteps, + signal: abort.signal, }; - // Create abort controller for this session - const abort = new AbortController(); - options.signal = abort.signal; - - // Start the agent loop in the background (fire-and-forget). - // The client observes progress via the /ws/dashboard WebSocket. - const loopPromise = runAgentLoop(options); - - // Track as active until it completes const sessionPlaceholder = { sessionId: "pending", goal: body.goal, abort }; activeSessions.set(trackingKey, sessionPlaceholder); + const loopPromise = runPipeline(pipelineOpts); + loopPromise .then((result) => { activeSessions.delete(trackingKey); console.log( - `[Agent] Completed on ${device.deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps (session ${result.sessionId})` + `[Pipeline] Completed on ${device.deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps (resolved by ${result.resolvedBy})` ); }) .catch((err) => { activeSessions.delete(trackingKey); - console.error(`[Agent] Error on ${device.deviceId}: ${err}`); + console.error(`[Pipeline] Error on ${device.deviceId}: ${err}`); }); - // We need the sessionId from the loop, but it's created inside runAgentLoop. - // For immediate response, generate one here and let the dashboard events carry the real one. - // The loop will emit goal_started with its sessionId momentarily. return c.json({ deviceId: body.deviceId, goal: body.goal, diff --git a/server/src/ws/device.ts b/server/src/ws/device.ts index 34e429b..553a4dc 100644 --- a/server/src/ws/device.ts +++ b/server/src/ws/device.ts @@ -3,10 +3,9 @@ import type { DeviceMessage } from "@droidclaw/shared"; import { eq, and } from "drizzle-orm"; import { auth } from "../auth.js"; import { db } from "../db.js"; -import { llmConfig, device, agentSession, agentStep } from "../schema.js"; +import { llmConfig, device } from "../schema.js"; import { sessions, type WebSocketData } from "./sessions.js"; -import { runAgentLoop } from "../agent/loop.js"; -import { preprocessGoal } from "../agent/preprocessor.js"; +import { runPipeline } from "../agent/pipeline.js"; import type { LLMConfig } from "../agent/llm.js"; /** Track running agent sessions to prevent duplicates per device */ @@ -224,77 +223,16 @@ export async function handleDeviceMessage( break; } - // Preprocess: handle simple goals directly, or extract "open X" prefix - let effectiveGoal = goal; - try { - const preResult = await preprocessGoal(deviceId, goal, persistentDeviceId); - if (preResult.handled) { - await new Promise((r) => setTimeout(r, 1500)); - - if (preResult.refinedGoal) { - effectiveGoal = preResult.refinedGoal; - sendToDevice(ws, { - type: "step", - step: 0, - action: preResult.command, - reasoning: "Preprocessor: launched app directly", - }); - } else { - // Pure "open X" — fully handled. Persist to DB then return. - const sessionId = crypto.randomUUID(); - try { - await db.insert(agentSession).values({ - id: sessionId, - userId, - deviceId: persistentDeviceId, - goal, - status: "completed", - stepsUsed: 1, - completedAt: new Date(), - }); - await db.insert(agentStep).values({ - id: crypto.randomUUID(), - sessionId, - stepNumber: 1, - action: preResult.command ?? null, - reasoning: `Preprocessor: direct ${preResult.command?.type} action`, - result: "OK", - }); - } catch (err) { - console.error(`[DB] Failed to save preprocessor session: ${err}`); - } - - sendToDevice(ws, { type: "goal_started", sessionId, goal }); - sendToDevice(ws, { - type: "step", - step: 1, - action: preResult.command, - reasoning: `Preprocessor: direct ${preResult.command?.type} action`, - }); - sendToDevice(ws, { type: "goal_completed", success: true, stepsUsed: 1 }); - - sessions.notifyDashboard(userId, { type: "goal_completed", sessionId, success: true, stepsUsed: 1 }); - - console.log(`[Preprocessor] Goal handled directly: ${goal}`); - break; - } - } - } catch (err) { - console.warn(`[Preprocessor] Error (falling through to LLM): ${err}`); - } - - console.log(`[Agent] Starting goal for device ${deviceId}: ${effectiveGoal}${effectiveGoal !== goal ? ` (original: ${goal})` : ""}`); + console.log(`[Pipeline] Starting goal for device ${deviceId}: ${goal}`); activeSessions.set(deviceId, goal); sendToDevice(ws, { type: "goal_started", sessionId: deviceId, goal }); - // Run agent loop in background (DB persistence happens inside the loop) - runAgentLoop({ + runPipeline({ deviceId, persistentDeviceId, userId, - goal: effectiveGoal, - originalGoal: goal !== effectiveGoal ? goal : undefined, + goal, llmConfig: userLlmConfig, onStep(step) { sendToDevice(ws, { @@ -312,13 +250,13 @@ export async function handleDeviceMessage( stepsUsed: result.stepsUsed, }); console.log( - `[Agent] Completed on ${deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps` + `[Pipeline] Completed on ${deviceId}: ${result.success ? "success" : "incomplete"} in ${result.stepsUsed} steps` ); }, }).catch((err) => { activeSessions.delete(deviceId); sendToDevice(ws, { type: "goal_failed", message: String(err) }); - console.error(`[Agent] Error on ${deviceId}:`, err); + console.error(`[Pipeline] Error on ${deviceId}:`, err); }); break; @@ -331,7 +269,7 @@ export async function handleDeviceMessage( case "apps": { const persistentDeviceId = ws.data.persistentDeviceId; if (persistentDeviceId) { - const apps = (msg as unknown as { apps: Array<{ packageName: string; label: string }> }).apps; + const apps = (msg as unknown as { apps: Array<{ packageName: string; label: string; intents?: string[] }> }).apps; // Merge apps into existing deviceInfo db.update(device) .set({