Cost-optimised batch classification

Classify 100K support tickets, news items, or comments at 30% off retail using NexToken's batch endpoint. Includes idempotency, resume-on-failure, and structured-output parsing.

⏱ 12 min · nex-pro · /v1/batches · −30% off · async

When to use batch vs sync

Sync (/v1/chat/completions) is the right call when a user is waiting. Batch (/v1/batches) is the right call when nobody's waiting and you have ≥ 1,000 items: classification, summarisation, scoring, enrichment, eval runs.
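The rule of thumb above can be written out as a small helper. This is a sketch: choose_endpoint is a hypothetical name, and the 1,000-item threshold comes from the guidance above, not from any hard limit in the API.

```python
def choose_endpoint(user_waiting: bool, n_items: int, min_batch: int = 1_000) -> str:
    """Pick sync vs batch using the rule of thumb above (hypothetical helper)."""
    # Someone is blocked on the answer: latency matters more than price.
    if user_waiting:
        return "/v1/chat/completions"
    # Nobody is waiting and the job is large enough for batch.
    if n_items >= min_batch:
        return "/v1/batches"
    # Small offline job: the guidance only recommends batch at >= min_batch items.
    return "/v1/chat/completions"
```

For example, an overnight eval run over 20,000 prompts maps to /v1/batches, while a chat reply maps to /v1/chat/completions regardless of volume.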

Step 1 · Build the JSONL

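Each line is an OpenAI-shape chat request. The custom_id is your own key: it comes back on every output row, so pick something that maps to the original record (here, the ticket ID).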

import json

CATEGORIES = ["billing", "technical", "feature_request", "complaint", "praise", "other"]

def make_request(ticket_id, body):
    return {
        "custom_id": ticket_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "nex-pro",
            "response_format": {"type": "json_object"},
            "messages": [
                {"role": "system", "content":
                    f"Classify the support ticket into ONE of: {', '.join(CATEGORIES)}. "
                    f"Return JSON: {{\"category\": \"...\", \"confidence\": 0-1, \"reason\": \"<5 words\"}}"},
                {"role": "user", "content": body},
            ],
            "max_tokens": 60,
            "temperature": 0,
        },
    }

with open("requests.jsonl", "w") as f:
    for ticket in load_tickets():            # your data source
        f.write(json.dumps(make_request(ticket["id"], ticket["body"])) + "\n")
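Because custom_id is how results (and the resume state in Step 5) map back to tickets, it is worth checking the file before upload. A minimal sketch, assuming every line must carry custom_id, method, url, and body as make_request produces them; validate_jsonl is a hypothetical helper, not part of NexToken's tooling.

```python
import json
from collections import Counter

# Keys that make_request() emits on every line (assumed required by the endpoint).
REQUIRED_KEYS = {"custom_id", "method", "url", "body"}

def validate_jsonl(path: str) -> int:
    """Check a batch input file before upload; return the number of requests."""
    ids = Counter()
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            req = json.loads(line)  # raises json.JSONDecodeError on a malformed line
            missing = REQUIRED_KEYS - set(req)
            if missing:
                raise ValueError(f"line {lineno}: missing {sorted(missing)}")
            ids[req["custom_id"]] += 1
    dupes = [cid for cid, n in ids.items() if n > 1]
    if dupes:
        # Duplicate IDs would make results ambiguous when mapped back to tickets.
        raise ValueError(f"duplicate custom_id values, e.g. {dupes[:5]}")
    return sum(ids.values())
```

Running validate_jsonl("requests.jsonl") right after the loop above fails fast on a bad file instead of after a long upload.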

Step 2 · Submit the batch

import json, os
import requests

NEX_KEY = os.environ["NEX_API_KEY"]
BASE = "https://api.nextoken.biz/v1"

# Read the JSONL into a list and POST it as one batch (max 50,000 items per batch)
with open("requests.jsonl") as f:
    items = [json.loads(line) for line in f if line.strip()]

r = requests.post(
    f"{BASE}/batches",
    headers={"Authorization": f"Bearer {NEX_KEY}", "Content-Type": "application/json"},
    json={"items": items},
)
r.raise_for_status()
batch = r.json()
print("submitted:", batch["id"], "items:", batch["item_count"])
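The comment above caps a single batch at 50,000 items, so the 100K-ticket job from the intro has to go in as at least two batches. A minimal sketch of the split, assuming the cap is exactly 50,000 items per request; chunked is a hypothetical helper, and each chunk would be POSTed exactly as in the code above.

```python
from typing import Iterator

MAX_BATCH_ITEMS = 50_000  # per-batch cap quoted in the comment above

def chunked(items: list, size: int = MAX_BATCH_ITEMS) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

Usage: loop with `for chunk in chunked(items):` and send `json={"items": chunk}` in each POST, keeping every returned batch ID so each batch can be polled separately.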

Step 3 · Poll until complete

import time

def wait_for_batch(batch_id, every=15):
    while True:
        r = requests.get(
            f"{BASE}/batches/{batch_id}",
            headers={"Authorization": f"Bearer {NEX_KEY}"},
        )
        r.raise_for_status()
        s = r.json()
        print(f"  status={s['status']}  success={s['success_count']}/{s['item_count']}")
        if s["status"] in ("completed", "failed", "cancelled"):
            return s
        time.sleep(every)

final = wait_for_batch(batch["id"])
print(f"Final: ${final['total_retail_usd']} for {final['success_count']} items")
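wait_for_batch polls every 15 seconds with no upper bound. For long jobs you may prefer a growing interval and a hard deadline. The sketch below is an assumption, not part of NexToken's client: poll_until_terminal, the doubling backoff, and the 24-hour default timeout are illustrative choices. Status fetching is passed in as a function so the loop can be exercised without the network.

```python
import time
from typing import Callable

# Terminal states, as checked in wait_for_batch above.
TERMINAL = {"completed", "failed", "cancelled"}

def poll_until_terminal(
    fetch_status: Callable[[], dict],
    first_wait: float = 15.0,
    max_wait: float = 300.0,
    timeout: float = 24 * 3600,  # illustrative deadline, not an API limit
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Call fetch_status() until it reports a terminal state, backing off between calls."""
    deadline = clock() + timeout
    wait = first_wait
    while True:
        status = fetch_status()
        if status["status"] in TERMINAL:
            return status
        # Give up rather than sleep past the deadline.
        if clock() + wait > deadline:
            raise TimeoutError(f"batch still {status['status']!r} after {timeout}s")
        sleep(wait)
        wait = min(wait * 2, max_wait)  # double the interval, capped at max_wait
```

To wire it up, pass a function that performs the same GET as wait_for_batch, calls r.raise_for_status(), and returns r.json().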

Step 4 · Stream the output

Results come back as NDJSON, one row per item. Stream-parse them so the full 50K-row response body never sits in memory at once; only the parsed labels are kept.

import json

resp = requests.get(
    f"{BASE}/batches/{batch['id']}/output",
    headers={"Authorization": f"Bearer {NEX_KEY}"},
    stream=True,
)
resp.raise_for_status()

results = {}
for line in resp.iter_lines():
    if not line:  # skip blank lines in the stream
        continue
    row = json.loads(line)
    custom_id = row["custom_id"]
    if row.get("error"):
        results[custom_id] = {"error": row["error"]}
        continue
    content = row["response"]["body"]["choices"][0]["message"]["content"]
    try:
        results[custom_id] = json.loads(content)
    except (json.JSONDecodeError, TypeError):
        # Keep the raw text so unparseable outputs can be inspected or retried
        results[custom_id] = {"raw": content}

print(f"Parsed {len(results)} results")
Why response_format=json_object? It forces the model to return syntactically valid JSON. Combined with temperature=0 and a tight prompt, parse failures drop to under 1%.
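Valid JSON is not the same as the schema the prompt asks for: response_format={"type": "json_object"} never names the allowed categories. A small post-parse check catches out-of-vocabulary categories and out-of-range confidences so they can be retried or sent to review. This is a sketch; validate_label is a hypothetical helper, not a NexToken API.

```python
CATEGORIES = ["billing", "technical", "feature_request", "complaint", "praise", "other"]

def validate_label(parsed) -> dict:
    """Return a normalised label if `parsed` matches the prompt's schema, else raise ValueError."""
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    category = parsed.get("category")
    if category not in CATEGORIES:
        raise ValueError(f"unknown category: {category!r}")
    confidence = parsed.get("confidence")
    # bool is a subclass of int in Python, so rule it out explicitly
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        raise ValueError(f"confidence is not a number: {confidence!r}")
    if not 0.0 <= confidence <= 1.0:
        raise ValueError(f"confidence out of range: {confidence}")
    return {
        "category": category,
        "confidence": float(confidence),
        "reason": str(parsed.get("reason", "")),
    }
```

Apply it to every entry in results that has neither an "error" nor a "raw" key, and collect the custom_ids that raise into a retry list.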

Step 5 · Resume on failure

The batch endpoint is idempotent on custom_id — re-submitting an item with the same custom_id is a no-op if it already succeeded. Pattern for a 100K job:

import json, os

def submit_with_resume(all_items, chunk_size=10_000, state_path="batch_state.json"):
    # Load the custom_ids finished by earlier runs (empty on the first run)
    state = {"done": []}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
    done = set(state["done"])
    todo = [it for it in all_items if it["custom_id"] not in done]
    print(f"resuming: {len(done)} done, {len(todo)} remaining")
    for i in range(0, len(todo), chunk_size):
        chunk = todo[i:i + chunk_size]
        # ... submit + wait + parse as above ...
        # Checkpoint only after this chunk's results are saved, so a crash re-runs it
        state["done"].extend(it["custom_id"] for it in chunk)
        with open(state_path, "w") as f:
            json.dump(state, f)
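Writing batch_state.json in place has one weak spot: a crash mid-write can leave a truncated file that then fails to load on the next resume. A common fix, sketched here as an assumption rather than part of the NexToken workflow, is to write to a temporary file in the same directory and swap it in with os.replace, which Python documents as atomic on POSIX systems. load_done and save_done are hypothetical helpers that keep the same {"done": [...]} file layout as above.

```python
import json
import os
import tempfile

def load_done(state_path: str) -> set:
    """Return the custom_ids recorded as finished, or an empty set on first run."""
    if not os.path.exists(state_path):
        return set()
    with open(state_path, encoding="utf-8") as f:
        return set(json.load(f)["done"])

def save_done(state_path: str, done: set) -> None:
    """Replace state_path with the current done set via a temp file + os.replace."""
    directory = os.path.dirname(os.path.abspath(state_path))
    # Temp file in the same directory so os.replace never crosses filesystems.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"done": sorted(done)}, f)
            f.flush()
            os.fsync(f.fileno())  # flush bytes to disk before the swap
        os.replace(tmp_path, state_path)
    except BaseException:
        os.remove(tmp_path)  # don't leave stray temp files behind on failure
        raise
```

Inside submit_with_resume, load_done(state_path) would replace the initial load, and save_done(state_path, done) would replace the per-chunk json.dump after adding the chunk's IDs to done.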

Bill comparison

100K classification items × ~80 input + 30 output tokens each, on nex-pro:
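The token totals follow directly from the line above: 100,000 × 80 = 8M input tokens and 100,000 × 30 = 3M output tokens. The dollar amounts depend on nex-pro's per-million-token retail prices, which this section doesn't state, so the sketch below takes them as parameters (job_cost_usd is a hypothetical helper). The 30% batch discount is the one quoted at the top of this guide.

```python
def job_cost_usd(
    n_items: int,
    in_tokens: int,
    out_tokens: int,
    in_price_per_m: float,
    out_price_per_m: float,
    batch_discount: float = 0.30,  # "30% off retail" from the intro
) -> tuple[float, float]:
    """Return (retail_usd, batch_usd) for a job, given per-million-token retail prices."""
    total_in = n_items * in_tokens    # e.g. 100_000 * 80 = 8,000,000 tokens
    total_out = n_items * out_tokens  # e.g. 100_000 * 30 = 3,000,000 tokens
    retail = total_in / 1e6 * in_price_per_m + total_out / 1e6 * out_price_per_m
    return retail, retail * (1 - batch_discount)
```

Call it as job_cost_usd(100_000, 80, 30, in_price_per_m=..., out_price_per_m=...) with nex-pro's current rates; the batch figure is always 0.7 × the retail figure.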

What's next