Skip to main content
Streaming is currently in beta. Reach out via support@pyannote.ai or use the chat icon in the bottom-right corner of this page to share your feedback.
This tutorial shows how to combine two live streams:
  • pyannoteAI streaming diarization tells you who is speaking and when speaker turns end.
  • OpenAI realtime transcription tells you what was said.
The key step is deciding which speaker label belongs to each transcript segment as both APIs stream events independently.

Prerequisites

  • A pyannoteAI API key from the dashboard
  • An OpenAI API key
  • Python 3.10+
  • Microphone access
Install dependencies:
pip install sounddevice numpy "websockets>=13" "openai[realtime]" python-dotenv requests
Create a .env file:
PYANNOTEAI_API_KEY=sk_xxx
OPENAI_API_KEY=sk-xxx

How the merge works

pyannote emits diarization_speaker_start and diarization_speaker_end events. OpenAI emits transcription deltas and completed transcript segments. The script keeps two pieces of shared state:
  • active: speakers pyannote currently hears. Live transcript deltas are displayed with the latest active speaker.
  • pending: speaker labels waiting for completed OpenAI transcript segments. When pyannote emits a speaker end event, the script commits OpenAI’s audio buffer and queues that speaker label.
This line is the handoff point between diarization and transcription:
await oai.conn.input_audio_buffer.commit()
It tells OpenAI to finalize audio collected during the pyannote speaker turn. When OpenAI later emits transcription.completed, the script pops from pending and prints completed text with that speaker.
sp = convo.pending.popleft() if convo.pending else convo.speaker

Complete script

Save this as live_diarized_transcription.py:
#!/usr/bin/env python3
import asyncio
import base64
import json
import os
import signal
from collections import deque

import numpy as np
import requests
import sounddevice as sd
import websockets
from dotenv import load_dotenv
from openai import AsyncOpenAI

# Load variables from the .env file described in the prerequisites.
load_dotenv()
# Both keys are mandatory: a missing variable raises KeyError right here.
PYANNOTEAI_API_KEY = os.environ["PYANNOTEAI_API_KEY"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

SAMPLE_RATE = 16_000  # Hz; microphone capture rate, frames go to pyannote as-is
OPENAI_RATE = 24_000  # Hz; PCM rate declared in the OpenAI session config
CHUNK = 1600  # samples per microphone block (100 ms at SAMPLE_RATE)

# ANSI foreground colors handed out to speakers in order of first appearance.
_COLORS = ["\033[32m", "\033[33m", "\033[34m", "\033[35m", "\033[36m", "\033[31m", "\033[37m", "\033[93m"]
_RESET = "\033[0m"  # ANSI sequence that restores the default terminal style
_colors: dict[str, str] = {}  # speaker label -> color escape, filled by color()


def color(speaker: str) -> str:
    """Return the ANSI color escape assigned to *speaker*.

    The first time a speaker is seen it receives the next entry of
    ``_COLORS`` (wrapping around once all entries are used); later calls
    return the same escape.
    """
    if speaker not in _colors:
        slot = len(_colors) % len(_COLORS)
        _colors[speaker] = _COLORS[slot]
    return _colors[speaker]


class Conversation:
    def __init__(self) -> None:
        self.active: list[str] = []
        self.pending: deque[str] = deque()
        self.uncommitted = False

    def started(self, speaker: str) -> None:
        if speaker not in self.active:
            self.active.append(speaker)

    def ended(self, speaker: str) -> None:
        if speaker in self.active:
            self.active.remove(speaker)

    @property
    def speaker(self) -> str | None:
        return self.active[-1] if self.active else None


def upsample(x: np.ndarray) -> np.ndarray:
    """Linearly resample a mono frame from ``SAMPLE_RATE`` to ``OPENAI_RATE``.

    Args:
        x: 1-D array of samples captured at ``SAMPLE_RATE``.

    Returns:
        A float32 array holding ``len(x) * OPENAI_RATE // SAMPLE_RATE``
        samples. An empty input yields an empty output.
    """
    n = len(x) * OPENAI_RATE // SAMPLE_RATE
    if n == 0:
        # np.interp rejects an empty sample array, so short-circuit here.
        return np.zeros(0, dtype=np.float32)
    # Place output sample k at input index k * SAMPLE_RATE / OPENAI_RATE so
    # the true rate ratio is kept. Stretching the grid to end exactly on the
    # last input sample would slightly time-warp every chunk. Positions past
    # the last input sample are clamped to it by np.interp.
    positions = np.arange(n) * (SAMPLE_RATE / OPENAI_RATE)
    return np.interp(positions, np.arange(len(x)), x).astype(np.float32)


def create_stream(timeout: float = 10.0) -> tuple[str, str]:
    """Create a pyannoteAI streaming session.

    Args:
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` can block indefinitely on an unresponsive server.

    Returns:
        The ``(id, url)`` pair from the JSON response; ``url`` is what the
        caller opens its WebSocket connection to.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If no response arrives within *timeout*.
        KeyError: If the response lacks the ``id`` or ``url`` field.
    """
    r = requests.post(
        "https://api.pyannote.ai/v1/live",
        headers={"Authorization": f"Bearer {PYANNOTEAI_API_KEY}"},
        timeout=timeout,
    )
    r.raise_for_status()
    data = r.json()
    return data["id"], data["url"]


async def pyannote_task(ws, convo: Conversation, oai) -> None:
    """Consume pyannote events and drive speaker-turn bookkeeping.

    A speaker-start event marks the speaker active. A speaker-end event marks
    the speaker inactive and, when OpenAI is connected and has received audio
    since the last commit, queues the speaker label and commits OpenAI's
    input buffer. Error events are printed.
    """
    async for raw in ws:
        msg = json.loads(raw)
        data = msg.get("data", {})
        kind = msg.get("type")

        if kind == "diarization_speaker_start":
            convo.started(data["speaker"])
            continue

        if kind == "error":
            print(f"\npyannote error: {msg.get('message')}")
            continue

        if kind != "diarization_speaker_end":
            continue

        convo.ended(data["speaker"])
        if oai.conn is None or not convo.uncommitted:
            # Nothing was sent to OpenAI for this turn, so nothing to commit.
            continue
        # Queue the label before committing so it is already available when
        # the completed transcript for this audio arrives.
        convo.pending.append(data["speaker"])
        convo.uncommitted = False
        await oai.conn.input_audio_buffer.commit()


class OpenAI:
    """Mutable holder for the OpenAI realtime connection shared between tasks."""

    # Stays None until openai_task stores its live connection on the instance;
    # the other tasks check for None before using it.
    conn = None


async def openai_task(client: AsyncOpenAI, convo: Conversation, oai: OpenAI) -> None:
    """Run the OpenAI realtime transcription session and print its output.

    Opens a transcription connection, configures it with turn detection
    disabled, publishes the connection on ``oai.conn``, then prints partial
    text in place and completed segments on their own line, each prefixed
    with a colored speaker tag when a speaker is known.
    """

    def label(who) -> str:
        # Colored "[SPEAKER] " prefix, or nothing when no speaker is known.
        return f"{color(who)}[{who}]{_RESET} " if who else ""

    session_config = {
        "type": "transcription",
        "audio": {"input": {
            "format": {"type": "audio/pcm", "rate": OPENAI_RATE},
            "transcription": {"model": "gpt-realtime-whisper", "language": "en"},
            "turn_detection": None,
        }},
    }

    async with client.realtime.connect(extra_query={"intent": "transcription"}) as conn:
        await conn.session.update(session=session_config)
        oai.conn = conn

        partial = ""
        async for event in conn:
            et = event.type
            if et.endswith("transcription_session.created"):
                print(f"OpenAI session:  {event.session.id}")
            elif et.endswith("transcription.delta"):
                partial += event.delta
                # Live text is shown with whoever is speaking right now.
                print(f"\r{label(convo.speaker)}{partial}\033[K", end="", flush=True)
            elif et.endswith("transcription.completed"):
                # Prefer the label queued when the turn ended; fall back to
                # the current speaker if nothing was queued.
                if convo.pending:
                    who = convo.pending.popleft()
                else:
                    who = convo.speaker
                text = event.transcript.strip()
                if text:
                    print(f"\r{label(who)}{text}\033[K")
                partial = ""


def microphone(loop, queue: "asyncio.Queue[np.ndarray]") -> sd.InputStream:
    """Start capturing microphone audio into *queue*.

    Args:
        loop: Event loop that owns *queue*; frames are handed over to it
            from the audio callback.
        queue: Receives one mono float32 frame of ``CHUNK`` samples per
            callback. Frames are dropped when the queue is full.

    Returns:
        The already-started input stream; the caller must stop and close it.
    """

    def enqueue(frame: np.ndarray) -> None:
        # Runs on the event loop, which is where QueueFull is actually raised.
        # Catching it around call_soon_threadsafe would never trigger because
        # that call only schedules the put.
        try:
            queue.put_nowait(frame)
        except asyncio.QueueFull:
            pass  # deliberately drop audio when the consumer falls behind

    def callback(indata, frames, time, status):
        # Invoked by sounddevice outside the event loop: copy the first
        # channel so the frame outlives the callback, then hand it over.
        loop.call_soon_threadsafe(enqueue, indata[:, 0].copy())

    stream = sd.InputStream(samplerate=SAMPLE_RATE, channels=1, dtype="float32", blocksize=CHUNK, callback=callback)
    stream.start()
    return stream


async def pump(queue: "asyncio.Queue[np.ndarray]", ws, convo: Conversation, oai: OpenAI) -> None:
    """Forward every captured frame to pyannote and, once connected, to OpenAI.

    pyannote receives the frame as little-endian float32 bytes. OpenAI
    receives the same audio upsampled, clipped to [-1, 1], converted to
    little-endian 16-bit PCM and base64-encoded. After each append to OpenAI
    the conversation is flagged as having uncommitted audio.
    """
    while True:
        frame = await queue.get()
        await ws.send(frame.astype("<f4").tobytes())

        conn = oai.conn
        if conn is None:
            # The OpenAI session is not ready yet, so only pyannote gets audio.
            continue

        clipped = np.clip(upsample(frame), -1, 1)
        pcm16 = (clipped * 32767).astype("<i2").tobytes()
        payload = base64.b64encode(pcm16).decode()
        await conn.input_audio_buffer.append(audio=payload)
        convo.uncommitted = True


async def main() -> None:
    """Run live diarized transcription until Ctrl+C or until a worker stops.

    Creates the pyannote stream, opens its WebSocket, starts the microphone
    and the three worker tasks, then waits for SIGINT or for the first worker
    to finish. On the way out it releases the microphone, cancels all tasks,
    reports any worker failure and tells pyannote the stream has ended.
    """
    loop = asyncio.get_running_loop()
    convo, oai = Conversation(), OpenAI()
    queue: "asyncio.Queue[np.ndarray]" = asyncio.Queue(maxsize=50)

    stream_id, url = create_stream()
    print(f"pyannote stream: {stream_id}")

    client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    async with websockets.connect(url) as ws:
        mic = microphone(loop, queue)
        print("\nListening — speak now (Ctrl+C to stop)\n")

        stop = asyncio.Event()
        loop.add_signal_handler(signal.SIGINT, stop.set)
        tasks = [
            asyncio.create_task(pyannote_task(ws, convo, oai)),
            asyncio.create_task(openai_task(client, convo, oai)),
            asyncio.create_task(pump(queue, ws, convo, oai)),
        ]
        # Keep a handle on the stop waiter so it can be cancelled as well.
        stopper = asyncio.create_task(stop.wait())
        try:
            await asyncio.wait({*tasks, stopper}, return_when=asyncio.FIRST_COMPLETED)
            print("\nStopping...")
        finally:
            # Release the audio device and the tasks even if the wait itself
            # was interrupted.
            mic.stop()
            mic.close()
            stopper.cancel()
            for task in tasks:
                task.cancel()
            results = await asyncio.gather(*tasks, stopper, return_exceptions=True)

        # A worker that crashed is the reason we stopped; say so instead of
        # discarding the error. Cancellations are not Exception subclasses.
        for result in results:
            if isinstance(result, Exception):
                print(f"task failed: {result!r}")

        try:
            await ws.send(json.dumps({"type": "end_of_stream"}))
        except websockets.ConnectionClosed:
            # The server already closed the socket, so there is nothing to end.
            pass


# Script entry point: run the async main() on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
Run it:
python live_diarized_transcription.py
Speak into your microphone. Partial text updates in place while OpenAI streams deltas. Completed turns print as separate lines with speaker labels like [SPEAKER_00]. Example output:
python live_diarized_transcription.py
pyannote stream: 7d3f4a21-9b6c-4f8a-8f12-3c9d8b6e2a44

Listening — speak now (Ctrl+C to stop)

[SPEAKER_00] Hello
[SPEAKER_00] This is transcribing while diarization is running
[SPEAKER_00] The speaker label stays attached to this turn
[SPEAKER_01] Now another person is speaking, and the live transcript switches speakers.
[SPEAKER_01] It keeps printing completed turns as they arrive.
[SPEAKER_00] And now the first speaker is back.
^C
Stopping...

Important details

Create a pyannote stream

The script creates a streaming session with one simple request:
r = requests.post(
    "https://api.pyannote.ai/v1/live",
    headers={"Authorization": f"Bearer {PYANNOTEAI_API_KEY}"},
)
The response contains a WebSocket URL. Connect to it and send 16 kHz mono float32 PCM chunks every 100 ms. See Streaming Diarization for stream format details.

Use pyannote as the turn detector

OpenAI realtime transcription can do its own turn detection, but this script disables it:
"turn_detection": None
pyannote owns speaker turns, so speaker boundaries and transcript segment boundaries stay aligned.

Send one audio stream to both APIs

The microphone captures 16 kHz float32 audio. That exact frame goes to pyannote:
await ws.send(frame.astype("<f4").tobytes())
The same audio is upsampled to 24 kHz PCM16 and appended to OpenAI’s input buffer:
pcm16 = (np.clip(upsample(frame), -1, 1) * 32767).astype("<i2").tobytes()
await oai.conn.input_audio_buffer.append(audio=base64.b64encode(pcm16).decode())

Attribute completed text to speaker turns

When pyannote ends a turn, queue that speaker label and commit OpenAI’s buffer:
convo.pending.append(data["speaker"])
await oai.conn.input_audio_buffer.commit()
When OpenAI returns completed text, assign the queued speaker:
sp = convo.pending.popleft() if convo.pending else convo.speaker
This queue is what combines realtime diarization segments with realtime transcription segments.

Next steps

  • Replace terminal output with WebSocket broadcasts to your frontend.
  • Store completed {speaker, text} turns in your database.
  • Add timestamps from pyannote events if your UI needs time-aligned captions.