

Streaming is currently in beta. To request access, fill in the form at pyannoteai.typeform.com/streaming-beta. If creating a stream session returns a 403 Streaming is disabled error, your team does not yet have access. Use the form above to request it.

Introduction

Streaming diarization lets you identify who is speaking in real time over a WebSocket connection. As you stream audio, the API continuously emits speaker turn events, telling you which speaker started or stopped talking and when. Use cases include live captioning, real-time meeting assistants, call center monitoring, and any application that needs to attribute speech to speakers without waiting for the full audio to be recorded.

Auth

All requests to the streaming API require a valid API key. You can generate an API key from your pyannote.ai dashboard. Pass your key as a Bearer token in the Authorization header when creating a stream session.
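As a minimal sketch of the auth flow using only the Python standard library (the helper names `auth_headers` and `create_session` are illustrative, not part of an official SDK):

```python
import json
from urllib import request as urlrequest


def auth_headers(api_key: str) -> dict:
    """Headers required when creating a stream session."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


def create_session(api_key: str) -> dict:
    """POST /v1/live and return the parsed {"id": ..., "url": ...} response."""
    req = urlrequest.Request(
        "https://api.pyannote.ai/v1/live",
        data=b"{}",  # empty JSON body
        headers=auth_headers(api_key),
        method="POST",
    )
    with urlrequest.urlopen(req) as resp:
        return json.load(resp)
```

Only the session-creation request carries your API key; the WebSocket URL it returns embeds its own single-use token (see Quickstart below).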

Quickstart

Getting real-time diarization takes three steps.

1. Create a stream session
POST https://api.pyannote.ai/v1/live
Authorization: Bearer <your_api_key>
Content-Type: application/json

{}
Response:
{
  "id": "123e4567-e89b-12d3-a456-426614174000",
  "url": "<websocket_url>"
}
The response contains a single-use url. You can hand this URL directly to your end-user's client: it only grants access to this one stream and carries no team credentials or API key.

2. Connect to the WebSocket URL

Open a WebSocket connection to the url returned above. The connection authenticates automatically via the token embedded in the URL; no additional headers are needed.
Cold starts may delay the WebSocket connection by a few seconds. Wait for the connection to be fully open before sending audio; the open event (or its equivalent in your WebSocket client) is your signal that it is safe to start streaming.
3. Stream audio and receive diarization events

Send raw binary audio frames over the WebSocket at a real-time pace, one 100 ms chunk at a time. The server enforces a maximum 5-second buffer; pushing audio faster than real time will cause the connection to be closed. The server emits JSON diarization events as speakers are detected.
→ <binary audio chunk>
→ <binary audio chunk>
← {"type":"diarization_speaker_start","data":{"timestamp":0.42,"speaker":"SPEAKER_00"}}
→ <binary audio chunk>
← {"type":"diarization_speaker_end","data":{"timestamp":1.86,"speaker":"SPEAKER_00"}}
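The pacing requirement above can be sketched as follows. This is an illustrative helper, not an official client: `iter_chunks` and `stream_at_realtime` are hypothetical names, and `send` stands in for whatever your WebSocket client uses to transmit a binary frame.

```python
import time

SAMPLE_RATE = 16_000
CHUNK_DURATION_MS = 100
BYTES_PER_SAMPLE = 4  # pcm_f32le: 4 bytes per sample
# 16 kHz * 0.1 s * 4 bytes = 6400 bytes per chunk
CHUNK_BYTES = (SAMPLE_RATE * CHUNK_DURATION_MS // 1000) * BYTES_PER_SAMPLE


def iter_chunks(pcm: bytes):
    """Split a raw pcm_f32le buffer into full 100 ms chunks."""
    for offset in range(0, len(pcm) - CHUNK_BYTES + 1, CHUNK_BYTES):
        yield pcm[offset : offset + CHUNK_BYTES]


def stream_at_realtime(pcm: bytes, send, sleep=time.sleep):
    """Send one chunk every 100 ms so the server's 5 s buffer never overflows."""
    for chunk in iter_chunks(pcm):
        send(chunk)
        sleep(CHUNK_DURATION_MS / 1000)
```

When streaming from a live microphone, the blocking read itself paces you at real time, so no explicit sleep is needed (as in the example at the end of this page).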

Input events

audio_chunk

Send audio as raw binary WebSocket frames. The audio must meet these requirements:
| Property | Value |
| --- | --- |
| Format | PCM float 32-bit little-endian (pcm_f32le) |
| Sample rate | 16 kHz |
| Channels | Mono |
| Chunk duration | 100 ms |
Send raw PCM bytes only — do not include any file headers (e.g. WAV/RIFF headers). The server expects a continuous stream of audio samples with no container or metadata.
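If your source audio is 16-bit integer PCM (the most common capture format), it must be converted to float32 before sending. A minimal sketch of that conversion, assuming the input is already 16 kHz mono (`int16_to_f32le` is an illustrative name):

```python
import struct

SAMPLES_PER_CHUNK = 1600  # 16 kHz * 100 ms


def int16_to_f32le(samples: list[int]) -> bytes:
    """Convert signed 16-bit samples to raw pcm_f32le bytes (no WAV header)."""
    # normalize int16 range [-32768, 32767] to float range [-1.0, 1.0)
    floats = [s / 32768.0 for s in samples]
    # "<" forces little-endian regardless of platform byte order
    return struct.pack(f"<{len(floats)}f", *floats)
```

A full 100 ms chunk is 1600 samples, i.e. 6400 bytes of float32 data.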
The API tracks up to 8 speakers simultaneously. If a stream involves more than 8 speakers, the additional speakers will be merged into existing speaker labels.

end_of_stream

When you have no more audio to send, signal the end of the stream by sending a JSON text frame:
{"type": "end_of_stream"}
Sending this message is optional, but recommended. It tells the server that no more audio frames will be sent, allowing it to finalize diarization and emit any remaining events without waiting for a timeout. The server will then close the connection with close code 1000: normal closure. Do not send further audio frames after end_of_stream. Using this message is recommended over abruptly closing the socket, which may cause final outputs to be lost.
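A minimal sketch of a graceful shutdown (the `finish_stream` helper is illustrative; `ws` stands for any WebSocket client object with a text-frame `send` method):

```python
import json

# the end_of_stream message must be sent as a TEXT frame, not binary
END_OF_STREAM_FRAME = json.dumps({"type": "end_of_stream"})


def finish_stream(ws) -> None:
    """Signal end of audio; the server then flushes any remaining
    diarization events and closes with code 1000 (normal closure)."""
    ws.send(END_OF_STREAM_FRAME)
```

Call this instead of closing the socket yourself, and stop sending audio frames beforehand.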

Output events

The server emits JSON text frames with the following event types:

diarization_speaker_start

Emitted when a speaker begins a turn.
{
  "type": "diarization_speaker_start",
  "data": {
    "timestamp": 1.24,
    "speaker": "SPEAKER_00"
  }
}

diarization_speaker_end

Emitted when a speaker’s turn ends.
{
  "type": "diarization_speaker_end",
  "data": {
    "timestamp": 3.86,
    "speaker": "SPEAKER_00"
  }
}
timestamp is in seconds, relative to the start of the stream. speaker is a stable string label for the duration of the session.
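Because speaker labels are stable, start and end events can be paired into complete speaker turns on the client. A sketch of that pairing (the `events_to_turns` helper is illustrative, not part of the API):

```python
def events_to_turns(events: list[dict]) -> list[dict]:
    """Pair *_speaker_start / *_speaker_end events into speaker turns."""
    open_turns: dict[str, float] = {}  # speaker label -> start timestamp
    turns = []
    for ev in events:
        speaker = ev["data"]["speaker"]
        ts = ev["data"]["timestamp"]
        if ev["type"] == "diarization_speaker_start":
            open_turns[speaker] = ts
        elif ev["type"] == "diarization_speaker_end":
            start = open_turns.pop(speaker, None)
            if start is not None:  # ignore an end without a matching start
                turns.append({"speaker": speaker, "start": start, "end": ts})
    return turns
```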

error

Emitted when the server encounters a problem processing a frame (e.g. wrong chunk size).
{
  "type": "error",
  "message": "Invalid chunk size"
}

Pricing

Streaming is free during the beta period.

Limits

| Limit | Value |
| --- | --- |
| Concurrent running streams per team | 10 streams |
| Idle timeout (no audio received) | 5 seconds |
| Maximum stream duration per stream | 5 hours |

Example: Streaming microphone

# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "pyaudio",
#     "requests",
#     "websocket-client",
# ]
# ///
"""
pyannote.ai streaming diarization from microphone

Usage:
    API_KEY=sk_xxx uv run main.py
"""

import json
import os
import signal
import struct
import threading

import pyaudio
import requests
import websocket

API_KEY = os.environ.get("API_KEY", "sk_xxx")

SAMPLE_RATE = 16_000
CHUNK_DURATION_MS = 100
CHUNK_SIZE = (SAMPLE_RATE * CHUNK_DURATION_MS) // 1000  # 1600 samples

# ANSI colors assigned to speakers in order of first appearance
_ANSI_COLORS = [
    "\033[32m",
    "\033[33m",
    "\033[34m",
    "\033[35m",
    "\033[36m",
    "\033[31m",
    "\033[37m",
    "\033[93m",
]
_RESET = "\033[0m"
_speaker_colors: dict[str, str] = {}


def speaker_color(speaker: str) -> str:
    if speaker not in _speaker_colors:
        _speaker_colors[speaker] = _ANSI_COLORS[
            len(_speaker_colors) % len(_ANSI_COLORS)
        ]
    return _speaker_colors[speaker]


def on_message(ws_app, message):
    msg = json.loads(message)
    t = msg.get("type")
    if t in ("diarization_speaker_start", "diarization_speaker_end"):
        speaker = msg["data"]["speaker"]
        ts = msg["data"]["timestamp"]
        color = speaker_color(speaker)
        label = "start" if t == "diarization_speaker_start" else "end  "
        print(f"{color}[{label}] {speaker} @ {ts:.2f}s{_RESET}")
    elif t == "error":
        print(f"Server error: {msg.get('message')}")


def on_open(ws_app):
    print("Connected. Streaming... (press Ctrl+C to stop)")
    pa = pyaudio.PyAudio()
    stream = pa.open(
        rate=SAMPLE_RATE,
        channels=1,
        format=pyaudio.paInt32,
        input=True,
        frames_per_buffer=CHUNK_SIZE,
    )

    def audio_thread():
        try:
            # stop_audio is set by the SIGINT handler so no frames follow end_of_stream
            while ws_app.keep_running and not getattr(ws_app, "stop_audio", False):
                pcm_i32 = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                # convert int32 → float32 (pcm_f32le) for the API
                samples_i32 = struct.unpack(f"{CHUNK_SIZE}i", pcm_i32)
                pcm_f32 = struct.pack(
                    f"{CHUNK_SIZE}f",
                    *(s / 2147483648.0 for s in samples_i32),
                )
                ws_app.send_bytes(pcm_f32)
        except Exception as exc:
            print(f"Audio error: {exc}")
        finally:
            stream.stop_stream()
            stream.close()
            pa.terminate()

    t = threading.Thread(target=audio_thread, daemon=True)
    t.start()


def main():
    print("Creating stream session...")
    response = requests.post(
        "https://api.pyannote.ai/v1/live",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        },
    )
    response.raise_for_status()
    url = response.json()["url"]

    print("Connecting WebSocket...")
    ws_app = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
    )

    def handle_sigint(sig, frame):
        print("\nSending end_of_stream...")
        ws_app.stop_audio = True  # stop the audio thread before signalling end of stream
        if ws_app.sock and ws_app.sock.connected:
            ws_app.send(json.dumps({"type": "end_of_stream"}))
        else:
            ws_app.close()
    signal.signal(signal.SIGINT, handle_sigint)
    ws_app.run_forever()


if __name__ == "__main__":
    main()

Feedback

Streaming is in beta and your feedback helps us improve it. Reach out via support@pyannote.ai or use the chat icon in the bottom-right corner of this page.