Quickstart¶
Extraction is asynchronous. POST /v1/extract returns immediately with one or
more job_ids; the text shows up after the worker finishes. The simplest
pattern is to poll. Each SDK has a wait helper for the polling path.
Get a key¶
Pick one:
- No signup — the single-file snippets below bootstrap an
anonymous sandbox key inline (50 free pages /
24 h). The TypeScript, Go, and Dart batch snippets read
KREUZBERG_API_KEY from the environment — point it at a fresh sk_sandbox_ key from POST /v1/sandbox/key to stay anonymous.
- Real key — grab a kz_ key from the dashboard and pass it to the SDK constructor explicitly: KreuzbergCloud(api_key=os.environ["KREUZBERG_API_KEY"]) in Python and the per-language equivalents in Installation and Authentication. None of the SDKs read the env var on their own.
Install¶
pip install kreuzberg-cloud-sdk # Python
pnpm add @kreuzberg/cloud # TypeScript
go get github.com/kreuzberg-dev/kreuzberg-cloud-sdk/go # Go
dart pub add kreuzberg_cloud_sdk # Dart
Full per-language install matrix in Installation.
Extract one file¶
import time
import httpx
import os
API = "https://api.kreuzberg.dev"
TOKEN = os.environ["KREUZBERG_API_KEY"]  # kz_... for live, sk_sandbox_... for sandbox
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
# Statuses after which the job stops changing.
TERMINAL = {"completed", "failed", "cancelled", "partial_success"}

# Submit the document. The upload has to happen while the file handle is open.
with open("invoice.pdf", "rb") as fh:
    submit = httpx.post(
        f"{API}/v1/extract",
        data={"webhook": '{"url":""}'},
        files={"file": ("invoice.pdf", fh, "application/pdf")},
        headers=HEADERS,
    )
submit.raise_for_status()
job_id = submit.json()["job_ids"][0]

# Poll once per second until the job reaches a terminal status.
while True:
    poll = httpx.get(f"{API}/v1/jobs/{job_id}", headers=HEADERS)
    # Surface auth / rate-limit errors instead of a KeyError on "status".
    poll.raise_for_status()
    job = poll.json()
    if job["status"] in TERMINAL:
        break
    time.sleep(1)

# Only completed and partial_success jobs carry a result.
if job["status"] not in {"completed", "partial_success"}:
    raise SystemExit(f"extraction {job['status']}")
print(job["result"]["content"])
import { KreuzbergCloud } from "@kreuzberg/cloud";
import { readFile } from "node:fs/promises";
// Create a client via fromSandbox(), upload one PDF, and wait for the finished job.
const client = await KreuzbergCloud.fromSandbox();
const pdfBytes = await readFile("invoice.pdf");
const finished = await client.extractAndWait({
  file: { name: "invoice.pdf", data: pdfBytes, mimeType: "application/pdf" },
});

// `result` may be absent, hence the optional chaining.
console.log(finished.result?.content);
import { readFile } from "node:fs/promises";
import { setTimeout as sleep } from "node:timers/promises";
const API = "https://api.kreuzberg.dev";
const TOKEN = process.env.KREUZBERG_API_KEY!;
// Statuses after which the job stops changing.
const TERMINAL = new Set(["completed", "failed", "cancelled", "partial_success"]);
const headers = { authorization: `Bearer ${TOKEN}` };

// Build the multipart upload: the document plus the webhook field.
const form = new FormData();
const data = await readFile("invoice.pdf");
form.append("file", new Blob([data], { type: "application/pdf" }), "invoice.pdf");
form.append("webhook", JSON.stringify({ url: "" }));

const submit = await fetch(`${API}/v1/extract`, { method: "POST", headers, body: form });
// fetch() only rejects on network failure, so HTTP errors must be checked explicitly.
if (!submit.ok) {
  throw new Error(`submit failed: ${submit.status} ${submit.statusText}`);
}
const { job_ids } = (await submit.json()) as { job_ids: string[] };
const jobId = job_ids[0];
if (jobId === undefined) {
  throw new Error("submit returned no job id");
}

// Poll once per second until the job reaches a terminal status.
let job: { status: string; result?: { content?: string } };
do {
  await sleep(1000);
  const response = await fetch(`${API}/v1/jobs/${jobId}`, { headers });
  // Without this check an error body (no `status` field) would keep the loop spinning forever.
  if (!response.ok) {
    throw new Error(`poll failed: ${response.status} ${response.statusText}`);
  }
  job = await response.json();
} while (!TERMINAL.has(job.status));

console.log(job.result?.content);
package main
import (
"context"
"fmt"
"log"
"os"
kreuzbergcloud "github.com/kreuzberg-dev/kreuzberg-cloud-sdk/go"
)
// main opens invoice.pdf, sends it through a client created by FromSandbox,
// waits for the extraction, and prints the resulting content.
func main() {
	ctx := context.Background()

	client, err := kreuzbergcloud.FromSandbox(ctx)
	if err != nil {
		log.Fatal(err)
	}

	pdf, err := os.Open("invoice.pdf")
	if err != nil {
		log.Fatal(err)
	}
	defer pdf.Close()

	source := kreuzbergcloud.FileSource{Name: "invoice.pdf", Reader: pdf}
	job, err := client.ExtractAndWait(ctx, source, nil)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(job.Content)
}
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os"
"time"
)
// main uploads invoice.pdf to POST /v1/extract using net/http, then polls
// GET /v1/jobs/{id} once per second until the job reaches a terminal status.
// It prints the content for completed and partial_success jobs and exits with
// an error for failed or cancelled jobs and for any non-2xx HTTP response.
func main() {
	const api = "https://api.kreuzberg.dev"
	apiKey := os.Getenv("KREUZBERG_API_KEY")

	file, err := os.Open("invoice.pdf")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Build the multipart body: the document plus the webhook field.
	var body bytes.Buffer
	writer := multipart.NewWriter(&body)
	part, err := writer.CreateFormFile("file", "invoice.pdf")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.Copy(part, file); err != nil {
		log.Fatal(err)
	}
	if err := writer.WriteField("webhook", `{"url":""}`); err != nil {
		log.Fatal(err)
	}
	// Close writes the trailing boundary; the body is incomplete without it.
	if err := writer.Close(); err != nil {
		log.Fatal(err)
	}

	request, err := http.NewRequest("POST", api+"/v1/extract", &body)
	if err != nil {
		log.Fatal(err)
	}
	request.Header.Set("Authorization", "Bearer "+apiKey)
	request.Header.Set("Content-Type", writer.FormDataContentType())

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()
	// An error body has no job_ids; fail here rather than index an empty slice below.
	if response.StatusCode < 200 || response.StatusCode > 299 {
		log.Fatalf("submit failed: %s", response.Status)
	}

	var submission struct {
		JobIDs []string `json:"job_ids"`
	}
	if err := json.NewDecoder(response.Body).Decode(&submission); err != nil {
		log.Fatal(err)
	}
	if len(submission.JobIDs) == 0 {
		log.Fatal("submit returned no job ids")
	}
	jobID := submission.JobIDs[0]

	for {
		poll, err := http.NewRequest("GET", api+"/v1/jobs/"+jobID, nil)
		if err != nil {
			log.Fatal(err)
		}
		poll.Header.Set("Authorization", "Bearer "+apiKey)

		result, err := http.DefaultClient.Do(poll)
		if err != nil {
			log.Fatal(err)
		}
		// An error response decodes to an empty status, which would poll forever.
		if result.StatusCode < 200 || result.StatusCode > 299 {
			result.Body.Close()
			log.Fatalf("poll failed: %s", result.Status)
		}

		var job struct {
			Status string `json:"status"`
			Result struct {
				Content string `json:"content"`
			} `json:"result"`
		}
		decodeErr := json.NewDecoder(result.Body).Decode(&job)
		result.Body.Close()
		if decodeErr != nil {
			log.Fatal(decodeErr)
		}

		switch job.Status {
		case "completed", "partial_success":
			// Both statuses are terminal and carry a result.
			fmt.Println(job.Result.Content)
			return
		case "failed", "cancelled":
			log.Fatalf("extraction %s", job.Status)
		}
		time.Sleep(time.Second)
	}
}
import 'dart:io';
import 'package:dio/dio.dart';
import 'package:kreuzberg_cloud_sdk/kreuzberg_cloud_sdk.dart';
/// Fetches a sandbox key, submits `invoice.pdf`, waits for the job, and prints
/// the extracted content.
Future<void> main() async {
  // The Dart SDK does not ship a sandbox helper yet — fetch one inline.
  final sandbox = await Dio().post<Map<String, dynamic>>(
    'https://api.kreuzberg.dev/v1/sandbox/key',
  );
  final apiKey = sandbox.data!['api_key'] as String;

  final client = KreuzbergCloudClient(apiKey: apiKey);
  try {
    final accepted = await client.extractMultipart(
      files: [await MultipartFile.fromFile('invoice.pdf')],
      webhook: const WebhookConfig(url: ''),
    );
    final finished = await client.waitForJob(accepted.jobIds.first);
    print(finished.result?.content);
  } finally {
    // Close the client even when submission or polling throws.
    client.close();
  }
}
# Grab a sandbox key (no signup; valid 24 h, 50 pages).
KREUZBERG_API_KEY=$(curl -sX POST https://api.kreuzberg.dev/v1/sandbox/key | jq -r .api_key)

# Submit one file. /v1/extract returns { job_ids: [<uuid>], status: "pending" }.
JOB=$(curl -sX POST https://api.kreuzberg.dev/v1/extract \
  -H "Authorization: Bearer $KREUZBERG_API_KEY" \
  -F "file=@invoice.pdf" \
  -F 'webhook={"url":""}' | jq -r '.job_ids[0]')

# Poll until the job reaches a terminal status. Waiting only for "completed"
# would spin forever on a failed, cancelled, or partially successful job.
while :; do
  STATUS=$(curl -s "https://api.kreuzberg.dev/v1/jobs/$JOB" \
    -H "Authorization: Bearer $KREUZBERG_API_KEY" | jq -r .status)
  case "$STATUS" in
    completed|partial_success|failed|cancelled) break ;;
  esac
  sleep 1
done

# Print the extracted text when there is a result; otherwise report the status.
if [ "$STATUS" = "completed" ] || [ "$STATUS" = "partial_success" ]; then
  curl -s "https://api.kreuzberg.dev/v1/jobs/$JOB" \
    -H "Authorization: Bearer $KREUZBERG_API_KEY" | jq -r .result.content
else
  echo "extraction $STATUS" >&2
fi
Extract a batch in parallel¶
Submit many files at once and poll concurrently — this is the production pattern.
import asyncio
from pathlib import Path
from kreuzberg_cloud import AsyncKreuzbergCloud
async def main() -> None:
    """Submit three PDFs as one batch, wait for all jobs, and print each filename and status."""
    pdf_paths = list(map(Path, ("a.pdf", "b.pdf", "c.pdf")))
    async with await AsyncKreuzbergCloud.from_sandbox() as client:
        submitted = await client.extract_batch(pdf_paths)
        job_ids = [job.id for job in submitted]
        finished = await client.wait_for_jobs(job_ids)
        for done in finished:
            print(done.filename, done.status)


asyncio.run(main())
import asyncio
import httpx
import os
# Base URL shared by every request in this snippet.
API = "https://api.kreuzberg.dev"
# API key read from the environment; a missing variable raises KeyError here.
TOKEN = os.environ["KREUZBERG_API_KEY"] # kz_... for live, sk_sandbox_... for sandbox
# Statuses after which a job stops changing; polling ends when one is seen.
TERMINAL = {"completed", "failed", "cancelled", "partial_success"}
async def submit(client: httpx.AsyncClient, path: str) -> str:
    """Upload one file to /v1/extract and return the first job ID in the response.

    Raises:
        httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
    """
    # Read the whole file up front so the handle is closed before the request is sent.
    with open(path, "rb") as fh:
        content = fh.read()
    upload = {"file": (path, content, "application/octet-stream")}
    response = await client.post(
        "/v1/extract",
        data={"webhook": '{"url":""}'},
        files=upload,
    )
    response.raise_for_status()
    payload = response.json()
    return payload["job_ids"][0]
async def wait(client: httpx.AsyncClient, job_id: str) -> dict:
    """Poll /v1/jobs/{job_id} once per second and return the job once its status is terminal.

    Raises:
        httpx.HTTPStatusError: if a poll returns a 4xx/5xx status.
    """
    while True:
        response = await client.get(f"/v1/jobs/{job_id}")
        # Fail with the HTTP error rather than a KeyError on "status" when the
        # API rejects the request (bad key, rate limit, unknown job).
        response.raise_for_status()
        job = response.json()
        if job["status"] in TERMINAL:
            return job
        await asyncio.sleep(1)
async def main() -> None:
    """Submit three files concurrently, wait for every job, and print filename and status."""
    paths = ["a.pdf", "b.pdf", "c.pdf"]
    auth = {"Authorization": f"Bearer {TOKEN}"}
    async with httpx.AsyncClient(base_url=API, headers=auth, timeout=60) as client:
        # Uploads run concurrently; gather() returns the IDs in submission order.
        job_ids = await asyncio.gather(*[submit(client, path) for path in paths])
        finished = await asyncio.gather(*[wait(client, job_id) for job_id in job_ids])
        for job in finished:
            print(job["filename"], job["status"])


asyncio.run(main())
import { KreuzbergCloud } from "@kreuzberg/cloud";
import { readFile } from "node:fs/promises";
// Client authenticated with the key from KREUZBERG_API_KEY.
const client = new KreuzbergCloud({ apiKey: process.env.KREUZBERG_API_KEY! });

// Read every input file concurrently.
const names = ["a.pdf", "b.pdf", "c.pdf"];
const files = await Promise.all(
  names.map(async (name) => {
    const data = await readFile(name);
    return { name, data };
  }),
);

// Submit the batch, then wait for all of its jobs.
const jobs = await client.extractBatch({ files });
const jobIds = jobs.map(({ id }) => id);
const results = await client.waitForJobs(jobIds);
results.forEach((finished) => {
  console.log(finished.filename, finished.status);
});
import { readFile } from "node:fs/promises";
import { setTimeout as sleep } from "node:timers/promises";
// Base URL shared by every request in this snippet.
const API = "https://api.kreuzberg.dev";
// API key from the environment; the `!` only silences the type checker and does not verify the variable is set.
const TOKEN = process.env.KREUZBERG_API_KEY!;
// Statuses after which a job stops changing; polling ends when one is seen.
const TERMINAL = new Set(["completed", "failed", "cancelled", "partial_success"]);
/**
 * Uploads one file to POST /v1/extract and returns the first job id.
 *
 * @param path - Path of the file to upload; also sent as the upload's filename.
 * @returns The first entry of `job_ids` in the response.
 * @throws Error if the API answers with a non-2xx status or returns no job id.
 */
async function submit(path: string): Promise<string> {
  const form = new FormData();
  const data = await readFile(path);
  form.append("file", new Blob([data]), path);
  form.append("webhook", JSON.stringify({ url: "" }));

  const response = await fetch(`${API}/v1/extract`, {
    method: "POST",
    headers: { authorization: `Bearer ${TOKEN}` },
    body: form,
  });
  // fetch() resolves on HTTP errors too, so the status has to be checked explicitly.
  if (!response.ok) {
    throw new Error(`submit ${path} failed: ${response.status} ${response.statusText}`);
  }

  const { job_ids: jobIds } = (await response.json()) as { job_ids: string[] };
  const [jobId] = jobIds;
  if (jobId === undefined) {
    throw new Error(`submit ${path} returned no job id`);
  }
  return jobId;
}
/**
 * Polls GET /v1/jobs/{jobId} once per second until the job reaches a terminal status.
 *
 * @param jobId - Id returned by the submit call.
 * @returns The job payload as returned by the API once its status is terminal.
 * @throws Error if a poll answers with a non-2xx status.
 */
async function wait(jobId: string): Promise<{ filename: string; status: string }> {
  for (;;) {
    const response = await fetch(`${API}/v1/jobs/${jobId}`, {
      headers: { authorization: `Bearer ${TOKEN}` },
    });
    // An error body has no `status`, so without this check the loop would never end.
    if (!response.ok) {
      throw new Error(`poll ${jobId} failed: ${response.status} ${response.statusText}`);
    }
    const job = (await response.json()) as { filename: string; status: string };
    if (TERMINAL.has(job.status)) {
      return job;
    }
    await sleep(1000);
  }
}
// Submit every file concurrently, then poll all jobs concurrently.
const paths = ["a.pdf", "b.pdf", "c.pdf"];
const ids = await Promise.all(paths.map((path) => submit(path)));
const results = await Promise.all(ids.map((id) => wait(id)));
for (const { filename, status } of results) {
  console.log(filename, status);
}
package main
import (
"context"
"fmt"
"log"
"os"
kreuzbergcloud "github.com/kreuzberg-dev/kreuzberg-cloud-sdk/go"
)
// main submits two PDFs as one batch through the SDK client, waits for all
// jobs, and prints the length of each extracted text next to its path.
func main() {
	ctx := context.Background()

	client, err := kreuzbergcloud.New(
		kreuzbergcloud.WithAPIKey(os.Getenv("KREUZBERG_API_KEY")),
	)
	if err != nil {
		log.Fatal(err)
	}

	paths := []string{"invoice-a.pdf", "invoice-b.pdf"}
	sources := make([]kreuzbergcloud.FileSource, len(paths))
	for i, path := range paths {
		handle, err := os.Open(path)
		if err != nil {
			log.Fatal(err)
		}
		// Handles stay open until main returns so the SDK can read them during upload.
		defer handle.Close()
		sources[i] = kreuzbergcloud.FileSource{Name: path, Reader: handle}
	}

	jobs, err := client.ExtractBatch(ctx, sources, nil)
	if err != nil {
		log.Fatal(err)
	}

	ids := make([]string, 0, len(jobs))
	for _, job := range jobs {
		ids = append(ids, job.ID)
	}

	results, err := client.WaitForJobs(ctx, ids, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Results are matched to paths by index.
	for i := range results {
		fmt.Printf("%s -> %d chars\n", paths[i], len(results[i].Content))
	}
}
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os"
"sync"
"time"
)
// main uploads two PDFs in a single multipart POST /v1/extract request, then
// polls every returned job concurrently and prints the length of each result.
func main() {
	apiKey := os.Getenv("KREUZBERG_API_KEY")
	paths := []string{"invoice-a.pdf", "invoice-b.pdf"}

	// Build one multipart body holding every file plus the webhook field.
	var body bytes.Buffer
	writer := multipart.NewWriter(&body)
	for _, path := range paths {
		f, err := os.Open(path)
		if err != nil {
			log.Fatal(err)
		}
		part, err := writer.CreateFormFile("file", path)
		if err != nil {
			log.Fatal(err)
		}
		if _, err := io.Copy(part, f); err != nil {
			log.Fatal(err)
		}
		f.Close()
	}
	if err := writer.WriteField("webhook", `{"url":""}`); err != nil {
		log.Fatal(err)
	}
	// Close writes the trailing boundary; the body is incomplete without it.
	if err := writer.Close(); err != nil {
		log.Fatal(err)
	}

	request, err := http.NewRequest("POST", "https://api.kreuzberg.dev/v1/extract", &body)
	if err != nil {
		log.Fatal(err)
	}
	request.Header.Set("Authorization", "Bearer "+apiKey)
	request.Header.Set("Content-Type", writer.FormDataContentType())

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		log.Fatal(err)
	}
	// An error body has no job_ids; report the HTTP status instead of silently polling nothing.
	if response.StatusCode < 200 || response.StatusCode > 299 {
		response.Body.Close()
		log.Fatalf("submit failed: %s", response.Status)
	}
	var submission struct {
		JobIDs []string `json:"job_ids"`
	}
	decodeErr := json.NewDecoder(response.Body).Decode(&submission)
	response.Body.Close()
	if decodeErr != nil {
		log.Fatal(decodeErr)
	}

	// One goroutine per job; each writes only to its own slot, so no locking is needed.
	var waitGroup sync.WaitGroup
	results := make([]string, len(submission.JobIDs))
	errs := make([]error, len(submission.JobIDs))
	for i, jobID := range submission.JobIDs {
		waitGroup.Add(1)
		go func(index int, id string) {
			defer waitGroup.Done()
			results[index], errs[index] = waitForJob(apiKey, id)
		}(i, jobID)
	}
	waitGroup.Wait()

	for index, id := range submission.JobIDs {
		// Label by path when one exists at this index; fall back to the job id
		// so a mismatch in counts cannot index past the end of paths.
		label := id
		if index < len(paths) {
			label = paths[index]
		}
		if errs[index] != nil {
			log.Fatalf("%s: %v", label, errs[index])
		}
		fmt.Printf("%s -> %d chars\n", label, len(results[index]))
	}
}

// waitForJob polls GET /v1/jobs/{id} once per second and returns the extracted
// content once the job is completed or partial_success. It returns an error
// for failed or cancelled jobs and for any non-2xx poll response.
func waitForJob(apiKey, id string) (string, error) {
	for {
		poll, err := http.NewRequest("GET", "https://api.kreuzberg.dev/v1/jobs/"+id, nil)
		if err != nil {
			return "", err
		}
		poll.Header.Set("Authorization", "Bearer "+apiKey)

		result, err := http.DefaultClient.Do(poll)
		if err != nil {
			return "", err
		}
		// An error response decodes to an empty status, which would poll forever.
		if result.StatusCode < 200 || result.StatusCode > 299 {
			result.Body.Close()
			return "", fmt.Errorf("poll job %s: %s", id, result.Status)
		}

		var job struct {
			Status string `json:"status"`
			Result struct {
				Content string `json:"content"`
			} `json:"result"`
		}
		decodeErr := json.NewDecoder(result.Body).Decode(&job)
		result.Body.Close()
		if decodeErr != nil {
			return "", decodeErr
		}

		switch job.Status {
		case "completed", "partial_success":
			// Both statuses are terminal and carry a result.
			return job.Result.Content, nil
		case "failed", "cancelled":
			return "", fmt.Errorf("job %s %s", id, job.Status)
		}
		time.Sleep(time.Second)
	}
}
import 'dart:io';
import 'package:dio/dio.dart';
import 'package:kreuzberg_cloud_sdk/kreuzberg_cloud_sdk.dart';
/// Submits three PDFs in one multipart request, waits for every job
/// concurrently, and prints each job's filename and status.
Future<void> main() async {
  final client = KreuzbergCloudClient(
    apiKey: Platform.environment['KREUZBERG_API_KEY']!,
  );
  try {
    final accepted = await client.extractMultipart(
      files: [
        await MultipartFile.fromFile('a.pdf'),
        await MultipartFile.fromFile('b.pdf'),
        await MultipartFile.fromFile('c.pdf'),
      ],
    );
    final finished = await Future.wait(
      accepted.jobIds.map((id) => client.waitForJob(id)),
    );
    for (final job in finished) {
      print('${job.filename}: ${job.status}');
    }
  } finally {
    // Close the client even when submission or polling throws.
    client.close();
  }
}
# Sandbox key (reuse one from the single-file snippet, or grab a fresh one).
KREUZBERG_API_KEY=$(curl -sX POST https://api.kreuzberg.dev/v1/sandbox/key | jq -r .api_key)
HEADER="Authorization: Bearer $KREUZBERG_API_KEY"
API=https://api.kreuzberg.dev
FILES=(invoice.pdf contract.pdf scan.png)

# Submit the files one after another; collect job IDs in the same order.
JOBS=()
for f in "${FILES[@]}"; do
  JOBS+=("$(curl -sX POST "$API/v1/extract" -H "$HEADER" -F "file=@$f" \
    -F 'webhook={"url":""}' | jq -r '.job_ids[0]')")
done

# Poll one job until it reaches a terminal status, then print path → text.
#   $1 = job ID, $2 = file path
# Waiting only for "completed" would spin forever on failed, cancelled, or
# partial_success jobs.
poll() {
  local status
  while :; do
    status=$(curl -s "$API/v1/jobs/$1" -H "$HEADER" | jq -r .status)
    case "$status" in
      completed|partial_success) break ;;
      failed|cancelled) echo "$2 → $status" >&2; return 1 ;;
    esac
    sleep 1
  done
  echo "$2 → $(curl -s "$API/v1/jobs/$1" -H "$HEADER" | jq -r .result.content | head -c 80)"
}

# Poll all jobs in parallel and wait for every poller to finish.
for i in "${!JOBS[@]}"; do
  poll "${JOBS[i]}" "${FILES[i]}" &
done
wait
Job statuses¶
GET /v1/jobs/{id} returns one of nine status values. Treat completed,
partial_success, failed, and cancelled as terminal — anything else
means "poll again".
| status | Meaning | result present |
|---|---|---|
| awaiting_upload | Job created, document upload not yet finalised | — |
| pending | Queued, not yet picked up | — |
| processing | Worker is extracting | — |
| chunking | Large document being split across workers | — |
| aggregating | Per-chunk results being merged | — |
| completed | Done; full result available | yes |
| partial_success | Some pages/files failed; partial result available | yes |
| failed | Unrecoverable error (error_message set) | — |
| cancelled | Job cancelled by the project | — |
The Python SDK defaults to 1-second poll, ×2 backoff capped at 30 s,
5-minute total timeout (extract_and_wait, wait_for_job,
wait_for_jobs). The TypeScript SDK uses the same defaults in
milliseconds. Tune via the poll_interval / pollInterval, timeout, and
backoff arguments. Don't poll faster than 1 s — rate limits apply.
Where to next¶
- Stop polling, use webhooks — Webhook delivery for production workloads.
- Full REST surface — API Reference.
- Self-host instead — Kreuzberg core ships the same extraction engine as a library or Docker image.