Skip to main content

apps/gateway/src/services/event-pipeline.ts

Metadata

Indexed Symbols

  • persistTelemetryBatch (line 12, function) - Implements persist telemetry batch for service-layer operations.
  • reservePendingOutboxEvents (line 41, function) - Implements reserve pending outbox events for service-layer operations.
  • markOutboxEventStatus (line 79, function) - Implements mark outbox event status for service-layer operations.
  • enqueueOutboxEvent (line 96, function) - Implements enqueue outbox event for service-layer operations.
  • listRecentOutboxEvents (line 109, function) - Implements list recent outbox events for service-layer operations.
  • listRecentTelemetryBatches (line 145, function) - Implements list recent telemetry batches for service-layer operations.
  • listFailedOutboxEvents (line 171, function) - Implements list failed outbox events for service-layer operations.
  • retryOutboxEvent (line 200, function) - Implements retry outbox event for service-layer operations.

Markdown Headings (if applicable)

No markdown headings detected.

Source Preview

import { randomUUID } from "node:crypto";
import type { TelemetryEvent } from "@anchor/contracts";
import { query, withTransaction } from "./postgres.js";
import { env } from "../config/env.js";

type RoutedAction = {
topic: string;
agent: string;
reasoner: string;
};

export async function persistTelemetryBatch(
workspaceId: string,
correlationId: string,
events: TelemetryEvent[],
routedActions: RoutedAction[]
): Promise<void> {
await withTransaction(async (client) => {
await client.query(
`insert into telemetry_ingest_batches (workspace_id, correlation_id, event_count, payload)
values ($1::uuid, $2::uuid, $3, $4::jsonb)`,
[workspaceId, correlationId, events.length, JSON.stringify({ events })]
);

for (const action of routedActions) {