PhotoGPT Developers

Shared API helpers, model creation, training-image upload, model training, and generation helpers.

Setup and Training

Complete this page before running any use-case recipe. It gives you a trained modelId and shared helpers for later image, upscale, and video generation calls.

Training polling

Model training normally takes 20 to 25 minutes. Poll GET /models/{modelID} every 5 minutes while training is running. Avoid tight polling loops.

Shared setup

Use the same auth headers and request helper across every example.

const API_KEY = '<YOUR_API_KEY>'
const BASE_URL = 'https://developer.photogptai.com/api'

const headers = {
  Authorization: `Bearer ${API_KEY}`,
  'API-Version': '1',
}

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

async function requestJson(path, options = {}) {
  const response = await fetch(`${BASE_URL}${path}`, options)

  if (!response.ok) {
    throw new Error(await response.text())
  }

  return response.json()
}
import json
import time
from pathlib import Path

import requests

API_KEY = "<YOUR_API_KEY>"
BASE_URL = "https://developer.photogptai.com/api"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "API-Version": "1",
}


def request_json(method: str, path: str, **kwargs):
    response = requests.request(method, f"{BASE_URL}{path}", timeout=120, **kwargs)
    response.raise_for_status()
    return response.json()

Create the model

The model stores identity metadata and gives you a modelID for uploads, training, and trained-model generation. Use one model per identity.

const modelPayload = {
  name: 'UGC Creator',
  age: 27,
  gender: 'Female',
  ethnicity: 'south asian',
  eyeColor: 'brown',
}

const createModelBody = await requestJson('/models', {
  method: 'POST',
  headers: {
    ...headers,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify(modelPayload),
})

const modelId = createModelBody.result.id
model_payload = {
    "name": "UGC Creator",
    "age": 27,
    "gender": "Female",
    "ethnicity": "south asian",
    "eyeColor": "brown",
}

create_model_body = request_json("POST", "/models", headers=headers, json=model_payload)
model_id = create_model_body["result"]["id"]

Upload training images

Upload 12 to 20 high-quality images with type: "modelInput". Use varied angles, lighting, and expressions. Avoid duplicates, heavy filters, sunglasses, heavy occlusion, and images where the subject is too small.

import { readdir, readFile } from 'node:fs/promises'
import { basename, extname, join } from 'node:path'

async function uploadModelInputImage(modelId, imagePath) {
  const bytes = await readFile(imagePath)
  const file = new File([bytes], basename(imagePath))
  const formData = new FormData()

  formData.append('file', file)
  formData.append(
    'data',
    JSON.stringify({
      modelID: modelId,
      type: 'modelInput',
    })
  )

  const body = await requestJson('/images/upload', {
    method: 'POST',
    headers,
    body: formData,
  })

  return body.result
}

const supportedExtensions = new Set(['.jpg', '.jpeg', '.png', '.webp', '.heic'])
const trainingImageDir = './training/ugc-creator'
const trainingImagePaths = (await readdir(trainingImageDir))
  .filter((file) => supportedExtensions.has(extname(file).toLowerCase()))
  .map((file) => join(trainingImageDir, file))
  .slice(0, 20)

if (trainingImagePaths.length < 12) {
  throw new Error('Upload at least 12 modelInput images before training.')
}

for (const imagePath of trainingImagePaths) {
  await uploadModelInputImage(modelId, imagePath)
}
def upload_model_input_image(model_id: str, image_path: Path):
    payload = {
        "modelID": model_id,
        "type": "modelInput",
    }

    with image_path.open("rb") as image_file:
        files = {
            "file": (image_path.name, image_file),
            "data": (None, json.dumps(payload), "application/json"),
        }
        body = request_json("POST", "/images/upload", headers=headers, files=files)

    return body["result"]


training_image_dir = Path("./training/ugc-creator")
supported_extensions = {".jpg", ".jpeg", ".png", ".webp", ".heic"}
training_image_paths = [
    path
    for path in training_image_dir.iterdir()
    if path.suffix.lower() in supported_extensions
][:20]

if len(training_image_paths) < 12:
    raise RuntimeError("Upload at least 12 modelInput images before training.")

for image_path in training_image_paths:
    upload_model_input_image(model_id, image_path)

Trigger training

The train endpoint starts the process and returns a success response. It does not return a generation jobId.

await requestJson(`/models/${modelId}/train`, {
  headers,
})
request_json("GET", f"/models/{model_id}/train", headers=headers)

Wait for readiness

Poll the model status every 5 minutes. When status becomes ready, the model can be used with POST /images/generation.

const TRAINING_POLL_INTERVAL_MS = 5 * 60 * 1000
const TRAINING_TIMEOUT_MS = 35 * 60 * 1000

async function waitForModelReady(modelId) {
  const startedAt = Date.now()

  while (true) {
    const body = await requestJson(`/models/${modelId}`, {
      headers,
    })
    const model = body.result
    const status = String(model.status ?? '').toLowerCase()

    if (status === 'ready') {
      return model
    }

    if (status === 'deleted') {
      throw new Error(`Model cannot be used for generation: ${status}`)
    }

    if (Date.now() - startedAt > TRAINING_TIMEOUT_MS) {
      throw new Error('Training is still running. Check the model status again later.')
    }

    await sleep(TRAINING_POLL_INTERVAL_MS)
  }
}

await waitForModelReady(modelId)
TRAINING_POLL_INTERVAL_SECONDS = 5 * 60
TRAINING_TIMEOUT_SECONDS = 35 * 60


def wait_for_model_ready(model_id: str):
    started_at = time.monotonic()

    while True:
        body = request_json("GET", f"/models/{model_id}", headers=headers)
        model = body["result"]
        status = str(model.get("status", "")).lower()

        if status == "ready":
            return model

        if status == "deleted":
            raise RuntimeError(f"Model cannot be used for generation: {status}")

        if time.monotonic() - started_at > TRAINING_TIMEOUT_SECONDS:
            raise RuntimeError("Training is still running. Check the model status again later.")

        time.sleep(TRAINING_POLL_INTERVAL_SECONDS)


wait_for_model_ready(model_id)

Generation helpers

The use-case pages reuse these helpers to start generation jobs, wait for completion, and extract the first generated image.

async function triggerImageGeneration(payload) {
  const body = await requestJson('/images/generation', {
    method: 'POST',
    headers: {
      ...headers,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payload),
  })

  return body.result.jobId
}

async function triggerImageUpscaling(payload) {
  const body = await requestJson('/images/upscaling', {
    method: 'POST',
    headers: {
      ...headers,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payload),
  })

  return body.result.jobId
}

async function triggerVideoGeneration(payload) {
  const body = await requestJson('/videos/generation', {
    method: 'POST',
    headers: {
      ...headers,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payload),
  })

  return body.result.jobId
}

async function waitForJob(jobId, intervalMs = 5000) {
  const runningStatuses = new Set(['created', 'queued', 'running'])

  while (true) {
    const body = await requestJson(`/jobs/${jobId}`, {
      headers,
    })
    const job = body.result
    const status = String(job.status ?? '').toLowerCase()

    if (status === 'success') {
      return job
    }

    if (!runningStatuses.has(status)) {
      throw new Error(job.error ?? `Job failed: ${job.status}`)
    }

    await sleep(intervalMs)
  }
}

function firstImage(job, label) {
  const image = job.images?.[0]

  if (!image?.id || !image?.url) {
    throw new Error(`${label} did not return an image.`)
  }

  return image
}
def trigger_image_generation(payload: dict) -> str:
    body = request_json("POST", "/images/generation", headers=headers, json=payload)
    return body["result"]["jobId"]


def trigger_image_upscaling(payload: dict) -> str:
    body = request_json("POST", "/images/upscaling", headers=headers, json=payload)
    return body["result"]["jobId"]


def trigger_video_generation(payload: dict) -> str:
    body = request_json("POST", "/videos/generation", headers=headers, json=payload)
    return body["result"]["jobId"]


def wait_for_job(job_id: str, interval_seconds: int = 5):
    running_statuses = {"created", "queued", "running"}

    while True:
        body = request_json("GET", f"/jobs/{job_id}", headers=headers)
        job = body["result"]
        status = str(job.get("status", "")).lower()

        if status == "success":
            return job

        if status not in running_statuses:
            raise RuntimeError(job.get("error") or f"Job failed: {job.get('status')}")

        time.sleep(interval_seconds)


def first_image(job: dict, label: str):
    images = job.get("images", [])

    if not images or not images[0].get("id") or not images[0].get("url"):
        raise RuntimeError(f"{label} did not return an image.")

    return images[0]

Next steps

On this page