#!/usr/bin/env python3
"""
Streaming Transcription WebSocket — sample client.
"""
import asyncio
import json
import time
import wave
from urllib.parse import urlencode
import websockets
def load_audio(audio_path: str):
with wave.open(audio_path, "rb") as wf:
audio_bytes = wf.readframes(wf.getnframes())
sample_rate = wf.getframerate()
num_channels = wf.getnchannels()
sample_width = wf.getsampwidth()
print(
f"audio loaded: {audio_path} @ {sample_rate} Hz, "
f"{num_channels} channel(s), {sample_width} bytes/sample"
)
return audio_bytes, sample_rate, num_channels, sample_width
async def stream_transcription(
api_key: str,
audio_path: str = "sample.wav",
url: str = "wss://client.camb.ai/apis/transcription/listen",
model: str = "boli-v5",
language: str = "en-us",
):
audio_bytes, sample_rate, num_channels, _ = load_audio(audio_path)
params = {
"model": model,
"language": language,
"encoding": "linear16",
"sample_rate": sample_rate,
"channels": num_channels,
}
full_url = f"{url}?{urlencode(params)}"
print(f"Connecting to: {full_url}")
async with websockets.connect(
full_url,
open_timeout=30,
additional_headers={"x-api-key": api_key},
) as websocket:
print("Connected.")
async def receive_messages():
try:
async for message in websocket:
if isinstance(message, bytes):
continue
try:
data = json.loads(message)
except json.JSONDecodeError:
print(f"Non-JSON message: {message!r}")
continue
msg_type = data.get("type")
if msg_type == "Ready":
print("Ready: server is accepting audio.")
elif msg_type == "Results":
alt = (
data.get("channel", {})
.get("alternatives", [{}])[0]
)
transcript = alt.get("transcript", "")
confidence = alt.get("confidence", 0)
print(
f" [INTERIM] '{transcript}' "
f"(confidence={confidence:.2%})"
)
else:
print(f"Unhandled message: {data}")
except websockets.exceptions.ConnectionClosed as exc:
print(f"Connection closed: {exc.code} {exc.reason}")
async def send_messages():
# Stream audio in real-time-paced chunks.
chunk_duration = 0.1 # 100 ms
bytes_per_second = sample_rate * num_channels * 2 # linear16
chunk_size = int(bytes_per_second * chunk_duration)
t0 = time.time()
bytes_sent = 0
print(f"Streaming {len(audio_bytes)} bytes...")
for i in range(0, len(audio_bytes), chunk_size):
chunk = audio_bytes[i : i + chunk_size]
await websocket.send(chunk)
bytes_sent += len(chunk)
expected_elapsed = bytes_sent / bytes_per_second
actual_elapsed = time.time() - t0
sleep_time = expected_elapsed - actual_elapsed
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Optional: send a heartbeat between bursts of audio.
await websocket.send(json.dumps({"type": "KeepAlive"}))
# Tell the server you're done.
await websocket.send(json.dumps({"type": "CloseStream"}))
print("Sent CloseStream.")
await asyncio.gather(receive_messages(), send_messages())
if __name__ == "__main__":
asyncio.run(
stream_transcription(
api_key="YOUR_API_KEY",
audio_path="sample.wav",
)
)