Shared API helpers, model creation, training-image upload, model training, and generation helpers.
Setup and Training
Complete this page before running any use-case recipe. It gives you a trained modelId and shared
helpers for later image, upscale, and video generation calls.
Training polling
Model training normally takes 20 to 25 minutes. Poll GET /models/{modelID} every 5 minutes while
training is running. Avoid tight polling loops.
Shared setup
Use the same auth headers and request helper across every example.
// Credentials and endpoint shared by every JavaScript snippet on this page.
const API_KEY = '<YOUR_API_KEY>'
const BASE_URL = 'https://developer.photogptai.com/api'

// Auth headers sent with every request. Snippets that send a JSON body spread
// this object and add their own Content-Type.
const headers = {
  'Authorization': `Bearer ${API_KEY}`,
  'API-Version': '1',
}
/**
 * Pauses an async flow for a fixed duration.
 *
 * @param {number} ms Delay, in milliseconds, handed to setTimeout.
 * @returns {Promise<void>} Promise that resolves once the timer fires.
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms)
  })
}
async function requestJson(path, options = {}) {
const response = await fetch(`${BASE_URL}${path}`, options)
if (!response.ok) {
throw new Error(await response.text())
}
return response.json()
}

import json
import time
from pathlib import Path
import requests
# Credentials and endpoint shared by every Python snippet on this page.
API_KEY = "<YOUR_API_KEY>"
BASE_URL = "https://developer.photogptai.com/api"

# Auth headers passed along with every request.
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "API-Version": "1",
}
def request_json(method: str, path: str, **kwargs):
response = requests.request(method, f"{BASE_URL}{path}", timeout=120, **kwargs)
response.raise_for_status()
return response.json()

Create the model
The model stores identity metadata and gives you a modelID for uploads, training, and
trained-model generation. Use one model per identity.
// Identity metadata stored on the new model.
const modelPayload = {
  name: 'UGC Creator',
  age: 27,
  gender: 'Female',
  ethnicity: 'south asian',
  eyeColor: 'brown',
}

// Create the model: POST the metadata as JSON on top of the shared auth headers.
const createModelBody = await requestJson('/models', {
  method: 'POST',
  headers: { ...headers, 'Content-Type': 'application/json' },
  body: JSON.stringify(modelPayload),
})
const modelId = createModelBody.result.id

model_payload = {
"name": "UGC Creator",
"age": 27,
"gender": "Female",
"ethnicity": "south asian",
"eyeColor": "brown",
}
create_model_body = request_json("POST", "/models", headers=headers, json=model_payload)
model_id = create_model_body["result"]["id"]

Upload training images
Upload 12 to 20 high-quality images with type: "modelInput". Use varied angles, lighting, and
expressions. Avoid duplicates, heavy filters, sunglasses, heavy occlusion, and images where the
subject is too small.
import { readdir, readFile } from 'node:fs/promises'
import { basename, extname, join } from 'node:path'
/**
 * Uploads one local image file as a `modelInput` training image for a model.
 *
 * Sends a multipart POST to `/images/upload` with two parts: `file` (the image
 * bytes, named after the file's basename) and `data` (a JSON string carrying
 * the model ID and the image type).
 *
 * @param modelId Identifier of the model the image belongs to.
 * @param imagePath Path of the image file on disk.
 * @returns The `result` field of the upload response.
 * @throws If the file cannot be read or `requestJson` rejects.
 */
async function uploadModelInputImage(modelId, imagePath) {
  const metadata = JSON.stringify({ modelID: modelId, type: 'modelInput' })
  const contents = await readFile(imagePath)

  const form = new FormData()
  form.append('file', new File([contents], basename(imagePath)))
  form.append('data', metadata)

  // Only the shared auth headers are passed; no Content-Type is set by hand
  // for the multipart body.
  const { result } = await requestJson('/images/upload', {
    method: 'POST',
    headers,
    body: form,
  })
  return result
}
// File extensions accepted as training images (compared case-insensitively).
const supportedExtensions = new Set(['.jpg', '.jpeg', '.png', '.webp', '.heic'])
const trainingImageDir = './training/ugc-creator'

// Collect up to 20 supported images from the training directory.
// Directory listing order is platform-dependent, so sort the names before
// truncating; otherwise the chosen 20 could differ between runs or machines.
const trainingImagePaths = (await readdir(trainingImageDir))
  .filter((file) => supportedExtensions.has(extname(file).toLowerCase()))
  .sort()
  .map((file) => join(trainingImageDir, file))
  .slice(0, 20)

// Fail before any upload if there are too few images to train on.
if (trainingImagePaths.length < 12) {
  throw new Error('Upload at least 12 modelInput images before training.')
}
for (const imagePath of trainingImagePaths) {
await uploadModelInputImage(modelId, imagePath)
}

def upload_model_input_image(model_id: str, image_path: Path):
payload = {
"modelID": model_id,
"type": "modelInput",
}
with image_path.open("rb") as image_file:
files = {
"file": (image_path.name, image_file),
"data": (None, json.dumps(payload), "application/json"),
}
body = request_json("POST", "/images/upload", headers=headers, files=files)
return body["result"]
training_image_dir = Path("./training/ugc-creator")
# File extensions accepted as training images (compared case-insensitively).
supported_extensions = {".jpg", ".jpeg", ".png", ".webp", ".heic"}

# Collect up to 20 supported images from the training directory.
# iterdir() yields entries in arbitrary order, so sort before truncating;
# otherwise the chosen 20 could differ between runs or machines.
training_image_paths = sorted(
    path
    for path in training_image_dir.iterdir()
    if path.suffix.lower() in supported_extensions
)[:20]

# Fail before any upload if there are too few images to train on.
if len(training_image_paths) < 12:
    raise RuntimeError("Upload at least 12 modelInput images before training.")
for image_path in training_image_paths:
upload_model_input_image(model_id, image_path)

Trigger training
The train endpoint starts the process and returns a success response. It does not return a
generation jobId.
await requestJson(`/models/${modelId}/train`, {
headers,
})

request_json("GET", f"/models/{model_id}/train", headers=headers)

Wait for readiness
Poll the model status every 5 minutes. When status becomes ready, the model can be used with
POST /images/generation.
// Poll every 5 minutes and give up after 35 minutes.
const TRAINING_POLL_INTERVAL_MS = 5 * 60 * 1000
const TRAINING_TIMEOUT_MS = 35 * 60 * 1000

/**
 * Polls `GET /models/{modelId}` until the model reports status `ready`.
 *
 * The status is compared case-insensitively; a missing status is treated as
 * an empty string and therefore as "not ready yet".
 *
 * @param modelId Identifier of the model being trained.
 * @returns The `result` object of the response that reported `ready`.
 * @throws {Error} If the model status is `deleted`, or if the model is still
 *   not ready once more than TRAINING_TIMEOUT_MS has elapsed.
 */
async function waitForModelReady(modelId) {
  const pollStart = Date.now()

  for (;;) {
    const { result: model } = await requestJson(`/models/${modelId}`, { headers })
    const currentStatus = String(model.status ?? '').toLowerCase()

    switch (currentStatus) {
      case 'ready':
        return model
      case 'deleted':
        throw new Error(`Model cannot be used for generation: ${currentStatus}`)
    }

    // The deadline is only checked after a poll, so there is always at least one request.
    const elapsedMs = Date.now() - pollStart
    if (elapsedMs > TRAINING_TIMEOUT_MS) {
      throw new Error('Training is still running. Check the model status again later.')
    }

    await sleep(TRAINING_POLL_INTERVAL_MS)
  }
}
await waitForModelReady(modelId)

TRAINING_POLL_INTERVAL_SECONDS = 5 * 60
TRAINING_TIMEOUT_SECONDS = 35 * 60
def wait_for_model_ready(model_id: str):
    """Poll ``GET /models/{model_id}`` until the model reports status ``ready``.

    The status is compared case-insensitively. Sleeps
    ``TRAINING_POLL_INTERVAL_SECONDS`` between polls.

    Args:
        model_id: Identifier of the model being trained.

    Returns:
        The ``result`` object of the response that reported ``ready``.

    Raises:
        RuntimeError: If the model status is ``deleted``, or if the model is
            still not ready after more than ``TRAINING_TIMEOUT_SECONDS``.
    """
    # Monotonic clock: the elapsed-time check is immune to wall-clock changes.
    started_at = time.monotonic()
    while True:
        body = request_json("GET", f"/models/{model_id}", headers=headers)
        model = body["result"]
        status = str(model.get("status", "")).lower()
        if status == "ready":
            return model
        if status == "deleted":
            raise RuntimeError(f"Model cannot be used for generation: {status}")
        # The deadline is only checked after a poll, so there is always at least one request.
        if time.monotonic() - started_at > TRAINING_TIMEOUT_SECONDS:
            raise RuntimeError("Training is still running. Check the model status again later.")
        time.sleep(TRAINING_POLL_INTERVAL_SECONDS)
wait_for_model_ready(model_id)

Generation helpers
The use-case pages reuse these helpers to start generation jobs, wait for completion, and extract the first generated image.
/**
 * Starts an image generation job by POSTing `payload` as JSON to
 * `/images/generation`.
 *
 * @param payload Request body; serialized with JSON.stringify.
 * @returns The `jobId` found in the response's `result`.
 */
async function triggerImageGeneration(payload) {
  const requestOptions = {
    method: 'POST',
    headers: { ...headers, 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  }
  const { result } = await requestJson('/images/generation', requestOptions)
  return result.jobId
}
/**
 * Starts an image upscaling job by POSTing `payload` as JSON to
 * `/images/upscaling`.
 *
 * @param payload Request body; serialized with JSON.stringify.
 * @returns The `jobId` found in the response's `result`.
 */
async function triggerImageUpscaling(payload) {
  const requestOptions = {
    method: 'POST',
    headers: { ...headers, 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  }
  const { result } = await requestJson('/images/upscaling', requestOptions)
  return result.jobId
}
/**
 * Starts a video generation job by POSTing `payload` as JSON to
 * `/videos/generation`.
 *
 * @param payload Request body; serialized with JSON.stringify.
 * @returns The `jobId` found in the response's `result`.
 */
async function triggerVideoGeneration(payload) {
  const requestOptions = {
    method: 'POST',
    headers: { ...headers, 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  }
  const { result } = await requestJson('/videos/generation', requestOptions)
  return result.jobId
}
/**
 * Polls `GET /jobs/{jobId}` until the job reports status `success`.
 *
 * Statuses are compared case-insensitively. `created`, `queued` and `running`
 * mean "keep waiting"; any other non-success status is treated as a failure.
 *
 * @param jobId Identifier returned by one of the trigger helpers.
 * @param {number} [intervalMs=5000] Delay between polls, in milliseconds.
 * @returns The `result` object of the response that reported `success`.
 * @throws {Error} With the job's `error` text when present and non-empty,
 *   otherwise `Job failed: <status>`.
 */
async function waitForJob(jobId, intervalMs = 5000) {
  const runningStatuses = new Set(['created', 'queued', 'running'])

  while (true) {
    const body = await requestJson(`/jobs/${jobId}`, { headers })
    const job = body.result
    const status = String(job.status ?? '').toLowerCase()

    if (status === 'success') {
      return job
    }

    if (!runningStatuses.has(status)) {
      // `||` rather than `??`: an empty-string `error` must fall back to the
      // status message (as the Python helper does) instead of producing an
      // Error with an empty message.
      throw new Error(job.error || `Job failed: ${job.status}`)
    }

    await sleep(intervalMs)
  }
}
function firstImage(job, label) {
const image = job.images?.[0]
if (!image?.id || !image?.url) {
throw new Error(`${label} did not return an image.`)
}
return image
}

def trigger_image_generation(payload: dict) -> str:
body = request_json("POST", "/images/generation", headers=headers, json=payload)
return body["result"]["jobId"]
def trigger_image_upscaling(payload: dict) -> str:
    """Start an image upscaling job.

    POSTs ``payload`` as JSON to ``/images/upscaling``.

    Args:
        payload: Request body, sent via the ``json=`` argument.

    Returns:
        The ``jobId`` found in the response's ``result``.
    """
    body = request_json("POST", "/images/upscaling", headers=headers, json=payload)
    return body["result"]["jobId"]
def trigger_video_generation(payload: dict) -> str:
    """Start a video generation job.

    POSTs ``payload`` as JSON to ``/videos/generation``.

    Args:
        payload: Request body, sent via the ``json=`` argument.

    Returns:
        The ``jobId`` found in the response's ``result``.
    """
    body = request_json("POST", "/videos/generation", headers=headers, json=payload)
    return body["result"]["jobId"]
def wait_for_job(job_id: str, interval_seconds: int = 5):
    """Poll ``GET /jobs/{job_id}`` until the job reports status ``success``.

    Statuses are compared case-insensitively. ``created``, ``queued`` and
    ``running`` mean "keep waiting"; any other non-success status is treated
    as a failure.

    Args:
        job_id: Identifier returned by one of the trigger helpers.
        interval_seconds: Delay between polls, in seconds.

    Returns:
        The ``result`` object of the response that reported ``success``.

    Raises:
        RuntimeError: With the job's ``error`` text when present and truthy,
            otherwise ``Job failed: <status>``.
    """
    running_statuses = {"created", "queued", "running"}
    while True:
        body = request_json("GET", f"/jobs/{job_id}", headers=headers)
        job = body["result"]
        status = str(job.get("status", "")).lower()
        if status == "success":
            return job
        if status not in running_statuses:
            raise RuntimeError(job.get("error") or f"Job failed: {job.get('status')}")
        time.sleep(interval_seconds)
def first_image(job: dict, label: str):
    """Return the first image of a finished job.

    Args:
        job: Job object with an ``images`` list.
        label: Human-readable name of the step, used in the error message.

    Returns:
        The first entry of ``job["images"]``.

    Raises:
        RuntimeError: If ``images`` is missing or empty, or if its first entry
            has no truthy ``id`` or ``url``.
    """
    images = job.get("images", [])
    if not images or not images[0].get("id") or not images[0].get("url"):
        raise RuntimeError(f"{label} did not return an image.")
    return images[0]