import asyncio
import websockets
import json
import base64
import os
async def generate_speech():
    """Stream a few test sentences to the TTS WebSocket and save the audio.

    Reads the API key from the ``TOGETHER_API_KEY`` environment variable,
    opens the WebSocket, prints the session id taken from the first message,
    then runs a sender and a receiver concurrently. The sender appends each
    text chunk to the input buffer and commits at the end; the receiver
    base64-decodes every audio delta and, once its loop ends, writes the
    accumulated bytes to ``output.wav`` in the current directory.

    NOTE(review): the receive loop only ends when the connection is closed
    or a ``conversation.item.tts.failed`` event arrives; there is no explicit
    exit on success. Confirm the server closes the socket after the final
    audio, otherwise this coroutine waits indefinitely.

    Raises:
        RuntimeError: If ``TOGETHER_API_KEY`` is unset or empty.
    """
    api_key = os.environ.get("TOGETHER_API_KEY")
    if not api_key:
        # Fail fast with a clear message; otherwise the Authorization header
        # would literally be "Bearer None".
        raise RuntimeError("TOGETHER_API_KEY environment variable is not set")

    url = "wss://api.together.ai/v1/audio/speech/websocket?model=hexgrad/Kokoro-82M&voice=tara"
    headers = {
        "Authorization": f"Bearer {api_key}"
    }

    async with websockets.connect(url, additional_headers=headers) as ws:
        # Wait for session created: the first message is parsed as the
        # session description and must contain session.id.
        session_msg = await ws.recv()
        session_data = json.loads(session_msg)
        print(f"Session created: {session_data['session']['id']}")

        # Text to synthesize, sent one chunk at a time.
        text_chunks = [
            "Hello, this is a test.",
            "This is the second sentence.",
            "And this is the final one.",
        ]

        async def send_text():
            """Append every chunk to the input text buffer, then commit."""
            for chunk in text_chunks:
                await ws.send(json.dumps({
                    "type": "input_text_buffer.append",
                    "text": chunk
                }))
                await asyncio.sleep(0.5)  # Simulate typing
            # Commit to process any remaining text
            await ws.send(json.dumps({
                "type": "input_text_buffer.commit"
            }))

        async def receive_audio():
            """Collect audio deltas from the socket and write them to disk."""
            audio_data = bytearray()
            async for message in ws:
                data = json.loads(message)
                # .get() so a message without a "type" field is skipped like
                # any other unrecognized event instead of raising KeyError.
                event_type = data.get("type")
                if event_type == "conversation.item.input_text.received":
                    print(f"Text received: {data['text']}")
                elif event_type == "conversation.item.audio_output.delta":
                    # Decode base64 audio chunk
                    audio_chunk = base64.b64decode(data['delta'])
                    audio_data.extend(audio_chunk)
                    print(f"Received audio chunk for item {data['item_id']}")
                elif event_type == "conversation.item.audio_output.done":
                    print(f"Audio generation complete for item {data['item_id']}")
                elif event_type == "conversation.item.tts.failed":
                    error = data.get("error", {})
                    print(f"Error: {error.get('message')}")
                    break
            # Save the audio to a file. This also runs after a failure, so
            # whatever was received up to that point is still written.
            with open("output.wav", "wb") as f:
                f.write(audio_data)
            print("Audio saved to output.wav")

        # Run send and receive concurrently
        await asyncio.gather(send_text(), receive_audio())
# Script entry point: only start the network session when this file is run
# directly, so that importing the module has no side effects.
if __name__ == "__main__":
    asyncio.run(generate_speech())