Synapse Execution Pipeline — Integration Design Document
Arquitecto: Pipeline Architecture Specialist
Fecha: 2026-04-01
Estado: Implementación
Módulo base:functions/api/utils/action-pipeline.js
Target:functions/api/synapse/[[path]].js→executeStepCore()
1. Resumen Ejecutivo
El enorme monolito de Synapse ([[path]].js — 8,187 líneas) contiene toda la lógica de ejecución
de agentes embebida en funciones imperativas sin separación de concerns. La función executeStepCore()
es el punto de convergencia donde TODO agente ejecuta su trabajo (tanto la ruta /execute inicial
como la cadena /continue).
Objetivo: Envolver executeStepCore() con el Execution Pipeline (action-pipeline.js) para ganar:
- Validación estructurada de inputs antes de ejecutar
- Pre-hooks para web search, data context, y agent state checks
- Sistema de permisos (nivel/energía/concurrencia del agente)
- Post-hooks para TTI/TTS post-processing y telemetría
- Clasificación de errores estandarizada
- Métricas por paso (timing de cada fase)
- Base para futura decomposición del monolito
2. Arquitectura Actual vs. Pipeline
2.1 Flujo Actual (executeStepCore — monolítico)
executeStepCore(params) {
1. Detectar capability (web/tti/tts/vision) ← VALIDATION
2. Web search (if needsWeb) ← PRE-PROCESSING
3. Load data context (if data_access_level >= 1) ← PRE-PROCESSING
4. Build collaboration prompt ← PRE-PROCESSING
5. Resolve AI provider (tier-based) ← CONFIGURATION
6. AI call (generateContent/generateVision) ← EXECUTION
7. Web result wrapping ← FORMAT
8. TTI post-processing (parse → generate → store) ← POST-PROCESSING
9. TTS post-processing (extract → generate → store)← POST-PROCESSING
return { outputContent, outputType }
}
Problemas:
- 0 validación de inputs (agent puede ser null, planStep puede faltar campos)
- Web search/data context son side-effects sin aislamiento de errores
- TTI/TTS post-processing son 80+ líneas de parsing frágil incrustado
- No hay métricas (no sabemos cuánto tarda cada fase)
- Los errores se propagan sin clasificar al caller
2.2 Flujo Pipeline (nuevo)
synapsePipeline.run('executeStep', input, context)
│
├── VALIDATE ─────────── Verificar agent, planStep, task existen
│ Verificar capability es válida
│ Verificar agent tiene la capability requerida
│
├── PRE-HOOKS ────────── hook:agentGuard → energía >= 20, no busy
│ hook:webSearch → pre-fetch si needsWeb
│ hook:dataContext → load org data si nivel >= 1
│ hook:promptBuilder → construir prompt enriquecido
│
├── PERMISSION ───────── Verificar agente puede ejecutar este tipo de step
│ L1-L2: execution only
│ L3+: any step type
│
├── EXECUTE ──────────── AI call (generateContent/generateVision)
│ Con timeout configurable por capability
│
├── FORMAT ───────────── Parse JSON response
│ Web wrapping (fuentes_web)
│ Normalize output structure
│
├── POST-HOOKS ───────── hook:ttiProcessor → generate images → store R2
│ hook:ttsProcessor → generate audio → store R2
│ hook:telemetry → log metrics, classify errors
│ hook:eventLogger → emit collaboration events
│
├── PERSIST ──────────── Update synapse_task_steps in D1
│
└── RETURN { outputContent, outputType, metrics }
3. Diseño Detallado
3.1 Pipeline Actions
Se registra UNA acción principal (executeStep) que encapsula el AI call core.
Los pre/post-hooks manejan todo el enriquecimiento y post-procesamiento.
createAction({
name: 'executeStep',
inputSchema: {
required: ['task', 'stepAgent', 'planStep'],
types: {
task: 'object',
stepAgent: 'object',
planStep: 'object',
}
},
validate: (input) => {
// Agent debe tener ID
// PlanStep debe tener step_type y role_in_task
// Task debe tener title y organization_id
},
execute: async (input, ctx) => {
// SOLO el AI call — limpio, aislado
// Todo lo demás está en hooks
},
format: (result) => {
// Parse JSON, web wrapping, normalize
},
persist: async (result, ctx) => {
// Update D1: synapse_task_steps
},
})
3.2 Pre-Hooks
| Hook | Matcher | Timeout | Acción |
|---|---|---|---|
agentGuard |
always | 1s | Verifica energía >= 20, agent no en break, no busy con otra task |
webSearch |
planStep.required_capability === 'web' |
8s | Pre-fetch web results, attach a ctx.webContext |
dataContext |
stepAgent.data_access_level >= 1 |
5s | Load org data context, attach a ctx.dataContext |
promptBuilder |
always | 2s | Build the full prompt from task + agent + data + web, attach a ctx.prompt |
3.3 Post-Hooks
| Hook | Matcher | Timeout | Acción |
|---|---|---|---|
ttiProcessor |
planStep.required_capability === 'tti' |
60s | Parse image prompts, generate images secuencialmente, store R2 |
ttsProcessor |
planStep.required_capability === 'tts' |
30s | Extract TTS text, generate audio, store R2 |
telemetry |
always | 2s | Log step metrics (timing, tokens, capability, agent level) |
eventLogger |
orchestration.collaboration_needed |
2s | Emit collaboration_step event to D1 |
3.4 Permission Model
permissions: {
// Agent level checks
checkPermission: (input, ctx) => {
const agent = input.stepAgent;
const planStep = input.planStep;
// Energía mínima
if ((agent.energy_level || 100) < 20) {
return { allowed: false, behavior: 'deny', reason: 'Agent energy too low' };
}
// Director review solo para L3+
if (['director_review', 'compilation'].includes(planStep.step_type)) {
if ((agent.level || 1) < 3) {
return { allowed: false, behavior: 'ask', reason: 'Requires L3+ agent' };
}
}
return { allowed: true, behavior: 'allow' };
}
}
4. Estrategia de Integración
Principio: Wrap, don't rewrite
No reescribimos executeStepCore(). La envolvemos.
ANTES:
handleContinueTask → executeStepCore(params) → raw result
DESPUÉS:
handleContinueTask → synapsePipeline.run('executeStep', input, ctx)
└── pre-hooks (web, data, prompt)
└── execute: lleva dentro la lógica de executeStepCore
└── post-hooks (tti, tts, telemetry)
└── persist: D1 update
Compatibilidad
- La firma de
executeStepCoreNO cambia handleContinueTaskyhandleRetryStepusan el pipeline como wrapper- Si el pipeline falla internamente, fallback al comportamiento original
- Las métricas son additive (no reemplazan logging existente)
Archivos afectados
| Archivo | Cambio |
|---|---|
functions/api/synapse/synapse-pipeline.js |
NUEVO — Define pipeline, actions, hooks |
functions/api/synapse/[[path]].js |
MODIFICADO — Import pipeline, wrap executeStepCore calls |
functions/api/utils/action-pipeline.js |
Sin cambios (módulo base) |
5. Métricas que ganamos
Con el pipeline integrado, cada step execution reporta:
{
"metrics": {
"totalMs": 4521,
"steps": {
"validate": 2,
"preHook": 1205,
"permission": 1,
"execute": 2850,
"format": 15,
"persist": 120,
"postHook": 328
}
}
}
Esto alimenta directamente el context-analysis.js para tracking de costos/waste.
6. Error Classification
El pipeline clasifica automáticamente cada error que ocurre:
| Tipo | Ejemplo | Recovery |
|---|---|---|
d1_error |
D1 timeout, constraint violation | Auto-retry via /continue |
timeout |
AI provider timeout (25s) | Step marked timed_out, auto-retry |
network_error |
Fetch failed to AI provider | Auto-retry con backoff |
auth_unauthorized |
Invalid API key | Fail task, notify admin |
validation_missing_field |
Agent missing capability | Skip step or assign different agent |
rate_limited |
AI provider rate limit | Delay + retry |
7. Future Decomposition Path
Con el pipeline como intermediario, el monolito se puede descomponer gradualmente:
Phase 1 (NOW): Pipeline wraps executeStepCore
Phase 2 (NEXT): Extract TTI/TTS into separate pipeline actions
Phase 3: Extract web search into standalone service
Phase 4: Each capability becomes its own registered action
Phase 5: Full tool-per-file architecture (like Claude Code)
El pipeline es el caballo de Troya para descomponer el monolito sin romper producción.
8. Diagrama de Integración
┌──────────────────────────────────────────────────────────────┐
│ handleContinueTask() │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ synapsePipeline.run('executeStep') │ │
│ │ ┌──────────┬──────────┬──────────┬────────┬───────┐ │ │
│ │ │ VALIDATE │PRE-HOOKS │PERMISSION│EXECUTE │FORMAT │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ •agent? │•webSearch│•energy>20│•genCont│•parse │ │ │
│ │ │ •task? │•dataCxt │•level ok │•genVis │•wrap │ │ │
│ │ │ •step? │•prompt │•not busy │ │•norm │ │ │
│ │ └──────────┴──────────┴──────────┴────────┴───────┘ │ │
│ │ ┌──────────┬──────────┬──────────┐ │ │
│ │ │POST-HOOKS│ PERSIST │ METRICS │ │ │
│ │ │ │ │ │ │ │
│ │ │•ttiProc │•D1 update│•timing │ │ │
│ │ │•ttsProc │•step row │•classify │ │ │
│ │ │•telemetry│ │•tokens │ │ │
│ │ │•events │ │ │ │ │
│ │ └──────────┴──────────┴──────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ CEO Inter-Step Review (unchanged — runs after pipeline) │
│ Chain to /continue (unchanged) │
└──────────────────────────────────────────────────────────────┘