# Dudoxx Omni — STT Integration Skill

> Self-contained guide for integrating the Dudoxx streaming Speech-to-Text service from Next.js 16, NestJS 11, and Python 3.12. Drop this file into any LLM context (`.claude/`, `AGENTS.md`, project docs) — it stands alone.

**Service**: `ddx-cuda-live-stt` (port `4600`) and `ddx-mlx-live-stt` (port `4100`).
**Public**: `https://stt.forge.dudoxx.com`
**Wire format**: Deepgram-shaped envelope, frozen v1.0.0.
**Routes**: `/v1/listen` (legacy) · `/v1/listen/dg` (Deepgram-compatible, recommended).

---

## TL;DR

- **Browser**: never connect directly to `:4600`. Open WSS to a Next.js / NestJS / NGINX edge; the edge proxies to `ws://127.0.0.1:4600/v1/listen/dg`.
- **SSR / RSC / Python agents**: connect directly via loopback.
- **Auth**: `Authorization: Token <key>` header OR `Sec-WebSocket-Protocol: token, <key>`. Strip-and-replace user keys at the edge with a service-only secret.
- **Audio**: 16 kHz mono int16 PCM (`linear16`). Normalizer accepts `mulaw`, `opus`, `mp3`, `aac`, `flac`, `webm`, `ogg-opus` for full-frame inputs.
- **Frames out**: `Metadata` (open) → N × `Results` (partial+final) → `UtteranceEnd` → `Metadata` (final, on CloseStream).
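
The frame sequence above can be folded into display text by keeping partials separate from committed finals. A minimal Python sketch, assuming frames are already JSON-decoded and that an utterance closes on `speech_final=true` or `UtteranceEnd`, as in the Deepgram protocol this envelope is shaped after; `TranscriptState` and `fold_frames` are hypothetical names:

```python
from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TranscriptState:
    utterances: list[str] = field(default_factory=list)  # committed utterances
    pending: list[str] = field(default_factory=list)  # is_final segments of the open utterance
    interim: str = ""  # latest partial (is_final=false); replaced, never appended

    def text(self) -> str:
        """Committed text plus the open utterance, e.g. for a live caption."""
        open_part = " ".join(self.pending + ([self.interim] if self.interim else []))
        return " ".join(p for p in [*self.utterances, open_part] if p)


def _commit(state: TranscriptState) -> None:
    if state.pending:
        state.utterances.append(" ".join(state.pending))
    state.pending.clear()
    state.interim = ""  # any leftover partial is superseded by the boundary


def fold_frames(frames: Iterable[dict[str, Any]],
                state: TranscriptState | None = None) -> TranscriptState:
    state = TranscriptState() if state is None else state
    for frame in frames:
        kind = frame.get("type")
        if kind == "Results":
            alts = frame["channel"]["alternatives"]
            transcript = alts[0]["transcript"].strip() if alts else ""
            if not frame.get("is_final"):
                state.interim = transcript
                continue
            if transcript:
                state.pending.append(transcript)
            state.interim = ""
            if frame.get("speech_final"):
                _commit(state)
        elif kind == "UtteranceEnd":
            _commit(state)
        # Metadata and SpeechStarted carry no transcript text.
    return state
```

A forced boundary from `Finalize` arrives as an ordinary final with `from_finalize=true`; this sketch treats it like any other final.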

---

## Why proxy

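1. Browsers can't hold service secrets: the browser `WebSocket` API cannot set an `Authorization` header, so a direct connection would have to hand the token to client-side code (e.g. via `Sec-WebSocket-Protocol`), where any user can read it.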
2. TLS termination lives at the edge — browsers refuse `ws://` from `https://` pages.
3. Per-user authn happens at the proxy — validate session, then inject the service token.

```
browser ──wss://app.example.com/api/stt/listen──▶ Next.js / NestJS / NGINX
                                                          │ validate session
                                                          │ inject service token
                                              ws://127.0.0.1:4600/v1/listen/dg
```
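
Step 3 boils down to a header rewrite on the edge-to-service hop. A language-neutral sketch in Python, assuming the session was already validated; `upstream_headers` is a hypothetical helper, and dropping `Cookie` is a suggested hardening step (the service presumably has no use for the app's session cookie), not a documented requirement:

```python
from __future__ import annotations

from collections.abc import Mapping

# Never forwarded: each of these can carry a user credential.
_CREDENTIAL_HEADERS = {"authorization", "sec-websocket-protocol", "cookie"}
# Handshake / hop-by-hop headers the proxy's own upstream WebSocket client regenerates.
_HANDSHAKE_HEADERS = {
    "host", "connection", "upgrade",
    "sec-websocket-key", "sec-websocket-version", "sec-websocket-extensions",
}


def upstream_headers(client_headers: Mapping[str, str], service_token: str) -> dict[str, str]:
    """Headers for the edge -> STT hop: client credentials out, service token in."""
    if not service_token:
        # Fail closed rather than connecting with an empty or placeholder key.
        raise ValueError("service token is not configured")
    dropped = _CREDENTIAL_HEADERS | _HANDSHAKE_HEADERS
    headers = {k: v for k, v in client_headers.items() if k.lower() not in dropped}
    headers["Authorization"] = f"Token {service_token}"
    return headers
```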

---

## Next.js 16 — browser client (`app/lib/stt-client.ts`)

```ts
'use client';

export type SttWord = { word: string; start: number; end: number; confidence: number; punctuated_word: string; speaker: number };

export type SttFrame =
  | { type: 'Metadata'; transaction_key: string; request_id: string; sha256: string; created: string; duration: number; channels: number; models: string[] }
  | {
      type: 'Results';
      channel: { alternatives: { transcript: string; confidence: number; words: SttWord[] }[] };
      is_final: boolean;
      speech_final: boolean;
      from_finalize: boolean;
      start: number;
      duration: number;
      metadata: { request_id: string };
    }
  | { type: 'SpeechStarted'; channel: number[]; timestamp: number }
  | { type: 'UtteranceEnd'; channel: number[]; last_word_end: number };

export interface SttHandle {
  sendPcm: (pcm: Int16Array) => void;
  close: () => void;
}

export async function openSttSocket(opts: {
  url: string; // wss://app.example.com/api/stt/listen
  language: 'en' | 'fr' | 'de' | 'it';
  onFrame: (frame: SttFrame) => void;
}): Promise<SttHandle> {
  const params = new URLSearchParams({
    model: 'nova-2',
    language: opts.language,
    encoding: 'linear16',
    sample_rate: '16000',
    channels: '1',
    interim_results: 'true',
    punctuate: 'true',
    words: 'true',
  });
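  const ws = new WebSocket(`${opts.url}?${params.toString()}`);
  ws.binaryType = 'arraybuffer';
  await new Promise<void>((resolve, reject) => {
    ws.addEventListener('open', () => resolve(), { once: true });
    ws.addEventListener('error', (e) => reject(e), { once: true });
  });
  ws.addEventListener('message', (ev) => {
    if (typeof ev.data !== 'string') return; // frames are JSON text
    try { opts.onFrame(JSON.parse(ev.data) as SttFrame); } catch { /* ignore malformed frames */ }
  });
  // KeepAlive resets the server's 10 s idle timer (NET-0001) during silence.
  const keepalive = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'KeepAlive' }));
  }, 5000);
  ws.addEventListener('close', () => clearInterval(keepalive), { once: true });
  return {
    sendPcm: (pcm) => {
      if (ws.readyState !== WebSocket.OPEN) return;
      // Copy exactly this view's bytes: `pcm.buffer` alone would send the whole
      // backing buffer when `pcm` is a subarray.
      ws.send(pcm.buffer.slice(pcm.byteOffset, pcm.byteOffset + pcm.byteLength));
    },
    close: () => {
      clearInterval(keepalive);
      if (ws.readyState !== WebSocket.OPEN) return;
      // CloseStream makes the server flush the final Results + Metadata and then
      // close with 1000. Closing right away would drop those frames, so only
      // force-close if the server hasn't closed within 5 s.
      ws.send(JSON.stringify({ type: 'CloseStream' }));
      const force = setTimeout(() => ws.close(1000), 5000);
      ws.addEventListener('close', () => clearTimeout(force), { once: true });
    },
  };
}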
```
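
`sendPcm` expects 16 kHz mono int16, while browser capture through the Web Audio API produces float32 samples, usually at the device rate (often 48 kHz). The conversion math is the same in any language; below is a Python sketch, assuming an integer rate ratio. `to_linear16` is hypothetical, and its block averaging is a deliberately crude low-pass; a proper polyphase resampler will sound better. In the browser the same loop would typically run inside an AudioWorklet.

```python
from __future__ import annotations

import sys
from array import array
from collections.abc import Sequence


def to_linear16(samples: Sequence[float], src_rate: int, dst_rate: int = 16_000) -> bytes:
    """Float mono samples in [-1.0, 1.0] -> little-endian int16 PCM at `dst_rate`.

    Crude resampler: averages each block of `src_rate // dst_rate` input samples
    (a weak low-pass plus decimation). Integer ratios only, e.g. 48 kHz -> 16 kHz.
    """
    if src_rate % dst_rate:
        raise ValueError(f"{src_rate} Hz is not an integer multiple of {dst_rate} Hz")
    factor = src_rate // dst_rate
    out = array("h")
    # A trailing partial block (fewer than `factor` samples) is dropped.
    for i in range(0, len(samples) - factor + 1, factor):
        avg = sum(samples[i:i + factor]) / factor
        out.append(round(max(-1.0, min(1.0, avg)) * 32767))  # clip, then scale
    if sys.byteorder == "big":
        out.byteswap()  # linear16 on the wire is little-endian
    return out.tobytes()
```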

---

## Next.js 16 — edge WSS proxy (`server.mjs`)

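Next.js route handlers can't accept WebSocket upgrades, so run a small custom Node server that wraps Next and handles the HTTP `upgrade` event with the `ws` package. It listens on plain HTTP; terminate TLS in front of it (e.g. NGINX) so browsers connect over `wss://`.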

```js
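import { createServer } from 'node:http';
import { parse } from 'node:url';
import next from 'next';
import { WebSocketServer, WebSocket } from 'ws';

const dev = process.env.NODE_ENV !== 'production';
const app = next({ dev });
await app.prepare();
const handle = app.getRequestHandler();
const server = createServer((req, res) => handle(req, res, parse(req.url ?? '/', true)));
const wss = new WebSocketServer({ noServer: true });

// Fail fast in production instead of silently sending a placeholder token.
const serviceToken = process.env.STT_SERVICE_TOKEN ?? (dev ? 'dummy' : '');
if (!serviceToken) throw new Error('STT_SERVICE_TOKEN is not set');

// Codes allowed in a close frame (1005/1006 are local-only and would make close() throw).
const sendable = (c) => (c >= 1000 && c <= 1014 && ![1004, 1005, 1006].includes(c)) || (c >= 3000 && c <= 4999);

server.on('upgrade', async (req, sock, head) => {
  const { pathname, search } = parse(req.url ?? '/', true);
  // Not ours. In dev, Next may attach its own upgrade listener (HMR), so leave the socket alone.
  if (pathname !== '/api/stt/listen') { if (!dev) sock.destroy(); return; }
  if (!(await validateSession(req.headers.cookie ?? ''))) {
    sock.end('HTTP/1.1 401 Unauthorized\r\nConnection: close\r\n\r\n');
    return;
  }
  wss.handleUpgrade(req, sock, head, (clientWs) => {
    // Only the service token goes upstream; browser credentials are never forwarded.
    const upstream = new WebSocket(`ws://127.0.0.1:4600/v1/listen/dg${search ?? ''}`, {
      headers: { Authorization: `Token ${serviceToken}` },
    });
    // Queue client frames until the upstream handshake completes instead of dropping them.
    const pending = [];
    clientWs.on('message', (data, isBinary) => {
      if (upstream.readyState === WebSocket.OPEN) upstream.send(data, { binary: isBinary });
      else pending.push([data, isBinary]);
    });
    upstream.on('open', () => {
      for (const [data, isBinary] of pending) upstream.send(data, { binary: isBinary });
      pending.length = 0;
    });
    // Keep text frames as text: ws hands over Buffers, and send(Buffer) defaults to
    // binary, which the browser client's `typeof ev.data === 'string'` check would skip.
    upstream.on('message', (data, isBinary) => clientWs.send(data, { binary: isBinary }));
    // Forward the service's close code (1008, 1011, ...) so the browser can diagnose it.
    upstream.on('close', (code, reason) => {
      if (clientWs.readyState === WebSocket.OPEN) clientWs.close(sendable(code) ? code : 1000, reason);
    });
    upstream.on('error', () => {
      if (clientWs.readyState === WebSocket.OPEN) clientWs.close(1014, 'upstream-error');
    });
    clientWs.on('close', () => { try { upstream.close(1000); } catch { /* already closing */ } });
    clientWs.on('error', () => upstream.terminate());
  });
});

server.listen(Number(process.env.PORT ?? 3000));

// Placeholder: replace with a real session-store lookup.
async function validateSession(cookieHeader) { return cookieHeader.includes('session='); }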
```

---

## NestJS 11 — gateway proxy

```ts
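// stt-proxy.gateway.ts
import { IncomingMessage } from 'node:http';
import { OnGatewayConnection, WebSocketGateway } from '@nestjs/websockets';
import { RawData, WebSocket } from 'ws';

// Codes allowed in a close frame (1005/1006 are local-only).
const isSendable = (c: number): boolean =>
  (c >= 1000 && c <= 1014 && c !== 1004 && c !== 1005 && c !== 1006) || (c >= 3000 && c <= 4999);

// Requires WsAdapter (main.ts below). `ws` has no CORS; check req.headers.origin in validateSession if needed.
@WebSocketGateway({ path: '/api/stt/listen' })
export class SttProxyGateway implements OnGatewayConnection {
  async handleConnection(client: WebSocket, req: IncomingMessage): Promise<void> {
    // Listen before any await: frames arriving during auth or the upstream handshake are queued.
    const pending: Array<[RawData, boolean]> = [];
    let upstream: WebSocket | undefined;
    client.on('message', (data, isBinary) => {
      const u = upstream;
      if (u?.readyState === WebSocket.OPEN) u.send(data, { binary: isBinary });
      else pending.push([data, isBinary]);
    });

    if (!(await this.validateSession(req))) { client.close(1008, 'auth-missing'); return; }
    if (client.readyState !== WebSocket.OPEN) return; // client left during validation
    const token = process.env.STT_SERVICE_TOKEN;
    if (!token) { client.close(1011, 'stt-token-missing'); return; }

    const search = req.url?.split('?')[1] ?? '';
    const up = new WebSocket(`ws://127.0.0.1:4600/v1/listen/dg?${search}`, {
      headers: { Authorization: `Token ${token}` },
    });
    upstream = up;
    up.on('open', () => {
      for (const [data, isBinary] of pending) up.send(data, { binary: isBinary });
      pending.length = 0;
    });
    // Preserve text vs binary so JSON frames reach browsers as strings.
    up.on('message', (data, isBinary) => client.send(data, { binary: isBinary }));
    // Propagate the service's close code (1008, 1011, ...) to the client.
    up.on('close', (code, reason) => {
      if (client.readyState === WebSocket.OPEN) client.close(isSendable(code) ? code : 1000, reason);
    });
    up.on('error', () => { if (client.readyState === WebSocket.OPEN) client.close(1014, 'upstream-error'); });
    client.on('close', () => { try { up.close(1000); } catch { /* already closing */ } });
    client.on('error', () => up.terminate());
  }

  private async validateSession(req: IncomingMessage): Promise<boolean> {
    // Placeholder: replace with a real session-store lookup.
    return Boolean(req.headers.cookie?.includes('session='));
  }
}

// main.ts: register WsAdapter (Nest's default WebSocket adapter is Socket.IO)
import { NestFactory } from '@nestjs/core';
import { WsAdapter } from '@nestjs/platform-ws';
import { AppModule } from './app.module';

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);
  app.useWebSocketAdapter(new WsAdapter(app));
  await app.listen(Number(process.env.PORT ?? 3000));
}
void bootstrap();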
```

---

## Python 3.12 — async client (`websockets>=13`)

```python
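from __future__ import annotations

import asyncio
import json
import os
from pathlib import Path
from urllib.parse import urlencode

# websockets>=13 ships the new asyncio client here. In 13.x the top-level
# `websockets.connect` still points to the legacy client (which takes
# `extra_headers`, not `additional_headers`).
from websockets.asyncio.client import connect

STT_URL = "ws://127.0.0.1:4600/v1/listen/dg"
CHUNK_BYTES = 2048  # 1024 int16 samples = 64 ms at 16 kHz


async def transcribe_file(path: Path, language: str = "en") -> str:
    pcm = path.read_bytes()  # headerless 16 kHz mono int16 (linear16)
    qs = urlencode({
        "model": "nova-2",
        "language": language,
        "encoding": "linear16",
        "sample_rate": 16000,
        "channels": 1,
        "punctuate": "true",
        "words": "true",
    })
    token = os.environ.get("STT_SERVICE_TOKEN", "dummy")
    finals: list[str] = []
    async with connect(
        f"{STT_URL}?{qs}",
        max_size=2**24,
        additional_headers=[("Authorization", f"Token {token}")],
    ) as ws:

        async def consumer() -> None:
            # Ends cleanly when the server closes with 1000 after CloseStream.
            async for msg in ws:
                if not isinstance(msg, str):
                    continue
                frame = json.loads(msg)
                if frame.get("type") == "Results" and frame.get("is_final"):
                    text = frame["channel"]["alternatives"][0]["transcript"].strip()
                    if text:  # skip empty finals (silence) to avoid double spaces
                        finals.append(text)

        consumer_task = asyncio.create_task(consumer())
        for i in range(0, len(pcm), CHUNK_BYTES):
            await ws.send(pcm[i:i + CHUNK_BYTES])
        await ws.send(json.dumps({"type": "CloseStream"}))
        try:
            await asyncio.wait_for(consumer_task, timeout=30)
        except TimeoutError:
            consumer_task.cancel()
    return " ".join(finals)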
```
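
The loop above pushes the whole file as fast as the socket accepts it, which suits offline batch jobs. To reproduce live-microphone timing (for example when measuring latency), pace chunks against the wall clock instead. A sketch; `paced_chunks` and `chunk_seconds` are hypothetical helpers, and whether the service behaves differently under real-time pacing is an assumption worth benchmarking:

```python
from __future__ import annotations

import asyncio
import time
from collections.abc import AsyncIterator

BYTES_PER_SAMPLE = 2  # linear16 = int16


def chunk_seconds(n_bytes: int, sample_rate: int = 16_000, channels: int = 1) -> float:
    """Duration of audio carried by `n_bytes` of int16 PCM."""
    return n_bytes / (BYTES_PER_SAMPLE * channels * sample_rate)


async def paced_chunks(pcm: bytes, chunk_bytes: int = 2048, speed: float = 1.0,
                       sample_rate: int = 16_000) -> AsyncIterator[bytes]:
    """Yield mono PCM chunks no faster than `speed` x real time."""
    start = time.monotonic()
    audio_sent = 0.0  # seconds of audio already yielded
    for i in range(0, len(pcm), chunk_bytes):
        chunk = pcm[i:i + chunk_bytes]
        # Wait until wall-clock time catches up with the audio already sent; pacing
        # against `start` (not per-chunk sleeps) keeps timing errors from accumulating.
        delay = start + audio_sent / speed - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        yield chunk
        audio_sent += chunk_seconds(len(chunk), sample_rate)
```

Inside `transcribe_file`, `async for chunk in paced_chunks(pcm): await ws.send(chunk)` would replace the send loop.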

---

## NGINX — production reverse proxy

```nginx
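server {
  listen 443 ssl;
  http2 on;  # nginx >= 1.25.1; older versions: `listen 443 ssl http2;`
  server_name app.example.com;
  ssl_certificate     /etc/letsencrypt/live/app.example.com/fullchain.pem;
  ssl_certificate_key /etc/letsencrypt/live/app.example.com/privkey.pem;

  set $stt_service_token "real-key";  # better: `include` a file kept out of version control

  # Exact match: a prefix match would also proxy /api/stt/listenXYZ to /v1/listen/dgXYZ.
  location = /api/stt/listen {
    auth_request /api/_auth_check;  # requires ngx_http_auth_request_module
    proxy_pass http://127.0.0.1:4600/v1/listen/dg;  # query string is passed through
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    # Replaces any Authorization header sent by the browser.
    proxy_set_header Authorization "Token $stt_service_token";
    # An empty value stops nginx from passing a client subprotocol (which could carry a user key).
    proxy_set_header Sec-WebSocket-Protocol "";
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
  }

  location = /api/_auth_check {
    internal;
    proxy_pass http://127.0.0.1:3000/api/auth/check;
    proxy_pass_request_body off;
    proxy_set_header Content-Length "";
    proxy_set_header X-Original-URI $request_uri;
  }
}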
```

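`$stt_service_token` must be defined in the `server` block, e.g. `set $stt_service_token "real-key";` (keep the real key in an included file outside version control). `proxy_set_header Authorization` replaces any `Authorization` header the browser sent, so user keys in that header never reach the service. Keys sent as a subprotocol are blocked only if `Sec-WebSocket-Protocol` is cleared as well (`proxy_set_header Sec-WebSocket-Protocol "";`), which is safe as long as browser clients don't negotiate a subprotocol.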
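
nginx's `auth_request` lets the proxied request through when the subrequest returns a 2xx status and denies it on 401 or 403 (any other status is treated as an error). The config points the check at the app on `:3000`; below is a standalone Python sketch of that contract, where `is_valid_session` is a hypothetical placeholder and the `session` cookie name mirrors the placeholder checks in the Next.js and NestJS proxies:

```python
from __future__ import annotations

from http import HTTPStatus
from http.cookies import CookieError, SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def is_valid_session(session_id: str) -> bool:
    """HYPOTHETICAL placeholder: look the id up in your real session store."""
    return bool(session_id)


def auth_check_status(cookie_header: str | None) -> int:
    """2xx lets nginx continue the upgrade; 401/403 makes nginx deny it."""
    if not cookie_header:
        return HTTPStatus.UNAUTHORIZED
    try:
        morsel = SimpleCookie(cookie_header).get("session")
    except CookieError:
        return HTTPStatus.UNAUTHORIZED
    if morsel is None or not is_valid_session(morsel.value):
        return HTTPStatus.UNAUTHORIZED
    return HTTPStatus.NO_CONTENT


class AuthCheckHandler(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        if self.path.split("?", 1)[0] != "/api/auth/check":
            self.send_error(HTTPStatus.NOT_FOUND)
            return
        self.send_response(auth_check_status(self.headers.get("Cookie")))
        self.send_header("Content-Length", "0")
        self.end_headers()

    do_HEAD = do_GET


if __name__ == "__main__":
    # Stand-in for the app behind `proxy_pass http://127.0.0.1:3000/api/auth/check`.
    ThreadingHTTPServer(("127.0.0.1", 3000), AuthCheckHandler).serve_forever()
```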

---

## Control messages (`/v1/listen/dg`)

| Type | Server response | Use |
|---|---|---|
| `{"type":"KeepAlive"}` | none (resets 10s NET-0001 idle timer) | Send every 5–8s during long pauses |
| `{"type":"Finalize"}` | next `Results` has `is_final=true, from_finalize=true`; session stays open | Force final boundary mid-stream |
| `{"type":"CloseStream"}` | final `Results` → final `Metadata` → close 1000 | Clean shutdown |

An unknown control `type` or a JSON parse error closes the socket with 1008 `DATA-0000`. Ten seconds with neither audio nor a control message closes it with 1011 `NET-0001`.
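
Because each failure surfaces as a close code plus a short reason, clients can turn closes into actionable messages. A sketch, assuming the close reason text equals the code shown in these tables (e.g. `NET-0001`); `diagnose_close` and its retry policy are hypothetical choices, not part of the wire format:

```python
from __future__ import annotations

from dataclasses import dataclass

# (close code, reason) pairs documented for /v1/listen/dg, mapped to a next step.
_KNOWN: dict[tuple[int, str], str] = {
    (1008, "DATA-0000"): "unknown control type or invalid JSON: fix the control message",
    (1011, "NET-0001"): "idle for 10 s: send audio or KeepAlive more often",
    (1003, "unsupported-data"): "undecodable encoding: send linear16 16 kHz mono",
    (1008, "channels-too-many"): "downmix client-side or raise DDX_STT_AUDIO_MAX_* caps",
    (1008, "sample-rate-too-high"): "resample client-side or raise DDX_STT_AUDIO_MAX_* caps",
}
# Generic codes where reconnecting may help (going away, abnormal, server error, restart, try later).
_TRANSIENT = {1001, 1006, 1011, 1012, 1013}


@dataclass(frozen=True)
class CloseDiagnosis:
    clean: bool      # normal shutdown, nothing to do
    retryable: bool  # reconnecting unchanged might succeed
    hint: str


def diagnose_close(code: int, reason: str = "") -> CloseDiagnosis:
    if code == 1000:
        return CloseDiagnosis(True, False, "normal close (e.g. after CloseStream)")
    hint = _KNOWN.get((code, reason.strip()))
    if hint is not None:
        # Documented failures need a client or config change; only the idle timeout
        # clears by simply reconnecting.
        return CloseDiagnosis(False, code == 1011, hint)
    return CloseDiagnosis(False, code in _TRANSIENT, f"unexpected close {code} {reason!r}")
```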

---

## Failure modes

| Symptom | Cause | Fix |
|---|---|---|
| HTTP 403 on WS handshake | Neither `Authorization: Token …` nor a `Sec-WebSocket-Protocol` token was sent | Send one of them; with neither, the server rejects the handshake before accepting it (Invariant 7) |
| Close 1003 `unsupported-data` | encoding the normalizer can't decode | Use `linear16` 16 kHz mono OR add encoding to `audio_normalize.py` |
| Close 1008 `channels-too-many` / `sample-rate-too-high` | exceeds `DDX_STT_AUDIO_MAX_*` caps | Resample / downmix client-side, or raise env caps |
| Close 1011 `NET-0001` | 10s without audio AND without KeepAlive | Send `{"type":"KeepAlive"}` every 5s |
| `Cannot unfreeze partially…` server warnings | NeMo `model.transcribe()` race; cosmetic | No client action; finalize-only `_final_lock` keeps finals correct |
| Long broadcasts emit only partials | No natural utterance gap → VAD never fires `speech_end` | Send `{"type":"Finalize"}` on a 30s timer to chunk transcripts |
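
For the last row, a periodic `Finalize` can run as a side task next to the audio sender. A sketch, assuming `send` writes a text frame; the optional `natural_final` event (set by your consumer whenever a `Results` frame with `speech_final=true` arrives) is a hypothetical refinement that avoids forcing a boundary right after a natural one:

```python
from __future__ import annotations

import asyncio
import json
from collections.abc import Awaitable, Callable

SendText = Callable[[str], Awaitable[None]]
FINALIZE = json.dumps({"type": "Finalize"})


async def periodic_finalize(send: SendText, every_s: float = 30.0,
                            natural_final: asyncio.Event | None = None) -> None:
    """Send Finalize every `every_s` seconds until cancelled.

    If `natural_final` is given, the consumer sets it on each speech_final=true
    result; the timer then restarts instead of forcing an extra boundary.
    """
    while True:
        if natural_final is None:
            await asyncio.sleep(every_s)
        else:
            try:
                await asyncio.wait_for(natural_final.wait(), timeout=every_s)
            except TimeoutError:
                pass  # no natural boundary within the window: force one
            else:
                natural_final.clear()
                continue
        await send(FINALIZE)  # session stays open; next Results has from_finalize=true
```

Start it with `asyncio.create_task(periodic_finalize(ws.send))` after connecting and cancel it before sending `CloseStream`.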

---

## Reference

- Service docs: `ddx-cuda-live-stt/STT_API_USAGE.md`, `STT_FULL_CAPABILITIES.md`, `STT_API_ENDPOINTS.md`
- Server invariants: `plans/stt-deepgram-compat/_invariants.md`
- Frozen wire format: `ddx-prd-specs/envelopes/README.md`, schemas `asr-frame.schema.json`, `control-messages.schema.json`
- Bench tool: `tools/loopback_bench.py` (`--concurrent`, `--n`, `--report`, `--broadcast`, `--encoding`, `--duration`)
- SDK smoke: `tools/dg_sdk_smoke.py [--no-auth] [--probe-control] [--corpus-wav PATH]`
