import asyncio
import base64
import json
import os
import wave

import websockets
async def transcribe_audio():
    """Stream a local WAV file to Together's realtime Whisper endpoint and print transcripts.

    Reads ``audio.wav``, streams its raw PCM payload over the realtime
    websocket in chunks paced to real-time playback speed, commits the audio
    buffer, then prints partial/final transcription events until the
    transcription completes or fails.

    Raises:
        RuntimeError: if the ``TOGETHER_API_KEY`` environment variable is unset.
        FileNotFoundError: if ``audio.wav`` does not exist.
        wave.Error: if ``audio.wav`` is not a valid WAV file.
    """
    api_key = os.environ.get("TOGETHER_API_KEY")
    if not api_key:
        # Fail fast with a clear message instead of a cryptic websocket
        # handshake / 401 error later.
        raise RuntimeError("TOGETHER_API_KEY environment variable is not set")

    url = (
        "wss://api.together.ai/v1/realtime"
        "?model=openai/whisper-large-v3&input_audio_format=pcm_s16le_16000"
    )
    headers = {
        "Authorization": f"Bearer {api_key}"
    }

    async with websockets.connect(url, additional_headers=headers) as ws:
        # Use the wave module so only the raw PCM frames are sent.  A plain
        # f.read() would also stream the RIFF/WAV header bytes, which the
        # declared pcm_s16le_16000 input format does not expect.
        with wave.open("audio.wav", "rb") as wf:
            audio_data = wf.readframes(wf.getnframes())
            # Pace the upload from the file's actual parameters (this equals
            # the previous hard-coded 16000 * 2 for 16 kHz mono 16-bit audio).
            bytes_per_second = (
                wf.getframerate() * wf.getsampwidth() * wf.getnchannels()
            )

        chunk_size = 8192
        delay_per_chunk = chunk_size / bytes_per_second

        for i in range(0, len(audio_data), chunk_size):
            chunk = audio_data[i:i + chunk_size]
            base64_chunk = base64.b64encode(chunk).decode('utf-8')
            await ws.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": base64_chunk
            }))
            # Throttle to simulate a live microphone; skip the sleep after the
            # final chunk so the commit is not needlessly delayed.
            if i + chunk_size < len(audio_data):
                await asyncio.sleep(delay_per_chunk)

        # Signal end of input so the server finalizes the transcription.
        await ws.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))

        # Consume events until a terminal (completed/failed) event arrives.
        async for message in ws:
            data = json.loads(message)
            # .get() tolerates events without a "type" field (a bare
            # data["type"] would raise KeyError and kill the loop).
            event_type = data.get("type")
            if event_type == "conversation.item.input_audio_transcription.delta":
                print(f"Partial: {data['delta']}")
            elif event_type == "conversation.item.input_audio_transcription.completed":
                print(f"Final: {data['transcript']}")
                break
            elif event_type == "conversation.item.input_audio_transcription.failed":
                error = data.get("error", {})
                print(f"Error: {error.get('message')}")
                break
asyncio.run(transcribe_audio())