This example demonstrates deploying a text-to-image model using Dedicated Containers. You’ll build a Sprocket worker that generates images from text prompts and deploy it to Together’s managed GPU infrastructure.

What You’ll Learn

  • Deploying a custom model with Sprocket and Jig
  • Returning base64-encoded images from your worker
  • Submitting jobs via the Queue API and polling for results
  • Configuring autoscaling for production workloads

Prerequisites

  • Together API Key – Get one from together.ai
  • Dedicated Containers access – Contact [email protected] to enable for your organization
  • Docker – For building container images. Install Docker
  • Together CLI – Install with pip install together --upgrade or uv tool install together
Set your API key:
export TOGETHER_API_KEY=your_key_here
Install the Together Python library:
pip install together
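
To confirm the key is picked up, you can instantiate the client, which reads TOGETHER_API_KEY from the environment. The same client is used later to submit jobs:
from together import Together

# Reads TOGETHER_API_KEY from the environment variable exported above
client = Together()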

Overview

This example deploys a Flux2 text-to-image model as a Dedicated Container. The Sprocket worker handles job processing, and Together manages GPU provisioning, autoscaling, and observability.

What gets deployed:
  • A Sprocket worker running on an H100 GPU
  • Queue-based job processing for async image generation
  • Automatic scaling based on queue depth

How It Works

  1. Build – Jig builds a Docker image from your pyproject.toml configuration
  2. Push – The image is pushed to Together’s private container registry
  3. Deploy – Together provisions an H100 GPU and starts your container
  4. Queue – Jobs are submitted to the managed queue and processed by your Sprocket worker
  5. Scale – The autoscaler adjusts replicas based on queue depth
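
From the worker's point of view, steps 4 and 5 reduce to a small class: load the model once in setup(), handle one job per predict() call, and hand the instance to sprocket.run(). A minimal skeleton (the class name here is a placeholder; the full Flux2 worker follows under Implementation):
import os

import sprocket


class MyWorker(sprocket.Sprocket):
    def setup(self) -> None:
        # Runs once per replica: load models and weights here.
        pass

    def predict(self, args: dict) -> dict:
        # Runs once per job pulled from the queue; return a JSON-serializable dict.
        return {"echo": args}


if __name__ == "__main__":
    sprocket.run(MyWorker(), os.environ.get("TOGETHER_DEPLOYMENT_NAME", "my-worker"))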

Project Structure

flux2-dev/
├── pyproject.toml    # Configuration and dependencies
└── run.py            # Sprocket worker implementation

Implementation

Sprocket Worker Code

import base64
import logging
import os
from io import BytesIO

import sprocket
import torch
from diffusers import Flux2Pipeline

logging.basicConfig(level=logging.INFO)


class Flux2Sprocket(sprocket.Sprocket):
    def setup(self) -> None:
        args = dict(
            repo_id="diffusers/FLUX.2-dev-bnb-4bit", torch_dtype=torch.bfloat16
        )
        device = "cuda" if torch.cuda.is_available() else "cpu"

        logging.info(
            f"Loading Flux2 pipeline from {args['repo_id']} on {device}..."
        )
        self.pipe = Flux2Pipeline.from_pretrained(**args).to(device)
        logging.info("Pipeline loaded successfully!")

    def predict(self, args: dict) -> dict:
        prompt = args.get("prompt", "a cat")

        # Optional parameters with defaults
        num_inference_steps = args.get("num_inference_steps", 28)
        guidance_scale = args.get("guidance_scale", 4.0)

        # Generate image
        logging.info(f"Generating image for prompt: {prompt[:50]}...")
        image = self.pipe(
            prompt=prompt,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        ).images[0]

        # Convert to base64
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        logging.info("Image generated successfully")

        return {"image": img_str, "format": "png", "encoding": "base64"}


if __name__ == "__main__":
    queue_name = os.environ.get(
        "TOGETHER_DEPLOYMENT_NAME", "sprocket-flux2-dev"
    )
    sprocket.run(Flux2Sprocket(), queue_name)
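
Before deploying, you can sanity-check the worker by calling it directly, bypassing the queue. A quick sketch, assuming the dependencies are installed locally and a GPU with enough memory for the 4-bit pipeline is available (the prompt and reduced step count are just for a fast test):
# Local smoke test: exercises setup() and predict() without the queue
worker = Flux2Sprocket()
worker.setup()
result = worker.predict({"prompt": "a lighthouse at dusk", "num_inference_steps": 8})
print(result["format"], "image,", len(result["image"]), "base64 characters")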

Configuration

[project]
name = "sprocket-flux2-dev"
version = "0.1.0"
dependencies = [
    "diffusers>=0.33.0",
    "transformers>=4.44.0",
    "torch>=2.0.0",
    "torchvision",
    "pillow",
    "accelerate",
    "bitsandbytes",
    "safetensors",
    "sprocket>=0.1.dev45"
]

[[tool.uv.index]]
name = "together-pypi"
url = "https://pypi.together.ai/"

[tool.uv.sources]
sprocket = { index = "together-pypi" }

[tool.jig.image]
python_version = "3.11"
cmd = "python3 run.py"
auto_include_git = false
copy = ["run.py"]

[tool.jig.deploy]
description = "Flux2-dev Image Generation with Sprocket"
gpu_type = "h100-80gb"
gpu_count = 1
cpu = 4
memory = 32
port = 8000
min_replicas = 1
max_replicas = 1

Key Concepts

Base64 Image Encoding

Images are returned as base64-encoded strings for JSON compatibility:
def predict(self, args: dict) -> dict:
    # Generate the image
    image = self.pipe(prompt=args["prompt"]).images[0]

    # Encode as PNG in base64
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode()

    return {"image": img_str, "format": "png", "encoding": "base64"}
Decoding on the client:
import base64
from PIL import Image
from io import BytesIO

# Decode the response
image_data = base64.b64decode(response["image"])
image = Image.open(BytesIO(image_data))
image.save("output.png")

Generation Parameters

Flux2 supports several parameters to control generation:
Parameter           | Default | Description
prompt              | "a cat" | Text description of the image
num_inference_steps | 28      | Denoising steps (more = better quality, slower)
guidance_scale      | 4.0     | How closely to follow the prompt (higher = more literal)
image = self.pipe(
    prompt=prompt,
    num_inference_steps=28,  # Default for good quality/speed balance
    guidance_scale=4.0,  # Moderate guidance
).images[0]

Using the Deployment Name from Environment

The deployment name is read from the environment, with a fallback default:
queue_name = os.environ.get("TOGETHER_DEPLOYMENT_NAME", "sprocket-flux2-dev")
sprocket.run(Flux2Sprocket(), queue_name)
This allows the same code to work in different deployments by setting TOGETHER_DEPLOYMENT_NAME.

Deployment

Deploy

# Deploy (builds, pushes, and creates deployment)
together beta jig deploy

# Or deploy with cache warmup to reduce cold start latency
together beta jig deploy --warmup

# Monitor startup (model download takes a few minutes on first deploy)
together beta jig logs --follow

Check Deployment Status

# View deployment status and replica health
together beta jig status
Wait until the deployment shows running and replicas are ready before submitting jobs.

Submit Jobs

Jobs are submitted to the managed queue and processed asynchronously. You’ll need to poll for the result.
from together import Together
import base64
import time
from io import BytesIO
from PIL import Image

client = Together()
deployment = "sprocket-flux2-dev"

# Submit job to queue
job = client.beta.queue.submit(
    model=deployment,
    payload={
        "prompt": "A serene Japanese garden with cherry blossoms",
        "num_inference_steps": 28,
        "guidance_scale": 4.0,
    },
)
print(f"Job submitted: {job.request_id}")

# Poll for completion
while True:
    status = client.beta.queue.retrieve(
        request_id=job.request_id,
        model=deployment,
    )

    if status.status == "done":
        # Decode and save the image
        image_data = base64.b64decode(status.outputs["image"])
        image = Image.open(BytesIO(image_data))
        image.save("output.png")
        print("Image saved to output.png")
        break
    elif status.status == "failed":
        print(f"Job failed: {status.error}")
        break
    else:
        print(f"Status: {status.status}")
        time.sleep(2)
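
For scripts that generate many images, it can help to wrap the submit-and-poll loop in a helper with a timeout. The sketch below reuses only the queue calls shown above; the helper name, timeout, and poll interval are our own additions:
import time

from together import Together


def generate_image(client: Together, deployment: str, payload: dict,
                   timeout_s: float = 300.0, poll_interval_s: float = 2.0) -> dict:
    """Submit one job and poll until it is done, failed, or timed out."""
    job = client.beta.queue.submit(model=deployment, payload=payload)
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = client.beta.queue.retrieve(request_id=job.request_id, model=deployment)
        if status.status == "done":
            # e.g. {"image": "...", "format": "png", "encoding": "base64"}
            return status.outputs
        if status.status == "failed":
            raise RuntimeError(f"Job {job.request_id} failed: {status.error}")
        time.sleep(poll_interval_s)
    raise TimeoutError(f"Job {job.request_id} did not finish within {timeout_s}s")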

Input Parameters

Parameter           | Type   | Default | Description
prompt              | string | "a cat" | Text description of the image to generate
num_inference_steps | int    | 28      | Number of denoising steps
guidance_scale      | float  | 4.0     | Classifier-free guidance scale

Output

{
  "image": "iVBORw0KGgoAAAANSUhEUgAA...",
  "format": "png",
  "encoding": "base64"
}
  • image: Base64-encoded PNG image data
  • format: Image format (always "png")
  • encoding: Encoding type (always "base64")

Batch Processing and Autoscaling

To enable autoscaling, raise max_replicas in the configuration above. When the queue backlog grows, replicas are added automatically; when workers sit idle, replicas are removed again (down to min_replicas). To scale more aggressively for high-throughput workloads:
[tool.jig.deploy]
min_replicas = 2      # Always keep 2 warm replicas
max_replicas = 50     # Scale up to 50 replicas

[tool.jig.autoscaling]
profile = "QueueBacklogPerWorker"
targetValue = "0.9"   # More aggressive scaling (more workers than needed)
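
As a rough mental model, assuming the QueueBacklogPerWorker profile targets a backlog-to-replica ratio equal to targetValue (our reading of the profile name, not the autoscaler's documented algorithm), the desired replica count could be estimated as:
import math

# Hypothetical back-of-envelope estimate, not the autoscaler's actual algorithm
def estimated_replicas(backlog: int, target_value: float,
                       min_replicas: int, max_replicas: int) -> int:
    wanted = math.ceil(backlog / target_value) if backlog else min_replicas
    return max(min_replicas, min(max_replicas, wanted))

print(estimated_replicas(backlog=45, target_value=0.9,
                         min_replicas=2, max_replicas=50))  # -> 50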
To scale to zero when idle, specify min_replicas = 0 (saves costs but adds cold start latency):
[tool.jig.deploy]
min_replicas = 0
max_replicas = 10

Cleanup

When you’re done, delete the deployment:
together beta jig destroy

Next Steps