#!/usr/bin/env python3
import asyncio
import base64
import json
import os
import signal
from collections import deque
import numpy as np
import requests
import sounddevice as sd
import websockets
from dotenv import load_dotenv
from openai import AsyncOpenAI
# Load variables from a .env file (python-dotenv) before reading credentials below.
load_dotenv()

# Credentials: a missing variable raises KeyError at import time.
PYANNOTEAI_API_KEY = os.environ["PYANNOTEAI_API_KEY"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

# Audio format: microphone capture and pyannote use SAMPLE_RATE; OpenAI input uses OPENAI_RATE.
SAMPLE_RATE = 16000
OPENAI_RATE = 24000
CHUNK = 1600  # frames per microphone block (100 ms at SAMPLE_RATE)

# ANSI escape codes for speaker tags, handed out round-robin by color().
_COLORS = [
    "\033[32m",
    "\033[33m",
    "\033[34m",
    "\033[35m",
    "\033[36m",
    "\033[31m",
    "\033[37m",
    "\033[93m",
]
_RESET = "\033[0m"
_colors: dict[str, str] = {}  # speaker label -> assigned escape code
def color(speaker: str) -> str:
    """Return the ANSI color escape for *speaker*.

    The first time a speaker is seen it gets the next entry of ``_COLORS``
    (wrapping around). Later calls return the same code.
    """
    if speaker not in _colors:
        _colors[speaker] = _COLORS[len(_colors) % len(_COLORS)]
    return _colors[speaker]
class Conversation:
def __init__(self) -> None:
self.active: list[str] = []
self.pending: deque[str] = deque()
self.uncommitted = False
def started(self, speaker: str) -> None:
if speaker not in self.active:
self.active.append(speaker)
def ended(self, speaker: str) -> None:
if speaker in self.active:
self.active.remove(speaker)
@property
def speaker(self) -> str | None:
return self.active[-1] if self.active else None
def upsample(x: np.ndarray) -> np.ndarray:
n = len(x) * OPENAI_RATE // SAMPLE_RATE
return np.interp(np.linspace(0, len(x) - 1, n), np.arange(len(x)), x).astype(np.float32)
def create_stream(timeout: float = 30.0) -> tuple[str, str]:
    """Create a pyannote live-diarization stream.

    Args:
        timeout: seconds to wait for the API (connect and read). Without one,
            requests can block forever.

    Returns:
        ``(stream_id, websocket_url)`` from the ``id`` and ``url`` fields of
        the JSON response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
        requests.Timeout: if the API does not answer within *timeout*.
    """
    r = requests.post(
        "https://api.pyannote.ai/v1/live",
        headers={"Authorization": f"Bearer {PYANNOTEAI_API_KEY}"},
        timeout=timeout,
    )
    r.raise_for_status()
    data = r.json()
    return data["id"], data["url"]
async def pyannote_task(ws, convo: Conversation, oai) -> None:
async for raw in ws:
msg = json.loads(raw)
data = msg.get("data", {})
if msg.get("type") == "diarization_speaker_start":
convo.started(data["speaker"])
elif msg.get("type") == "diarization_speaker_end":
convo.ended(data["speaker"])
if oai.conn is not None and convo.uncommitted:
convo.pending.append(data["speaker"])
convo.uncommitted = False
await oai.conn.input_audio_buffer.commit()
elif msg.get("type") == "error":
print(f"\npyannote error: {msg.get('message')}")
class OpenAI:
    """Mutable holder for the live OpenAI Realtime connection.

    ``conn`` is None until ``openai_task`` has connected and configured the
    session. The diarization and audio-pump tasks check it before sending.
    """

    conn = None  # Realtime connection object once available
async def openai_task(client: AsyncOpenAI, convo: Conversation, oai: OpenAI) -> None:
    """Run an OpenAI Realtime transcription session and print speaker-tagged text.

    The session is configured with turn detection disabled, so audio is
    committed only by the diarization side. That side queues each committed
    turn's speaker in ``convo.pending``. The live connection is published on
    ``oai.conn`` while open and cleared when the session ends.

    Printing:
    - Partial transcripts are redrawn in place, tagged with the currently
      active speaker.
    - Completed (or failed) transcripts consume one queued speaker, which
      keeps later turns aligned with their commits.
    - API errors are printed.
    """

    def tag(speaker: str | None) -> str:
        # Colored "[speaker] " prefix, or nothing when the speaker is unknown.
        return f"{color(speaker)}[{speaker}]{_RESET} " if speaker else ""

    async with client.realtime.connect(extra_query={"intent": "transcription"}) as conn:
        await conn.session.update(session={
            "type": "transcription",
            "audio": {"input": {
                "format": {"type": "audio/pcm", "rate": OPENAI_RATE},
                "transcription": {"model": "gpt-realtime-whisper", "language": "en"},
                "turn_detection": None,
            }},
        })
        oai.conn = conn
        try:
            partial = ""
            async for event in conn:
                et = event.type
                if et.endswith("transcription_session.created"):
                    print(f"OpenAI session: {event.session.id}")
                elif et.endswith("transcription.delta"):
                    partial += event.delta
                    print(f"\r{tag(convo.speaker)}{partial}\033[K", end="", flush=True)
                elif et.endswith("transcription.completed"):
                    sp = convo.pending.popleft() if convo.pending else convo.speaker
                    text = event.transcript.strip()
                    if text:
                        print(f"\r{tag(sp)}{text}\033[K")
                    elif partial:
                        # Nothing final to show: wipe the stale partial line.
                        print("\r\033[K", end="", flush=True)
                    partial = ""
                elif et.endswith("transcription.failed"):
                    # A failed turn still used up one commit. Drop its queued
                    # speaker, or every later transcript is credited to the
                    # wrong person.
                    if convo.pending:
                        convo.pending.popleft()
                    err = getattr(event, "error", None)
                    print(f"\r\033[KOpenAI transcription failed: {getattr(err, 'message', err)}")
                    partial = ""
                elif et == "error":
                    err = getattr(event, "error", None)
                    print(f"\nOpenAI error: {getattr(err, 'message', err)}")
        finally:
            # Stop other tasks from sending on a connection that is closing.
            oai.conn = None
def microphone(loop, queue: "asyncio.Queue[np.ndarray]") -> sd.InputStream:
    """Start a mono float32 input stream at SAMPLE_RATE that feeds *queue*.

    The stream delivers blocks of CHUNK frames to its callback. The first
    channel of each block is copied and handed to *loop* via
    ``call_soon_threadsafe``. When *queue* is full the block is dropped
    instead of blocking.

    Returns the started stream; the caller must stop and close it.
    """

    def offer(frame: np.ndarray) -> None:
        # Runs on the event-loop thread. This is where put_nowait executes, so
        # QueueFull must be caught here, not around call_soon_threadsafe.
        try:
            queue.put_nowait(frame)
        except asyncio.QueueFull:
            pass  # consumer is behind: drop this block

    def callback(indata, frames, time_info, status):
        loop.call_soon_threadsafe(offer, indata[:, 0].copy())

    stream = sd.InputStream(samplerate=SAMPLE_RATE, channels=1, dtype="float32", blocksize=CHUNK, callback=callback)
    try:
        stream.start()
    except Exception:
        # Don't leak the opened device if starting fails.
        stream.close()
        raise
    return stream
async def pump(queue: "asyncio.Queue[np.ndarray]", ws, convo: Conversation, oai: OpenAI) -> None:
    """Forward each queued microphone block to pyannote and, when connected, to OpenAI.

    pyannote receives the block as raw little-endian float32 samples. Once
    ``oai.conn`` is set, the block is also:
    - upsampled and clipped to [-1, 1];
    - scaled to little-endian int16 PCM and base64-encoded;
    - appended to the OpenAI input audio buffer.
    ``convo.uncommitted`` is then raised. Loops until cancelled.
    """
    while True:
        block = await queue.get()
        await ws.send(block.astype("<f4").tobytes())
        conn = oai.conn
        if conn is None:
            continue
        resampled = np.clip(upsample(block), -1, 1)
        pcm16 = (resampled * 32767).astype("<i2").tobytes()
        encoded = base64.b64encode(pcm16).decode()
        await conn.input_audio_buffer.append(audio=encoded)
        convo.uncommitted = True
async def main() -> None:
    """Wire the microphone, pyannote diarization and OpenAI transcription together.

    Steps:
    1. Create a pyannote live stream and open its websocket.
    2. Start the microphone and run the three worker tasks until one finishes
       or SIGINT arrives.
    3. Stop the microphone and cancel the workers, reporting any that failed.
    4. Tell pyannote the stream has ended.
    """
    loop = asyncio.get_running_loop()
    convo, oai = Conversation(), OpenAI()
    # Bounded so a stalled consumer drops audio instead of growing memory.
    queue: "asyncio.Queue[np.ndarray]" = asyncio.Queue(maxsize=50)
    stream_id, url = create_stream()
    print(f"pyannote stream: {stream_id}")
    client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    async with websockets.connect(url) as ws:
        stop = asyncio.Event()
        try:
            loop.add_signal_handler(signal.SIGINT, stop.set)
        except NotImplementedError:
            # This event loop cannot install signal handlers (e.g. on Windows);
            # Ctrl+C then falls back to default KeyboardInterrupt handling.
            pass
        mic = microphone(loop, queue)
        print("\nListening — speak now (Ctrl+C to stop)\n")
        tasks = [
            asyncio.create_task(pyannote_task(ws, convo, oai), name="pyannote"),
            asyncio.create_task(openai_task(client, convo, oai), name="openai"),
            asyncio.create_task(pump(queue, ws, convo, oai), name="pump"),
        ]
        stopper = asyncio.create_task(stop.wait(), name="stop")
        try:
            await asyncio.wait({*tasks, stopper}, return_when=asyncio.FIRST_COMPLETED)
        finally:
            print("\nStopping...")
            mic.stop()
            mic.close()
            # Cancel everything still running, including the stop waiter, so
            # no task is left pending when the loop shuts down.
            for task in (*tasks, stopper):
                task.cancel()
            results = await asyncio.gather(*tasks, stopper, return_exceptions=True)
        # Report worker failures instead of silently discarding them.
        for task, result in zip(tasks, results):
            if isinstance(result, Exception):
                print(f"{task.get_name()} task failed: {result!r}")
        try:
            await ws.send(json.dumps({"type": "end_of_stream"}))
        except websockets.ConnectionClosed:
            pass  # pyannote already closed the connection; nothing to notify
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C pressed before main() installed its SIGINT handler (during
        # stream creation or connection), or on a loop without signal-handler
        # support: exit with a short note instead of a traceback.
        print("\nInterrupted.")