> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pyannote.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Streaming Diarization

> Learn how to get real-time insights into your conversations with our streaming diarization API.

<Callout icon="flask" color="#FFC107" iconType="solid">
  Streaming is currently in **beta**.

  Reach out via [support@pyannote.ai](mailto:support@pyannote.ai) or use the chat icon in the bottom-right corner of this page to share your feedback.
</Callout>

## Introduction

Streaming diarization lets you identify who is speaking in real-time over a WebSocket connection. As you stream audio, the API continuously emits speaker turn events, telling you which speaker started or stopped talking and when.

Use cases include live captioning, real-time meeting assistants, call center monitoring, and any application that needs to attribute speech to speakers without waiting for the full audio to be recorded.

## Auth

All requests to the streaming API require a valid API key. You can generate an API key from your [pyannote.ai dashboard](https://dashboard.pyannote.ai).
Pass your key as a Bearer token in the `Authorization` header when creating a stream session.

## Quickstart

Getting real-time diarization takes three steps:

**1. Create a stream session**

```http theme={null}
POST https://api.pyannote.ai/v1/live
Authorization: Bearer <your_api_key>
Content-Type: application/json

{}
```

Response:

```json theme={null}
{
  "id": "123e4567-e89b-12d3-a456-426614174000",
  "url": "<websocket_url>"
}
```

The response contains a single-use `url`. You can hand this URL directly to your end-user's client, it only grants access to this one stream and carries no team credentials or API key.

**2. Connect to the WebSocket URL**

Open a WebSocket connection to the `url` returned above. The connection authenticates automatically via the token embedded in the URL, no additional headers needed.

<Note>
  Cold-starts may delay the WebSocket connection by a few seconds. Wait for the connection to be fully open before sending audio, the `open` event (or equivalent in your WebSocket client) is your signal that it is safe to start streaming.
</Note>

**3. Stream audio and receive diarization events**

Send raw audio binary frames over the WebSocket at real-time pace every 100 ms.
The server enforces a maximum 5-second buffer; rushing audio ahead of real-time will cause the connection to be closed.
The server will emit JSON diarization events as speakers are detected.

```text theme={null}
→ <binary audio chunk>
→ <binary audio chunk>
← {"type":"diarization_speaker_start","data":{"timestamp":0.42,"speaker":"SPEAKER_00"}}
→ <binary audio chunk>
← {"type":"diarization_speaker_end","data":{"timestamp":1.86,"speaker":"SPEAKER_00"}}
```

## Input events

### audio\_chunk

Send audio as raw binary WebSocket frames. The audio must meet these requirements:

| Property       | Value                                        |
| -------------- | -------------------------------------------- |
| Format         | PCM float 32-bit little-endian (`pcm_f32le`) |
| Sample rate    | 16 kHz                                       |
| Channels       | Mono                                         |
| Chunk duration | 100 ms                                       |

<Warning>
  Send **raw PCM bytes only** — do not include any file headers (e.g. WAV/RIFF headers). The server expects a continuous stream of audio samples with no container or metadata.
</Warning>

The API tracks up to **8 speakers** simultaneously. In case the stream involves more speakers, multiple speakers will end up being merged into one.

### end\_of\_stream

When you have no more audio to send, signal the end of the stream by sending a JSON text frame:

```json theme={null}
{"type": "end_of_stream"}
```

Sending this message is optional, but recommended. It tells the server that no
more audio frames will be sent, allowing it to finalize diarization and emit
any remaining events without waiting for a timeout.
The server will then close the connection with close code 1000: normal closure.
**Do not send further audio frames after `end_of_stream`.**
Using this message is recommended over abruptly closing the socket, which may cause final outputs to be lost.

## Output events

The server emits JSON text frames with the following event types:

### `diarization_speaker_start`

Emitted when a speaker begins a turn.

```json theme={null}
{
  "type": "diarization_speaker_start",
  "data": {
    "timestamp": 1.24,
    "speaker": "SPEAKER_00"
  }
}
```

### `diarization_speaker_end`

Emitted when a speaker's turn ends.

```json theme={null}
{
  "type": "diarization_speaker_end",
  "data": {
    "timestamp": 3.86,
    "speaker": "SPEAKER_00"
  }
}
```

`timestamp` is in seconds, relative to the start of the stream. `speaker` is a stable string label for the duration of the session.

### `error`

Emitted when the server encounters a problem processing a frame (e.g. wrong chunk size).

```json theme={null}
{
  "type": "error",
  "message": "Invalid chunk size"
}
```

## Pricing

Streaming is **free** during the beta period.

## Limits

| Limit                               | Value      |
| ----------------------------------- | ---------- |
| Concurrent running streams per team | 10 streams |
| Idle timeout (no audio received)    | 5 seconds  |
| Maximum stream duration per stream  | 5 hours    |

## Example: Streaming microphone

<CodeGroup>
  ```python Python theme={null}
  # /// script
  # requires-python = ">=3.10"
  # dependencies = [
  #     "pyaudio",
  #     "requests",
  #     "websocket-client",
  # ]
  # ///
  """
  pyannote.ai streaming diarization from microphone

  Usage:
      API_KEY=sk_xxx uv run main.py
  """

  import json
  import os
  import signal
  import struct
  import threading

  import pyaudio
  import requests
  import websocket

  API_KEY = os.environ.get("API_KEY", "sk_xxx")

  SAMPLE_RATE = 16_000
  CHUNK_DURATION_MS = 100
  CHUNK_SIZE = (SAMPLE_RATE * CHUNK_DURATION_MS) // 1000  # 1600 samples

  # ANSI colors assigned to speakers in order of first appearance
  _ANSI_COLORS = [
      "\033[32m",
      "\033[33m",
      "\033[34m",
      "\033[35m",
      "\033[36m",
      "\033[31m",
      "\033[37m",
      "\033[93m",
  ]
  _RESET = "\033[0m"
  _speaker_colors: dict[str, str] = {}


  def speaker_color(speaker: str) -> str:
      if speaker not in _speaker_colors:
          _speaker_colors[speaker] = _ANSI_COLORS[
              len(_speaker_colors) % len(_ANSI_COLORS)
          ]
      return _speaker_colors[speaker]


  def on_message(ws_app, message):
      msg = json.loads(message)
      t = msg.get("type")
      if t in ("diarization_speaker_start", "diarization_speaker_end"):
          speaker = msg["data"]["speaker"]
          ts = msg["data"]["timestamp"]
          color = speaker_color(speaker)
          label = "start" if t == "diarization_speaker_start" else "end  "
          print(f"{color}[{label}] {speaker} @ {ts:.2f}s{_RESET}")


  def on_open(ws_app):
      print("Connected. Streaming... (press Ctrl+C to stop)")
      pa = pyaudio.PyAudio()
      stream = pa.open(
          rate=SAMPLE_RATE,
          channels=1,
          format=pyaudio.paInt32,
          input=True,
          frames_per_buffer=CHUNK_SIZE,
      )

      def audio_thread():
          try:
              while ws_app.keep_running:
                  pcm_i32 = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                  # convert int32 → float32 (pcm_f32le) for the API
                  samples_i32 = struct.unpack(f"{CHUNK_SIZE}i", pcm_i32)
                  pcm_f32 = struct.pack(
                      f"{CHUNK_SIZE}f",
                      *(s / 2147483648.0 for s in samples_i32),
                  )
                  ws_app.send_bytes(pcm_f32)
          except Exception as exc:
              print(f"Audio error: {exc}")
          finally:
              stream.stop_stream()
              stream.close()
              pa.terminate()

      t = threading.Thread(target=audio_thread, daemon=True)
      t.start()


  def main():
      print("Creating stream session...")
      response = requests.post(
          "https://api.pyannote.ai/v1/live",
          headers={
              "Authorization": f"Bearer {API_KEY}",
              "Content-Type": "application/json",
          },
      )
      response.raise_for_status()
      url = response.json()["url"]

      print("Connecting WebSocket...")
      ws_app = websocket.WebSocketApp(
          url,
          on_open=on_open,
          on_message=on_message,
      )

      def handle_sigint(sig, frame):
          print("\nSending end_of_stream...")
          if ws_app.sock and ws_app.sock.connected:
              ws_app.send(json.dumps({"type": "end_of_stream"}))
          else:
              ws_app.close()

      signal.signal(signal.SIGINT, handle_sigint)
      ws_app.run_forever()


  if __name__ == "__main__":
      main()
  ```
</CodeGroup>
