import asyncio
import websockets
import json
import base64
import os
async def generate_speech() -> None:
    """Stream a few sentences to the TTS WebSocket and save the returned audio.

    Reads the API key from the ``TOGETHER_API_KEY`` environment variable,
    opens the speech WebSocket, waits for the initial ``session.created``
    event, then concurrently sends text chunks and collects the
    base64-encoded audio deltas. Any audio received is written to
    ``output.wav`` in the current working directory.

    Raises:
        RuntimeError: If ``TOGETHER_API_KEY`` is unset or empty, or if the
            first server message is not a ``session.created`` event.
    """
    api_key = os.environ.get("TOGETHER_API_KEY")
    if not api_key:
        # Fail fast instead of sending a literal "Bearer None" header.
        raise RuntimeError("TOGETHER_API_KEY environment variable is not set")

    url = "wss://api.together.ai/v1/audio/speech/websocket?model=hexgrad/Kokoro-82M&voice=tara"
    headers = {
        "Authorization": f"Bearer {api_key}"
    }

    async with websockets.connect(url, additional_headers=headers) as ws:
        # The server sends session.created first; anything else (for
        # example a conversation.item.tts.failed event) is reported clearly
        # rather than surfacing as a KeyError on the lookups below.
        session_data = json.loads(await ws.recv())
        if session_data.get("type") != "session.created":
            error = session_data.get("error") or {}
            raise RuntimeError(
                f"Expected session.created, got {session_data.get('type')!r}: "
                f"{error.get('message')}"
            )
        print(f"Session created: {session_data['session']['id']}")

        # Text to synthesize, sent one chunk at a time.
        text_chunks = [
            "Hello, this is a test.",
            "This is the second sentence.",
            "And this is the final one."
        ]

        async def send_text():
            """Append each text chunk to the input buffer, then commit it."""
            for chunk in text_chunks:
                await ws.send(json.dumps({
                    "type": "input_text_buffer.append",
                    "text": chunk
                }))
                await asyncio.sleep(0.5)  # Simulate typing
            # Commit to process any remaining text
            await ws.send(json.dumps({
                "type": "input_text_buffer.commit"
            }))

        async def receive_audio():
            """Collect audio deltas from the server and write them to disk."""
            audio_data = bytearray()
            # NOTE(review): this loop ends only when the connection closes or
            # a failure event arrives -- confirm the server closes the socket
            # once the committed text has been fully synthesized.
            async for message in ws:
                data = json.loads(message)
                event_type = data.get("type")
                if event_type == "conversation.item.input_text.received":
                    print(f"Text received: {data['text']}")
                elif event_type == "conversation.item.audio_output.delta":
                    # Decode base64 audio chunk
                    audio_data.extend(base64.b64decode(data['delta']))
                    print(f"Received audio chunk for item {data['item_id']}")
                elif event_type == "conversation.item.audio_output.done":
                    print(f"Audio generation complete for item {data['item_id']}")
                elif event_type == "conversation.item.tts.failed":
                    error = data.get("error", {})
                    print(f"Error: {error.get('message')}")
                    break

            if not audio_data:
                # Avoid creating an empty file and claiming success.
                print("No audio received; nothing saved")
                return

            # Save the audio to a file.
            # NOTE(review): bytes are written exactly as received -- confirm
            # the stream format matches the .wav extension.
            with open("output.wav", "wb") as f:
                f.write(audio_data)
            print("Audio saved to output.wav")

        # Run send and receive concurrently
        await asyncio.gather(send_text(), receive_audio())
asyncio.run(generate_speech())
Establishes a WebSocket connection for real-time text-to-speech generation. This endpoint uses the WebSocket protocol (wss://api.together.ai/v1/audio/speech/websocket) for bidirectional streaming communication.
Connection Setup:
Client Events:
tts_session.updated: Update session parameters like voice
{
"type": "tts_session.updated",
"session": {
"voice": "tara"
}
}
input_text_buffer.append: Send text chunks for TTS generation
{
"type": "input_text_buffer.append",
"text": "Hello, this is a test."
}
input_text_buffer.clear: Clear the buffered text
{
"type": "input_text_buffer.clear"
}
input_text_buffer.commit: Signal end of text input and process remaining text
{
"type": "input_text_buffer.commit"
}
Server Events:
session.created: Initial session confirmation (sent first)
{
"event_id": "evt_123456",
"type": "session.created",
"session": {
"id": "session-id",
"object": "realtime.tts.session",
"modalities": ["text", "audio"],
"model": "hexgrad/Kokoro-82M",
"voice": "tara"
}
}
conversation.item.input_text.received: Acknowledgment that text was received
{
"type": "conversation.item.input_text.received",
"text": "Hello, this is a test."
}
conversation.item.audio_output.delta: Audio chunks as base64-encoded data
{
"type": "conversation.item.audio_output.delta",
"item_id": "tts_1",
"delta": "<base64_encoded_audio_chunk>"
}
conversation.item.audio_output.done: Audio generation complete for an item
{
"type": "conversation.item.audio_output.done",
"item_id": "tts_1"
}
conversation.item.tts.failed: Error occurred
{
"type": "conversation.item.tts.failed",
"error": {
"message": "Error description",
"type": "invalid_request_error",
"param": null,
"code": "invalid_api_key"
}
}
Text Processing:
Buffered partial text is processed when it exceeds max_partial_length characters (default: 250) or when the input_text_buffer.commit event is received.
Audio Format:
Audio is delivered as base64-encoded chunks in conversation.item.audio_output.delta events.
Error Codes:
invalid_api_key: Invalid API key provided (401)
missing_api_key: Authorization header missing (401)
model_not_available: Invalid or unavailable model (400)
import asyncio
import websockets
import json
import base64
import os
async def generate_speech() -> None:
    """Stream a few sentences to the TTS WebSocket and save the returned audio.

    Reads the API key from the ``TOGETHER_API_KEY`` environment variable,
    opens the speech WebSocket, waits for the initial ``session.created``
    event, then concurrently sends text chunks and collects the
    base64-encoded audio deltas. Any audio received is written to
    ``output.wav`` in the current working directory.

    Raises:
        RuntimeError: If ``TOGETHER_API_KEY`` is unset or empty, or if the
            first server message is not a ``session.created`` event.
    """
    api_key = os.environ.get("TOGETHER_API_KEY")
    if not api_key:
        # Fail fast instead of sending a literal "Bearer None" header.
        raise RuntimeError("TOGETHER_API_KEY environment variable is not set")

    url = "wss://api.together.ai/v1/audio/speech/websocket?model=hexgrad/Kokoro-82M&voice=tara"
    headers = {
        "Authorization": f"Bearer {api_key}"
    }

    async with websockets.connect(url, additional_headers=headers) as ws:
        # The server sends session.created first; anything else (for
        # example a conversation.item.tts.failed event) is reported clearly
        # rather than surfacing as a KeyError on the lookups below.
        session_data = json.loads(await ws.recv())
        if session_data.get("type") != "session.created":
            error = session_data.get("error") or {}
            raise RuntimeError(
                f"Expected session.created, got {session_data.get('type')!r}: "
                f"{error.get('message')}"
            )
        print(f"Session created: {session_data['session']['id']}")

        # Text to synthesize, sent one chunk at a time.
        text_chunks = [
            "Hello, this is a test.",
            "This is the second sentence.",
            "And this is the final one."
        ]

        async def send_text():
            """Append each text chunk to the input buffer, then commit it."""
            for chunk in text_chunks:
                await ws.send(json.dumps({
                    "type": "input_text_buffer.append",
                    "text": chunk
                }))
                await asyncio.sleep(0.5)  # Simulate typing
            # Commit to process any remaining text
            await ws.send(json.dumps({
                "type": "input_text_buffer.commit"
            }))

        async def receive_audio():
            """Collect audio deltas from the server and write them to disk."""
            audio_data = bytearray()
            # NOTE(review): this loop ends only when the connection closes or
            # a failure event arrives -- confirm the server closes the socket
            # once the committed text has been fully synthesized.
            async for message in ws:
                data = json.loads(message)
                event_type = data.get("type")
                if event_type == "conversation.item.input_text.received":
                    print(f"Text received: {data['text']}")
                elif event_type == "conversation.item.audio_output.delta":
                    # Decode base64 audio chunk
                    audio_data.extend(base64.b64decode(data['delta']))
                    print(f"Received audio chunk for item {data['item_id']}")
                elif event_type == "conversation.item.audio_output.done":
                    print(f"Audio generation complete for item {data['item_id']}")
                elif event_type == "conversation.item.tts.failed":
                    error = data.get("error", {})
                    print(f"Error: {error.get('message')}")
                    break

            if not audio_data:
                # Avoid creating an empty file and claiming success.
                print("No audio received; nothing saved")
                return

            # Save the audio to a file.
            # NOTE(review): bytes are written exactly as received -- confirm
            # the stream format matches the .wav extension.
            with open("output.wav", "wb") as f:
                f.write(audio_data)
            print("Audio saved to output.wav")

        # Run send and receive concurrently
        await asyncio.gather(send_text(), receive_audio())
asyncio.run(generate_speech())
Bearer authentication header of the form Bearer <token>, where <token> is your auth token.
The TTS model to use for speech generation. Can also be set via the tts_session.updated event.
hexgrad/Kokoro-82M, cartesia/sonic-english
The voice to use for speech generation. Default is 'tara'.
Available voices vary by model. Can also be updated via tts_session.updated event.
Maximum number of characters in partial text before forcing TTS generation even without a sentence ending. Helps reduce latency for long text without punctuation.
Switching Protocols - WebSocket connection established successfully.
Error message format:
{
"type": "conversation.item.tts.failed",
"error": {
"message": "Error description",
"type": "invalid_request_error",
"param": null,
"code": "error_code"
}
}
Was this page helpful?