import asyncio
import base64
import json
import os
import sys
import numpy as np
import sounddevice as sd
import websockets
# --- Configuration ----------------------------------------------------------
API_KEY = os.getenv("TOGETHER_API_KEY")  # Together AI API key (required)
MODEL = "openai/whisper-large-v3"  # transcription model served by Together AI
SAMPLE_RATE = 16000  # Hz; must match the pcm_s16le_16000 input format
BATCH_SIZE = 4096  # samples per send: 4096 / 16000 Hz = 256 ms batches

# Fail fast before touching audio or network when the key is missing.
# Error goes to stderr, consistent with the rest of this file's error output.
if not API_KEY:
    print("Error: Set TOGETHER_API_KEY environment variable", file=sys.stderr)
    sys.exit(1)
class RealtimeTranscriber:
    """Realtime transcription client for Together AI.

    Captures microphone audio with sounddevice, batches it into fixed-size
    chunks of 16-bit PCM, streams them over a websocket to the Together AI
    realtime API, and renders interim/final transcription events to the
    terminal.
    """

    def __init__(self):
        self.ws = None  # websocket connection, set by connect()
        self.stream = None  # sounddevice input stream, set by send_audio()
        self.is_ready = False  # True once the server confirms the session
        # Samples accumulated until a full BATCH_SIZE chunk can be sent.
        self.audio_buffer = np.array([], dtype=np.float32)
        # Hand-off point between the audio callback thread and the event loop.
        self.audio_queue = asyncio.Queue()
        # Running event loop, captured in send_audio() so the audio callback
        # (which runs on a PortAudio thread) can enqueue data thread-safely.
        self._loop = None

    async def connect(self):
        """Open the websocket connection to the Together AI realtime API."""
        url = (
            f"wss://api.together.ai/v1/realtime"
            f"?intent=transcription"
            f"&model={MODEL}"
            f"&input_audio_format=pcm_s16le_16000"
            f"&authorization=Bearer {API_KEY}"
        )
        self.ws = await websockets.connect(
            url,
            subprotocols=[
                "realtime",
                f"openai-insecure-api-key.{API_KEY}",
                "openai-beta.realtime-v1",
            ],
        )

    @staticmethod
    def _encode_pcm(samples):
        """Convert float32 samples in [-1, 1] to base64-encoded s16le PCM."""
        audio_int16 = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
        return base64.b64encode(audio_int16.tobytes()).decode()

    async def send_audio(self):
        """Capture microphone audio and stream it to the API in 256 ms batches."""
        # The sounddevice callback runs on a PortAudio thread, not on the
        # event loop thread. asyncio.Queue is NOT thread-safe, so the data is
        # handed to the loop via call_soon_threadsafe instead of calling
        # put_nowait directly from the callback (which could leave a waiting
        # getter unwoken).
        self._loop = asyncio.get_running_loop()

        def audio_callback(indata, frames, time, status):
            self._loop.call_soon_threadsafe(
                self.audio_queue.put_nowait, indata.copy().flatten()
            )

        # Start microphone stream.
        self.stream = sd.InputStream(
            samplerate=SAMPLE_RATE,
            channels=1,
            dtype="float32",
            blocksize=1024,
            callback=audio_callback,
        )
        self.stream.start()

        # Pull captured audio off the queue, accumulate it, and ship every
        # full BATCH_SIZE chunk to the API.
        while True:
            try:
                audio = await asyncio.wait_for(
                    self.audio_queue.get(), timeout=0.1
                )
            except asyncio.TimeoutError:
                continue  # no audio yet; poll again
            try:
                if self.ws and self.is_ready:
                    self.audio_buffer = np.concatenate(
                        [self.audio_buffer, audio]
                    )
                    while len(self.audio_buffer) >= BATCH_SIZE:
                        batch = self.audio_buffer[:BATCH_SIZE]
                        self.audio_buffer = self.audio_buffer[BATCH_SIZE:]
                        await self.ws.send(
                            json.dumps(
                                {
                                    "type": "input_audio_buffer.append",
                                    "audio": self._encode_pcm(batch),
                                }
                            )
                        )
            except Exception as e:
                print(f"Error: {e}", file=sys.stderr)
                break

    async def receive_transcriptions(self):
        """Receive server events and render interim/final transcriptions."""
        current_interim = ""
        try:
            async for message in self.ws:
                data = json.loads(message)
                if data["type"] == "session.created":
                    # Session confirmed; send_audio() may start streaming.
                    self.is_ready = True
                elif (
                    data["type"]
                    == "conversation.item.input_audio_transcription.delta"
                ):
                    # Interim result: rewrite the current line in grey.
                    # NOTE(review): each delta is displayed as-is; if the API
                    # sends incremental fragments rather than the full interim
                    # text, only the latest fragment is visible — confirm the
                    # delta semantics against the API docs.
                    print(
                        f"\r\033[90m{data['delta']}\033[0m", end="", flush=True
                    )
                    current_interim = data["delta"]
                elif (
                    data["type"]
                    == "conversation.item.input_audio_transcription.completed"
                ):
                    # Final result: clear the interim line, print in green.
                    if current_interim:
                        print("\r\033[K", end="")
                    print(f"\033[92m{data['transcript']}\033[0m")
                    current_interim = ""
                elif data["type"] == "error":
                    print(f"\nError: {data.get('message', 'Unknown error')}")
        except websockets.exceptions.ConnectionClosed:
            pass  # normal shutdown path

    async def close(self):
        """Stop capture, flush buffered audio, and close the websocket."""
        if self.stream:
            self.stream.stop()
            self.stream.close()
        # Flush any partial batch so trailing speech is not silently dropped.
        if len(self.audio_buffer) > 0 and self.ws and self.is_ready:
            try:
                await self.ws.send(
                    json.dumps(
                        {
                            "type": "input_audio_buffer.append",
                            "audio": self._encode_pcm(self.audio_buffer),
                        }
                    )
                )
            except Exception:
                pass  # best-effort: the connection may already be gone
        if self.ws:
            await self.ws.close()

    async def run(self):
        """Connect, then run capture and reception concurrently until stopped."""
        try:
            print("🎤 Together AI Realtime Transcription")
            print("=" * 40)
            print("Connecting...")
            await self.connect()
            print("✓ Connected")
            print("✓ Recording started - speak now\n")
            # Run audio capture and transcription concurrently.
            await asyncio.gather(
                self.send_audio(), self.receive_transcriptions()
            )
        except KeyboardInterrupt:
            print("\n\nStopped")
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
        finally:
            await self.close()
async def main():
    """Entry point: run one transcription session until interrupted."""
    await RealtimeTranscriber().run()


if __name__ == "__main__":
    asyncio.run(main())