The server uses Voice Activity Detection (VAD) to automatically segment speech. You can tune VAD parameters for your audio characteristics. See the Voice activity detection guide for configuration details and common presets.
The WebSocket API is currently only available via raw WebSocket connections. SDK support coming soon.
Establish a connection
Connect to:wss://api.together.ai/v1/realtime?model={model}&input_audio_format=pcm_s16le_16000
Headers:
{
'Authorization': 'Bearer $TOGETHER_API_KEY',
'OpenAI-Beta': 'realtime=v1'
}
Query parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| model | string | Yes | Model to use (e.g., openai/whisper-large-v3) |
| input_audio_format | string | Yes | Audio format: pcm_s16le_16000 |
Client-to-server messages
Append audio to buffer
{
"type": "input_audio_buffer.append",
"audio": "base64-encoded-audio-chunk"
}
Commit audio buffer
{
"type": "input_audio_buffer.commit"
}
Server-to-client messages
Delta events (intermediate results)
{
"type": "conversation.item.input_audio_transcription.delta",
"delta": "The quick brown fox jumps"
}
Completed events (final results)
{
"type": "conversation.item.input_audio_transcription.completed",
"transcript": "The quick brown fox jumps over the lazy dog"
}
Real-time example
import asyncio
import base64
import json
import os
import sys
import numpy as np
import sounddevice as sd
import websockets
# Configuration
API_KEY = os.getenv("TOGETHER_API_KEY")
MODEL = "openai/whisper-large-v3"
SAMPLE_RATE = 16000
BATCH_SIZE = 4096 # 256ms batches for optimal performance
if not API_KEY:
print("Error: Set TOGETHER_API_KEY environment variable")
sys.exit(1)
class RealtimeTranscriber:
"""Realtime transcription client for Together AI."""
def __init__(self):
self.ws = None
self.stream = None
self.is_ready = False
self.audio_buffer = np.array([], dtype=np.float32)
self.audio_queue = asyncio.Queue()
async def connect(self):
"""Connect to Together AI API."""
url = (
f"wss://api.together.ai/v1/realtime"
f"?intent=transcription"
f"&model={MODEL}"
f"&input_audio_format=pcm_s16le_16000"
f"&authorization=Bearer {API_KEY}"
)
self.ws = await websockets.connect(
url,
subprotocols=[
"realtime",
f"openai-insecure-api-key.{API_KEY}",
"openai-beta.realtime-v1",
],
)
async def send_audio(self):
"""Capture and send audio to API."""
def audio_callback(indata, frames, time, status):
self.audio_queue.put_nowait(indata.copy().flatten())
# Start microphone stream
self.stream = sd.InputStream(
samplerate=SAMPLE_RATE,
channels=1,
dtype="float32",
blocksize=1024,
callback=audio_callback,
)
self.stream.start()
# Process and send audio
while True:
try:
audio = await asyncio.wait_for(
self.audio_queue.get(), timeout=0.1
)
if self.ws and self.is_ready:
# Add to buffer
self.audio_buffer = np.concatenate(
[self.audio_buffer, audio]
)
# Send when buffer is full
while len(self.audio_buffer) >= BATCH_SIZE:
batch = self.audio_buffer[:BATCH_SIZE]
self.audio_buffer = self.audio_buffer[BATCH_SIZE:]
# Convert float32 to int16 PCM
audio_int16 = (
np.clip(batch, -1.0, 1.0) * 32767
).astype(np.int16)
audio_base64 = base64.b64encode(
audio_int16.tobytes()
).decode()
# Send to API
await self.ws.send(
json.dumps(
{
"type": "input_audio_buffer.append",
"audio": audio_base64,
}
)
)
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
break
async def receive_transcriptions(self):
"""Receive and display transcription results."""
current_interim = ""
try:
async for message in self.ws:
data = json.loads(message)
if data["type"] == "session.created":
self.is_ready = True
elif (
data["type"]
== "conversation.item.input_audio_transcription.delta"
):
# Interim result
print(
f"\r\033[90m{data['delta']}\033[0m", end="", flush=True
)
current_interim = data["delta"]
elif (
data["type"]
== "conversation.item.input_audio_transcription.completed"
):
# Final result
if current_interim:
print("\r\033[K", end="")
print(f"\033[92m{data['transcript']}\033[0m")
current_interim = ""
elif data["type"] == "error":
print(f"\nError: {data.get('message', 'Unknown error')}")
except websockets.exceptions.ConnectionClosed:
pass
async def close(self):
"""Close connections and cleanup."""
if self.stream:
self.stream.stop()
self.stream.close()
# Flush remaining audio
if len(self.audio_buffer) > 0 and self.ws and self.is_ready:
try:
audio_int16 = (
np.clip(self.audio_buffer, -1.0, 1.0) * 32767
).astype(np.int16)
audio_base64 = base64.b64encode(audio_int16.tobytes()).decode()
await self.ws.send(
json.dumps(
{
"type": "input_audio_buffer.append",
"audio": audio_base64,
}
)
)
except Exception:
pass
if self.ws:
await self.ws.close()
async def run(self):
"""Main execution loop."""
try:
print("🎤 Together AI Realtime Transcription")
print("=" * 40)
print("Connecting...")
await self.connect()
print("✓ Connected")
print("✓ Recording started - speak now\n")
# Run audio capture and transcription concurrently
await asyncio.gather(
self.send_audio(), self.receive_transcriptions()
)
except KeyboardInterrupt:
print("\n\nStopped")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
finally:
await self.close()
async def main():
transcriber = RealtimeTranscriber()
await transcriber.run()
if __name__ == "__main__":
asyncio.run(main())