image generation
parent 5afc80938e
commit be13782253
11 changed files with 1042 additions and 208 deletions
0
.codex
Normal file
8
SKILL.md
@@ -24,6 +24,7 @@ Use it when the task spans several stages at once:
Treat the local modules in this repository as specialists:

- [story-gen/SKILL.md](story-gen/SKILL.md): generate the story and structured scenario brief from the normalized request and available assets.
- [image-generation/SKILL.md](image-generation/SKILL.md): generate still images from prompts through the repo-local Nano Banana helper when the heavy-assets phase needs new art or another heavy still asset instead of downloaded stills.
- [video-generation/SKILL.md](video-generation/SKILL.md): use the existing repo-specific AI video generation pipelines when the heavy-assets phase needs provider-backed generated video, marketplace promo generation, or staged narrative generation instead of ad hoc prompting.
- [download-images/SKILL.md](download-images/SKILL.md): fetch direct still-image assets into `assets/` for heavy-asset acquisition, cutouts, and overlays.
- [download-youtube-segment/SKILL.md](download-youtube-segment/SKILL.md): fetch exact source ranges from YouTube.
@@ -34,6 +35,7 @@ Treat the local modules in this repository as specialists:
Known local skills in this repo:

- [SKILL.md](SKILL.md): top-level media workflow orchestrator.
- [image-generation/SKILL.md](image-generation/SKILL.md): minimal text-to-image path for generated stills and other heavy generated visual assets via `openai/gemini-2.5-flash-image`.
- [video-generation/SKILL.md](video-generation/SKILL.md): repo-specific AI video generation pipelines for generated clips, marketplace promo runs, Telegram-bot-backed generation, and microdrama/story-adaptation work during the heavy-assets phase.
- [download-images/SKILL.md](download-images/SKILL.md): download direct still-image assets into the local working set for heavy-assets acquisition.
- [download-youtube-segment/SKILL.md](download-youtube-segment/SKILL.md): download YouTube segments or frames with the helper scripts in `download-youtube-segment/scripts/`.
@@ -44,6 +46,8 @@ Known local skills in this repo:

Routing reminders:

- If the heavy-assets phase specifically needs a newly generated still image or another heavy generated visual asset from a prompt, use [image-generation/SKILL.md](image-generation/SKILL.md) first instead of inventing a fresh image-generation flow.
- If the user explicitly asks only for an AI-generated still image, including requests like `сгенерируй картинку по трендам` ("generate a picture based on trends") or other trend-based image generation, treat that as a narrow image-generation task and go directly to [image-generation/SKILL.md](image-generation/SKILL.md) or the session-level `imagegen` skill instead of forcing the full media pipeline.
- If the heavy-assets phase specifically needs generated video clips and this repository's existing provider-backed generation workflows fit the task, use [video-generation/SKILL.md](video-generation/SKILL.md) first instead of inventing a fresh generation flow.
- If the request is specifically about YouTube clipping, use [download-youtube-segment/SKILL.md](download-youtube-segment/SKILL.md) first instead of falling back to ad hoc commands.
- If the request is specifically about removing the background from an image, use [remove-background/SKILL.md](remove-background/SKILL.md) first instead of ad hoc image-editing commands.
@@ -71,6 +75,8 @@ If the user gives a chaotic brief, normalize it before doing expensive work.

Treat requests such as `сделай видео` ("make a video"), `сделай мем` ("make a meme"), `собери ролик` ("put together a clip"), or similar end-result wording as a **full pipeline request by default**, not as permission to jump straight to montage. Only skip to a narrower module when the user explicitly asks for a single stage such as `just clip this`, `only write the script`, or `only add captions`.

If the deliverable is only a trend-based AI-generated still image, this is a single-stage exception: go straight to [image-generation/SKILL.md](image-generation/SKILL.md) or the session-level `imagegen` skill and do not require a scenario brief or the rest of the media pipeline.

Before any heavy production step, the agent must create or update a structured scenario brief under [assets/](assets/), preferably `assets/scenario.json`, and then use that file as the source of truth for later steps.

This is a hard gate:
@@ -161,6 +167,8 @@ Prefer these paths in order:
4. Download exact source ranges from external video links.
5. Generate missing shots or draw new still assets only when real footage or downloadable stills do not exist or cannot achieve the needed moment.

If the missing asset is a generated still image or another heavy generated visual asset rather than a generated video clip, route that work through [image-generation/SKILL.md](image-generation/SKILL.md) before falling back to ad hoc API calls.

For source downloads:

- If the user gives only a concept for a helpful still image such as a logo, poster, reaction image, prop, sticker, glasses, clown wig, or clown nose, the heavy-assets phase may first use built-in web/image search to find a suitable asset, then save the direct image URL into the scenario brief and fetch it with [download-images/SKILL.md](download-images/SKILL.md).
@@ -1,2 +0,0 @@
Use https://www.youtube.com/watch?v=wDkztLMNK9k.
Make a meme
115
image-generation/SKILL.md
Normal file
@@ -0,0 +1,115 @@
---
name: image-generation
description: Generate a still image from a text prompt through the repo-local Nano Banana helper. Use when the user wants a picture, cover, poster, product shot, meme still, concept art, or any other single generated image via the Lambda-compatible endpoint and `openai/gemini-2.5-flash-image`. Also use for requests like `сгенерируй картинку` ("generate a picture"), `нарисуй` ("draw"), `сделай обложку` ("make a cover"), `сделай постер` ("make a poster"), `сделай иллюстрацию` ("make an illustration").
---

# Image Generation

## Overview

Use `scripts/generate-image.py` for the narrow image-generation path in this repo.

This skill is intentionally small:

- one OpenAI-compatible endpoint by default: `https://llm.lambda.coredump.ru/v1`
- one default model: `openai/gemini-2.5-flash-image`
- one required prompt source: either a direct prompt or a `story-gen` scenario
- output is saved under `output/` automatically when you do not pass a path

It is also the preferred narrow path when the heavy-assets phase needs a newly
generated still image: poster, cover, product shot, concept frame, meme still,
packshot, or any other heavy visual asset that does not come from a real source.

Do not use the larger `video-generation/` flows when the user only needs one
still image.

## Preconditions

The helper auto-loads variables from:

- `image-generation/.env`
- repo root `.env`

Shell-exported env vars still work and take priority.

Set an API key before running:

- preferred: `OPENAI_API_KEY`
- also accepted: `LAMBDA_API_KEY` or `LAMBDA_KEY`

Optional env:

- `OPENAI_BASE_URL` to override the default endpoint
- `NANOBANANA_MODEL` to override the default model
- `NANOBANANA_IMAGE_SIZE` to override the default size (`1024x1024`)
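
The precedence described above can be pictured with a small sketch. It is an illustrative model only, not the helper's code: `resolve_setting` and its arguments are hypothetical names, and it assumes that a non-empty shell value wins, that the `.env` files are consulted in the order listed, and that the built-in default applies last.

```python
from typing import Mapping, Sequence


def resolve_setting(
    name: str,
    shell_env: Mapping[str, str],
    dotenv_layers: Sequence[Mapping[str, str]],
    default: str,
) -> str:
    """Return the first non-empty value: shell, then each .env layer, then default."""
    shell_value = shell_env.get(name)
    if shell_value:
        return shell_value
    for layer in dotenv_layers:
        value = layer.get(name)
        if value:
            return value
    return default


# Hypothetical contents of image-generation/.env and the repo root .env.
skill_env = {"NANOBANANA_IMAGE_SIZE": "1536x1024"}
root_env = {"NANOBANANA_IMAGE_SIZE": "512x512", "NANOBANANA_MODEL": "example/model"}
layers = [skill_env, root_env]

size = resolve_setting("NANOBANANA_IMAGE_SIZE", {}, layers, "1024x1024")
model = resolve_setting(
    "NANOBANANA_MODEL",
    {"NANOBANANA_MODEL": "shell/model"},  # exported in the shell, so it wins
    layers,
    "openai/gemini-2.5-flash-image",
)
print(size, model)
```

Under these assumptions the skill-local file shadows the repo root file, and anything exported in the shell shadows both.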

## Quick Start

Minimal local setup:

```bash
nano image-generation/.env
```

Generate with only a prompt:

```bash
python3 image-generation/scripts/generate-image.py \
  "Minimal studio photo of a yellow banana on a blue background"
```

Generate to an explicit path:

```bash
python3 image-generation/scripts/generate-image.py \
  "Retro sci-fi movie poster, chrome typography, neon fog" \
  --output output/retro-poster.png
```

Generate at another size:

```bash
python3 image-generation/scripts/generate-image.py \
  "Clean ecommerce hero shot of a beige handbag on white background" \
  --size 1536x1024
```

Consume a `story-gen` scenario directly:

```bash
python3 image-generation/scripts/generate-image.py \
  --scenario assets/trend-scenario.json
```

Override the reference photo from the scenario:

```bash
python3 image-generation/scripts/generate-image.py \
  --scenario assets/trend-scenario.json \
  --photo assets/me-retake.jpg
```

## Workflow

1. Reduce the request to one concrete image prompt or point the helper at a `story-gen` scenario.
2. If needed, put the key and defaults into `image-generation/.env`.
3. Run `scripts/generate-image.py`.
4. Let the script choose an `output/` filename unless the task needs a specific
   output path.
5. Use the resulting local PNG as a normal asset for later editing,
compositing, or delivery.
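
How an output path might be chosen in step 4 can be sketched as below. This is a simplified illustration under stated assumptions, not the helper itself: `pick_output_path` and the `auto_name` argument are hypothetical, the default directory is assumed to be `output`, and POSIX-style paths are used so the result is the same on any platform.

```python
from pathlib import PurePosixPath
from typing import Optional


def pick_output_path(
    output_arg: Optional[str],
    auto_name: str,
    output_dir: str = "output",
) -> PurePosixPath:
    """Auto name when no path is given, bare filenames under output_dir, .png by default."""
    if not output_arg:
        return PurePosixPath(output_dir) / auto_name
    path = PurePosixPath(output_arg)
    # A bare filename (no directory part) is placed under the default directory.
    if not path.is_absolute() and len(path.parts) == 1:
        path = PurePosixPath(output_dir) / path.name
    # A path without an extension gets .png appended.
    if not path.suffix:
        path = path.with_suffix(".png")
    return path


print(pick_output_path(None, "nanobanana-example.png"))      # output/nanobanana-example.png
print(pick_output_path("poster", "unused.png"))              # output/poster.png
print(pick_output_path("renders/poster.png", "unused.png"))  # renders/poster.png
```

Keeping the rule as a pure function of its arguments makes it easy to preview the destination before any API call is made.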

## Notes

- The helper writes PNG output.
- When `--output` is a bare filename, it is saved under `output/`.
- The script uses `/images/generations` for plain prompt generation.
- When a reference photo is supplied directly or via `scenario.json`, it switches to `/images/edits`.
- If the provider returns base64 image data, the script writes it directly.
- If the provider returns a temporary image URL instead, the script downloads it
and still saves a local PNG-like output path.
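
The last two notes describe a two-way choice that can be sketched as follows. This is a minimal illustration, assuming the response item carries either a `b64_json` or a `url` field; `image_bytes_from_item` and `fake_fetch` are hypothetical names, and the download is stubbed out so the sketch runs offline.

```python
import base64
from typing import Callable, Mapping


def image_bytes_from_item(
    item: Mapping[str, str],
    fetch_url: Callable[[str], bytes],
) -> bytes:
    """Prefer inline base64 data; otherwise download from the temporary URL."""
    b64_json = item.get("b64_json")
    if b64_json:
        return base64.b64decode(b64_json)
    url = item.get("url")
    if url:
        return fetch_url(url)
    raise ValueError(f"no b64_json or url in item; got keys: {sorted(item)}")


def fake_fetch(url: str) -> bytes:
    # Stand-in for a real HTTP download so the sketch needs no network access.
    return b"downloaded:" + url.encode("utf-8")


inline = {"b64_json": base64.b64encode(b"\x89PNG").decode("ascii")}
remote = {"url": "https://example.invalid/tmp.png"}

print(image_bytes_from_item(inline, fake_fetch))
print(image_bytes_from_item(remote, fake_fetch))
```

Passing the downloader in as a parameter is a design choice of the sketch: it keeps the branching logic testable without touching a real endpoint.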

## Resource

- `scripts/generate-image.py`: minimal text-to-image helper for
  `openai/gemini-2.5-flash-image`
4
image-generation/agents/openai.yaml
Normal file
@@ -0,0 +1,4 @@
interface:
  display_name: "Image Generation"
  short_description: "Generate still images with Nano Banana"
  default_prompt: "Use $image-generation to generate a single image from a prompt with openai/gemini-2.5-flash-image."
511
image-generation/scripts/generate-image.py
Normal file
@@ -0,0 +1,511 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import base64
import binascii
import json
import os
import re
import sys
import unicodedata
import uuid
from datetime import datetime
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

REPO_ROOT = Path(__file__).resolve().parents[2]
SKILL_DIR = Path(__file__).resolve().parents[1]
API_KEY_ENV_NAMES = ("OPENAI_API_KEY", "LAMBDA_API_KEY", "LAMBDA_KEY")
USER_AGENT = "media-skill-image-generation/1.0"
REQUEST_TIMEOUT_SECONDS = 300
FALLBACK_BASE_URL = "https://llm.lambda.coredump.ru/v1"
FALLBACK_MODEL = "openai/gemini-2.5-flash-image"
FALLBACK_SIZE = "1024x1024"
ENV_CANDIDATE_PATHS = (SKILL_DIR / ".env", REPO_ROOT / ".env")
PROMPT_KEYS = ("prompt", "visual_prompt", "image_prompt", "action")


def die(message: str, exit_code: int = 1) -> None:
    print(f"Error: {message}", file=sys.stderr)
    raise SystemExit(exit_code)


def load_env_file(path: Path) -> None:
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        if not key:
            continue
        if value and value[0] == value[-1] and value[0] in {'"', "'"}:
            value = value[1:-1]
        existing = os.environ.get(key)
        if existing:
            continue
        if existing == "" and value == "":
            continue
        os.environ[key] = value


def load_default_env_files() -> None:
    for path in ENV_CANDIDATE_PATHS:
        if path.is_file():
            load_env_file(path)


load_default_env_files()


def get_assets_dir() -> Path:
    return Path(os.getenv("MEDIA_SKILL_ASSETS_DIR", REPO_ROOT / "assets"))


def get_default_output_dir() -> Path:
    return Path(os.getenv("MEDIA_SKILL_IMAGE_OUTPUT_DIR", REPO_ROOT / "output"))


def get_base_url() -> str:
    return os.getenv("OPENAI_BASE_URL", FALLBACK_BASE_URL).rstrip("/")


def get_default_model() -> str:
    return os.getenv("NANOBANANA_MODEL", FALLBACK_MODEL)


def get_default_size() -> str:
    return os.getenv("NANOBANANA_IMAGE_SIZE", FALLBACK_SIZE)


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Generate or edit a still image with openai/gemini-2.5-flash-image."
    )
    parser.add_argument(
        "prompt",
        nargs="?",
        help="Text prompt for the image model. Optional when --scenario is used.",
    )
    parser.add_argument(
        "--scenario",
        help="Path to scenario JSON. Uses image_request.prompt or a scene/card prompt.",
    )
    parser.add_argument(
        "--scene",
        type=int,
        default=1,
        help="1-based scene/card index when loading a prompt from scenario. Default: 1.",
    )
    parser.add_argument(
        "--photo",
        help="Reference photo path for image edits. Overrides any path stored in the scenario.",
    )
    parser.add_argument(
        "-o",
        "--output",
        help="Output path. Bare filenames are saved under output/. Defaults to an auto-generated output path.",
    )
    parser.add_argument(
        "--size",
        default=get_default_size(),
        help=f"Requested image size. Default: {get_default_size()}",
    )
    parser.add_argument(
        "--model",
        default=get_default_model(),
        help=f"Model id. Default: {get_default_model()}",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print the resolved output path and request settings without calling the API.",
    )
    return parser.parse_args(argv)


def get_api_key() -> str:
    for env_name in API_KEY_ENV_NAMES:
        value = os.getenv(env_name)
        if value:
            return value
    die(
        "missing API key; set OPENAI_API_KEY, LAMBDA_API_KEY, or LAMBDA_KEY before running"
    )


def slugify_prompt(prompt: str, max_length: int = 40) -> str:
    normalized = unicodedata.normalize("NFKD", prompt)
    ascii_prompt = normalized.encode("ascii", "ignore").decode("ascii").lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_prompt).strip("-")
    return (slug[:max_length].strip("-")) or "image"


def resolve_output_path(output_arg: str | None, prompt: str) -> Path:
    if not output_arg:
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        return get_default_output_dir() / f"nanobanana-{timestamp}-{slugify_prompt(prompt)}.png"

    output_path = Path(output_arg)
    if not output_path.is_absolute() and len(output_path.parts) == 1:
        output_path = get_default_output_dir() / output_path.name
    if not output_path.suffix:
        output_path = output_path.with_suffix(".png")
    return output_path


def ensure_parent_dir(path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)


def read_json_response(response) -> dict:
    raw = response.read()
    if not raw:
        die("API returned an empty response body")
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        die(f"API returned invalid JSON: {exc}")


def extract_error_message(body: bytes) -> str:
    if not body:
        return "empty error body"
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return body.decode("utf-8", errors="replace")
    if isinstance(parsed, dict):
        error = parsed.get("error")
        if isinstance(error, dict):
            return str(error.get("message") or parsed)
        return str(parsed)
    return str(parsed)


def post_generation_request(endpoint: str, body: bytes, headers: dict[str, str]) -> dict:
    request = Request(
        endpoint,
        data=body,
        headers=headers,
        method="POST",
    )
    try:
        with urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response:
            return read_json_response(response)
    except HTTPError as exc:
        die(f"image generation failed with HTTP {exc.code}: {extract_error_message(exc.read())}")
    except URLError as exc:
        die(f"could not reach image API: {exc.reason}")


def post_image_generation(prompt: str, model: str, size: str, api_key: str) -> dict:
    payload = {
        "model": model,
        "prompt": prompt,
        "size": size,
    }
    return post_generation_request(
        f"{get_base_url()}/images/generations",
        json.dumps(payload).encode("utf-8"),
        {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": USER_AGENT,
        },
    )


def guess_mime_type(path: Path) -> str:
    suffix = path.suffix.lower()
    if suffix in {".jpg", ".jpeg", ".jfif", ".jpe"}:
        return "image/jpeg"
    if suffix == ".png":
        return "image/png"
    if suffix == ".webp":
        return "image/webp"
    return "application/octet-stream"


def build_multipart_field(boundary: str, name: str, value: str) -> bytes:
    return (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
        f"{value}\r\n"
    ).encode("utf-8")


def post_image_edit(prompt: str, model: str, size: str, api_key: str, photo_path: Path) -> dict:
    boundary = uuid.uuid4().hex
    photo_bytes = photo_path.read_bytes()
    filename = photo_path.stem + (photo_path.suffix or ".jpg")

    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="image"; filename="{filename}"\r\n'
        f"Content-Type: {guess_mime_type(photo_path)}\r\n\r\n"
    ).encode("utf-8") + photo_bytes + b"\r\n"
    body += build_multipart_field(boundary, "prompt", prompt)
    body += build_multipart_field(boundary, "model", model)
    body += build_multipart_field(boundary, "size", size)
    body += build_multipart_field(boundary, "n", "1")
    body += f"--{boundary}--\r\n".encode("utf-8")

    return post_generation_request(
        f"{get_base_url()}/images/edits",
        body,
        {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "User-Agent": USER_AGENT,
        },
    )


def download_image_from_url(url: str) -> bytes:
    request = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response:
        body = response.read()
    if not body:
        die("provider returned an empty image download")
    return body


def extract_image_bytes(response_json: dict) -> bytes:
    if isinstance(response_json.get("error"), dict):
        die(str(response_json["error"].get("message") or response_json["error"]))

    data = response_json.get("data")
    if not isinstance(data, list) or not data:
        die("API response did not include image data")

    first_item = data[0]
    if not isinstance(first_item, dict):
        die("API response contained an unexpected image item shape")

    b64_json = first_item.get("b64_json")
    if b64_json:
        try:
            return base64.b64decode(b64_json)
        except (ValueError, binascii.Error) as exc:
            die(f"could not decode image payload: {exc}")

    url = first_item.get("url")
    if url:
        return download_image_from_url(url)

    die(f"API response did not contain b64_json or url; got keys: {sorted(first_item.keys())}")


def resolve_existing_file(
    raw_path: str,
    *,
    base_dir: Path | None = None,
    label: str,
    quiet: bool = False,
) -> Path | None:
    original = Path(raw_path).expanduser()
    candidates = [original]
    if not original.is_absolute():
        if base_dir is not None:
            candidates.append(base_dir / original)
        candidates.append(Path.cwd() / original)

    for candidate in candidates:
        resolved = candidate.resolve()
        if resolved.is_file():
            return resolved

    if quiet:
        return None
    die(f"{label} not found: {raw_path}")


def load_json_file(path: Path) -> dict:
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        die(f"scenario not found: {path}")
    except json.JSONDecodeError as exc:
        die(f"scenario is not valid JSON: {exc}")

    if not isinstance(payload, dict):
        die("scenario root must be a JSON object")
    return payload


def extract_prompt_from_mapping(mapping: dict) -> tuple[str | None, str | None]:
    for key in PROMPT_KEYS:
        value = mapping.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip(), key
    return None, None


def select_index(item_number: int, total_items: int, label: str) -> int:
    if item_number < 1:
        die(f"{label} index must be >= 1")
    index = item_number - 1
    if index >= total_items:
        die(f"{label} {item_number} is out of range; scenario has {total_items} {label}s")
    return index


def extract_prompt_from_scenario(path: Path, item_number: int) -> tuple[str, str, dict]:
    payload = load_json_file(path)

    image_request = payload.get("image_request")
    if isinstance(image_request, dict):
        prompt, key = extract_prompt_from_mapping(image_request)
        if prompt:
            return prompt, f"scenario:image_request.{key}", payload

    scenes = payload.get("scenes")
    if isinstance(scenes, list) and scenes:
        index = select_index(item_number, len(scenes), "scene")
        scene = scenes[index]
        if not isinstance(scene, dict):
            die(f"scene {item_number} must be a JSON object")
        prompt, key = extract_prompt_from_mapping(scene)
        if prompt:
            return prompt, f"scenario:scene[{item_number}].{key}", payload
        die(f"scene {item_number} does not contain a supported prompt field")

    cards = payload.get("cards")
    if isinstance(cards, list) and cards:
        index = select_index(item_number, len(cards), "card")
        card = cards[index]
        if not isinstance(card, dict):
            die(f"card {item_number} must be a JSON object")
        prompt, key = extract_prompt_from_mapping(card)
        if prompt:
            return prompt, f"scenario:card[{item_number}].{key}", payload
        die(f"card {item_number} does not contain a supported prompt field")

    prompt, key = extract_prompt_from_mapping(payload)
    if prompt:
        return prompt, f"scenario:{key}", payload

    die("scenario does not contain image_request.prompt, scene visual_prompt, or image_prompt")


def scenario_requires_reference_image(payload: dict) -> bool:
    image_request = payload.get("image_request")
    if isinstance(image_request, dict):
        required = image_request.get("reference_image_required")
        if isinstance(required, bool):
            return required
    required = payload.get("reference_image_required")
    return bool(required)


def resolve_reference_image(
    photo_arg: str | None,
    scenario_payload: dict | None,
    scenario_path: Path | None,
) -> Path | None:
    if photo_arg:
        return resolve_existing_file(photo_arg, label="reference photo")

    if not scenario_payload:
        return None

    candidate_paths: list[str] = []
    image_request = scenario_payload.get("image_request")
    if isinstance(image_request, dict):
        reference_path = image_request.get("reference_image_path")
        if isinstance(reference_path, str) and reference_path.strip():
            candidate_paths.append(reference_path.strip())

    top_level_reference = scenario_payload.get("reference_image_path")
    if isinstance(top_level_reference, str) and top_level_reference.strip():
        candidate_paths.append(top_level_reference.strip())

    base_dir = scenario_path.parent if scenario_path else None
    for raw_path in candidate_paths:
        resolved = resolve_existing_file(
            raw_path,
            base_dir=base_dir,
            label="reference photo",
            quiet=True,
        )
        if resolved:
            return resolved

    return None


def resolve_prompt_and_context(
    args: argparse.Namespace,
) -> tuple[str, str, dict | None, Path | None]:
    scenario_payload = None
    scenario_path = None
    scenario_prompt = None
    scenario_prompt_source = None

    if args.scenario:
        scenario_path = resolve_existing_file(args.scenario, label="scenario")
        scenario_prompt, scenario_prompt_source, scenario_payload = extract_prompt_from_scenario(
            scenario_path,
            args.scene,
        )

    if args.prompt:
        return args.prompt, "cli:prompt", scenario_payload, scenario_path
    if scenario_prompt:
        return scenario_prompt, scenario_prompt_source, scenario_payload, scenario_path

    die("provide a prompt or --scenario")


def main(argv: list[str]) -> int:
    args = parse_args(argv)
    prompt, prompt_source, scenario_payload, scenario_path = resolve_prompt_and_context(args)
    output_path = resolve_output_path(args.output, prompt)

    reference_photo = resolve_reference_image(args.photo, scenario_payload, scenario_path)
    reference_required = scenario_requires_reference_image(scenario_payload or {})
    if reference_required and reference_photo is None:
        die(
            "scenario requires a reference photo; pass --photo or include reference_image_path in the scenario"
        )

    request_mode = "edit" if reference_photo else "generate"
    endpoint = f"{get_base_url()}/images/edits" if reference_photo else f"{get_base_url()}/images/generations"

    if args.dry_run:
        print(f"endpoint={endpoint}")
        print(f"model={args.model}")
        print(f"size={args.size}")
        print(f"output={output_path}")
        print(f"prompt_source={prompt_source}")
        print(f"request_mode={request_mode}")
        if reference_photo:
            print(f"photo={reference_photo}")
        return 0

    api_key = get_api_key()
    if reference_photo:
        response_json = post_image_edit(prompt, args.model, args.size, api_key, reference_photo)
    else:
        response_json = post_image_generation(prompt, args.model, args.size, api_key)
    image_bytes = extract_image_bytes(response_json)

    ensure_parent_dir(output_path)
    output_path.write_bytes(image_bytes)

    print(f"saved_to={output_path}")
    print(f"model={args.model}")
    print(f"size={args.size}")
    print(f"prompt_source={prompt_source}")
    print(f"request_mode={request_mode}")
    if reference_photo:
        print(f"photo={reference_photo}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
@@ -39,6 +39,11 @@ Needs env:
### `--mode image` (default)
Generates a storyboard scenario for image/visual generation. Each scene has a `visual_prompt` (English) ready for gpt-image-1.5 or veo-3.1.

For trend-photo asks such as anime portrait, studio headshot, USSR postcard,
photo booth, aged self, flowers in hair, and the other curated portrait trends
stored under `scripts/trends/`, `generate.py` can also emit a **single-image
scenario** that downstream image generation can consume directly.

### `--mode video`
Generates a full shooting script for real video production. Each scene has:
- `timecode` — cumulative start time `HH:MM:SS`
@@ -54,11 +59,12 @@ Generates a full shooting script for real video production. Each scene has:
| Parameter | Values | Description |
|-----------|--------|-------------|
| `--mode` | `image`, `video` | `image`: visual storyboard; `video`: full shooting script with voiceover |
| `--format` | `wb_ad`, `reels`, `viral`, `long`, `postcard`, `educational`, `auto` | Video format (image mode only) |
| `--format` | `wb_ad`, `reels`, `viral`, `long`, `postcard`, `educational`, `trend_photo`, `auto` | Video format (image mode only) |
| `--platform` | `tiktok`, `instagram`, `wb`, `youtube`, `vk`, `auto` | Target platform |
| `--audience` | any text | Target audience description |
| `--duration` | seconds | Target duration |
| `--lang` | `ru`, `en`, `de`, `auto` | Language for voiceover and captions |
| `--photo` | filepath | Reference photo path for trend-photo scenarios |
| `--analyze` | flag | Analyze assets before generating (image mode only) |
| `--out` | filepath | Save JSON to file (video mode also saves `_voiceover.txt`) |
| `--voice` | flag | After script generation, immediately run voice synthesis (video mode + `--out` required) |
@@ -94,6 +100,14 @@ python3 {baseDir}/scripts/generate.py \
  "Анекдот про программиста и кофе" \
  --format viral --platform tiktok --lang en

# Curated trend-photo scenario for downstream image generation
python3 {baseDir}/scripts/generate.py \
  "Сделай меня в стиле аниме" \
  --format trend_photo --photo assets/me.jpg \
  --out assets/trend-scenario.json
# Then hand the JSON to image-generation:
# python3 image-generation/scripts/generate-image.py --scenario assets/trend-scenario.json

# Long educational video shooting script
python3 {baseDir}/scripts/generate.py \
  "How to choose your first bicycle" \
@@ -105,7 +119,7 @@ python3 {baseDir}/scripts/generate.py \
```json
{
  "title": "video title",
  "format": "wb_ad|reels|viral|long|postcard|educational",
  "format": "wb_ad|reels|viral|long|postcard|educational|trend_photo",
  "platform": "tiktok|instagram|wb|youtube|vk",
  "language": "ru|en|...",
  "duration_sec": 30,
@@ -122,6 +136,11 @@ python3 {baseDir}/scripts/generate.py \
      "caption": "on-screen text in target language"
    }
  ],
  "image_request": {
    "prompt": "single image prompt for downstream image-generation",
    "reference_image_required": true,
    "reference_image_path": "/abs/path/to/photo.jpg"
  },
  "storyboard_grid_prompt": "NxN storyboard grid — all scenes as one image. null if no recurring subject.",
  "music_mood": "upbeat|calm|dramatic|funny|inspirational",
  "style_notes": "overall style and delivery notes",
|
|
@ -156,6 +175,7 @@ python3 {baseDir}/scripts/generate.py \
|
|||
|
||||
**Image mode** output feeds into:
- `visual_prompt` → image generation (`gpt-image-1.5`) or video (`veo-3.1`)
- `image_request.prompt` + `reference_image_path` → `image-generation/scripts/generate-image.py` for trend-photo edits
- `voiceover` → TTS (`Pocket-TTS` or `ElevenLabs`)
- `caption` + `duration_sec` → ffmpeg montage ([../ffmpeg-editing/SKILL.md](../ffmpeg-editing/SKILL.md))
- Full JSON → orchestrator ([../SKILL.md](../SKILL.md))
|
||||
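A downstream consumer might read the `image_request` block roughly as follows. This is a minimal sketch, assuming the scenario JSON has the shape documented in the response format; `load_image_request` is a hypothetical helper that does not exist in the repository, and the real `generate-image.py` interface may differ.

```python
from __future__ import annotations


def load_image_request(scenario: dict) -> tuple[str, str | None]:
    """Return (prompt, reference_image_path) from a scenario dict.

    Hypothetical helper for illustration only; the field names follow the
    `image_request` block of the documented scenario JSON.
    """
    image_request = scenario.get("image_request") or {}
    prompt = image_request.get("prompt")
    if not prompt:
        raise ValueError("scenario has no image_request.prompt")

    reference = image_request.get("reference_image_path")
    # A trend-photo edit needs a face reference; fail early if it is missing.
    if image_request.get("reference_image_required") and not reference:
        raise ValueError("scenario requires a reference image but none is attached")
    return prompt, reference
```

Loading the file written by `--out` with `json.loads` and passing the result to this function would give the two values the image step needs, or an early error when the reference photo was never supplied.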
|
|
|
|||
|
|
@ -15,16 +15,26 @@ Usage:
|
|||
python story-gen/preview.py "идея" --scene 2
|
||||
"""
|
||||
|
||||
import sys, os, json, argparse, base64, subprocess, tempfile
|
||||
import sys, os, json, argparse, base64, subprocess
|
||||
from urllib import request
|
||||
|
||||
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ('utf-8', 'utf8'):
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
|
||||
SCRIPTS_DIR = os.path.join(os.path.dirname(__file__), "scripts")
|
||||
if SCRIPTS_DIR not in sys.path:
|
||||
sys.path.insert(0, SCRIPTS_DIR)
|
||||
|
||||
from trend_catalog import expand_trend as expand_trend_prompt
|
||||
|
||||
API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/v1")
|
||||
API_KEY = os.environ.get("OPENAI_API_KEY", "")
|
||||
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
|
||||
IMAGE_MODEL = os.environ.get("IMAGE_MODEL", "gpt-image-1")
|
||||
OUTPUT_DIR = os.environ.get(
|
||||
"MEDIA_SKILL_IMAGE_OUTPUT_DIR",
|
||||
os.path.join(os.path.dirname(__file__), "..", "output"),
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Step 1: classify the request
|
||||
|
|
@ -91,82 +101,16 @@ def generate_scenario(user_input: str, fmt="auto", platform="auto") -> dict:
|
|||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Step 2b: trend — portrait prompt expansion (file-based + LLM fallback)
|
||||
# Step 2b: trend — portrait prompt expansion (shared catalog + LLM fallback)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
TRENDS_DIR = os.path.join(os.path.dirname(__file__), "scripts", "trends")
|
||||
|
||||
TREND_FALLBACK_PROMPT = """You are a professional prompt engineer for AI portrait image generation.
|
||||
|
||||
The user gives you a short phrase describing a visual style or trend.
|
||||
Expand it into a detailed professional portrait prompt.
|
||||
|
||||
Structure:
|
||||
Transform this photo into a human portrait. Use the uploaded photo for the face — preserve ALL features exactly: face shape, eyes, nose, lips, eyebrows, hair color, hairstyle.
|
||||
|
||||
Clothing: [detailed clothing]
|
||||
Location: [detailed background/setting]
|
||||
Pose & Action: [body position, gesture, eye direction]
|
||||
Lighting: [lighting setup and atmosphere]
|
||||
Mood: [emotional tone, keywords]
|
||||
Technical: [lens, aperture, ISO, art style, quality tags]
|
||||
|
||||
Rules:
|
||||
- Write entirely in English
|
||||
- Be very specific and detailed
|
||||
- Return ONLY the prompt text, no explanations
|
||||
"""
|
||||
|
||||
def _load_trends() -> list[dict]:
|
||||
trends = []
|
||||
if not os.path.isdir(TRENDS_DIR):
|
||||
return trends
|
||||
for fname in os.listdir(TRENDS_DIR):
|
||||
if not fname.endswith(".txt"):
|
||||
continue
|
||||
with open(os.path.join(TRENDS_DIR, fname), encoding="utf-8") as f:
|
||||
content = f.read().strip()
|
||||
lines = content.splitlines()
|
||||
keywords = []
|
||||
prompt_lines = []
|
||||
for i, line in enumerate(lines):
|
||||
if line.startswith("keywords:"):
|
||||
keywords = [k.strip().lower() for k in line[len("keywords:"):].split(",") if k.strip()]
|
||||
else:
|
||||
prompt_lines = lines[i:]
|
||||
break
|
||||
trends.append({
|
||||
"name": fname.replace(".txt", ""),
|
||||
"keywords": keywords,
|
||||
"prompt": "\n".join(prompt_lines).strip()
|
||||
})
|
||||
return trends
|
||||
|
||||
|
||||
def expand_trend(user_input: str) -> str:
|
||||
query = user_input.lower()
|
||||
for trend in _load_trends():
|
||||
if any(kw in query for kw in trend["keywords"]):
|
||||
print(f" Matched trend: {trend['name']}", file=sys.stderr)
|
||||
return trend["prompt"]
|
||||
|
||||
print(" No trend matched — using LLM to generate prompt", file=sys.stderr)
|
||||
payload = json.dumps({
|
||||
"model": MODEL,
|
||||
"messages": [
|
||||
{"role": "system", "content": TREND_FALLBACK_PROMPT},
|
||||
{"role": "user", "content": user_input}
|
||||
],
|
||||
"temperature": 0.7
|
||||
}).encode()
|
||||
req = request.Request(
|
||||
f"{API_URL}/chat/completions",
|
||||
data=payload,
|
||||
headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
|
||||
def expand_trend(user_input: str) -> tuple[str, str]:
|
||||
return expand_trend_prompt(
|
||||
user_input,
|
||||
api_url=API_URL,
|
||||
api_key=API_KEY,
|
||||
model=MODEL,
|
||||
)
|
||||
with request.urlopen(req, timeout=60) as resp:
|
||||
data = json.loads(resp.read())
|
||||
return data["choices"][0]["message"]["content"].strip()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -291,7 +235,7 @@ def open_image(path: str):
|
|||
|
||||
|
||||
def save_and_open(image_bytes: bytes, label: str) -> str:
|
||||
out_dir = os.path.join(tempfile.gettempdir(), "story-gen-previews")
|
||||
out_dir = os.path.abspath(OUTPUT_DIR)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
safe = "".join(c if c.isalnum() or c in "-_ " else "_" for c in label)[:50].strip()
|
||||
path = os.path.join(out_dir, f"{safe}.png")
|
||||
|
|
@ -442,7 +386,8 @@ def main():
|
|||
sys.exit(1)
|
||||
else:
|
||||
print(" Expanding trend prompt...", file=sys.stderr)
|
||||
prompt = expand_trend(args.input)
|
||||
prompt, source = expand_trend(args.input)
|
||||
print(f" Prompt source: {source}", file=sys.stderr)
|
||||
print(f"\nPrompt:\n{prompt}\n", file=sys.stderr)
|
||||
image_bytes = generate_image(prompt, args.size, photo_path=args.photo)
|
||||
save_and_open(image_bytes, f"trend_{args.input[:30]}")
|
||||
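The preview path handling changed here from a temp directory to a configurable output directory. The sketch below restates that logic in isolation, assuming the `MEDIA_SKILL_IMAGE_OUTPUT_DIR` override and the label sanitizing shown in the hunk; `preview_path` and `default_dir` are hypothetical names introduced for illustration.

```python
import os


def preview_path(label: str, default_dir: str = "output") -> str:
    """Build the PNG path for a preview image.

    Sketch only: mirrors the env override and label sanitizing from the
    diff; this function does not exist in the repository.
    """
    out_dir = os.path.abspath(
        os.environ.get("MEDIA_SKILL_IMAGE_OUTPUT_DIR", default_dir)
    )
    # Keep letters, digits, '-', '_' and spaces; replace everything else
    # with '_', then cap the length at 50 characters.
    safe = "".join(c if c.isalnum() or c in "-_ " else "_" for c in label)[:50].strip()
    return os.path.join(out_dir, f"{safe}.png")
```

Because `str.isalnum` accepts non-ASCII letters, a Cyrillic label keeps its characters in the file name.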
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import sys, os, json, argparse, subprocess
|
|||
from pathlib import Path
|
||||
from urllib import request, error
|
||||
|
||||
from trend_catalog import expand_trend, looks_like_trend_request, match_trend
|
||||
|
||||
# Fix Windows console encoding (cp1251 can't handle ₽, emoji, etc.)
|
||||
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ('utf-8', 'utf8'):
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
|
|
@ -17,6 +19,7 @@ API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/v1")
|
|||
API_KEY = os.environ.get("OPENAI_API_KEY", "")
|
||||
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
|
||||
DEFAULT_OUTPUT_PATH = Path(__file__).resolve().parents[2] / "assets" / "scenario.json"
|
||||
DEFAULT_TREND_PLATFORM = "instagram"
|
||||
|
||||
SYSTEM_PROMPT = """You are a professional storyboard creator for image-based video production.
|
||||
|
||||
|
|
@ -52,7 +55,7 @@ Rules for storyboard_grid_prompt:
|
|||
Response format:
|
||||
{
|
||||
"title": "video title",
|
||||
"format": "wb_ad|reels|viral|long|postcard|educational",
|
||||
"format": "wb_ad|reels|viral|long|postcard|educational|trend_photo",
|
||||
"platform": "tiktok|instagram|wb|youtube|vk",
|
||||
"language": "ru|en|...",
|
||||
"duration_sec": 30,
|
||||
|
|
@ -462,6 +465,101 @@ def generate(input_text, format_hint="auto", platform="auto",
|
|||
    return json.loads(content.strip())


def resolve_reference_photo(photo_path: str | None) -> str | None:
    if not photo_path:
        return None

    resolved = Path(photo_path).expanduser()
    if not resolved.is_absolute():
        resolved = (Path.cwd() / resolved).resolve()

    if not resolved.is_file():
        print(f"Error: photo not found: {photo_path}", file=sys.stderr)
        sys.exit(1)

    return str(resolved)


def build_trend_image_scenario(
    input_text: str,
    *,
    platform: str = "auto",
    audience: str = "",
    duration: int | None = None,
    lang: str = "auto",
    photo_path: str | None = None,
) -> dict:
    try:
        prompt, prompt_source = expand_trend(
            input_text,
            api_url=API_URL,
            api_key=API_KEY,
            model=MODEL,
        )
    except RuntimeError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    matched = match_trend(input_text)
    resolved_photo = resolve_reference_photo(photo_path)
    resolved_platform = DEFAULT_TREND_PLATFORM if platform == "auto" else platform
    resolved_language = "auto" if lang == "auto" else lang

    trend_name = matched["name"] if matched else None
    pretty_name = (trend_name or input_text).replace("_", " ").strip()

    asset_analysis = {
        "asset_type": "trend_photo_request",
        "extracted_info": {
            "subject": "single portrait transformation from a reference image",
            "key_features": [pretty_name] if pretty_name else [],
            "tone": "trendy",
            "existing_visuals": "user reference photo" if resolved_photo else "no local reference photo attached",
            "gaps": [] if resolved_photo else ["reference image missing for face-preserving edit"],
        },
        "recommended_format": "trend_photo",
        "recommended_platform": resolved_platform,
        "confidence": "high" if trend_name else "medium",
    }

    return {
        "title": f"Trend portrait — {pretty_name}",
        "format": "trend_photo",
        "platform": resolved_platform,
        "language": resolved_language,
        "duration_sec": duration or 1,
        "hook": "Single AI-generated trend portrait from a reference image.",
        "target_audience": audience or "Social media users responding to AI portrait trends.",
        "content_restrictions": "Single still portrait image. Preserve the reference face exactly when a photo is supplied.",
        "scenes": [
            {
                "id": 1,
                "duration_sec": duration or 1,
                "visual_prompt": prompt,
                "visual_type": "image",
                "voiceover": "",
                "caption": "",
            }
        ],
        "image_request": {
            "prompt": prompt,
            "mode": "edit" if resolved_photo else "edit_required",
            "size_hint": "1024x1024",
            "reference_image_required": True,
            "reference_image_path": resolved_photo,
            "prompt_source": prompt_source,
            "trend_name": trend_name,
            "original_request": input_text,
        },
        "reference_image_required": True,
        "reference_image_path": resolved_photo,
        "storyboard_grid_prompt": None,
        "music_mood": "inspirational",
        "style_notes": "Single-scene trend portrait scenario for downstream image generation.",
        "asset_analysis": asset_analysis,
    }


def generate_video(input_text, platform="auto", audience="", duration=None, lang="auto"):
    """Generate a full video shooting script with timecoded voiceover and action descriptions."""
    if not API_KEY:
|
||||
|
|
@ -523,12 +621,14 @@ def main():
|
|||
choices=["image", "video"],
|
||||
help="image: storyboard for image/visual generation; video: full shooting script with voiceover")
|
||||
parser.add_argument("--format", default="auto",
|
||||
choices=["auto","wb_ad","reels","viral","long","postcard","educational"])
|
||||
choices=["auto","wb_ad","reels","viral","long","postcard","educational","trend_photo"])
|
||||
parser.add_argument("--platform", default="auto",
|
||||
choices=["auto","tiktok","instagram","wb","youtube","vk"])
|
||||
parser.add_argument("--audience", default="", help="Target audience description")
|
||||
parser.add_argument("--duration", type=int, default=None, help="Target duration in seconds")
|
||||
parser.add_argument("--lang", default="auto", help="Output language: ru, en, de, auto")
|
||||
parser.add_argument("--photo", default=None,
|
||||
help="Reference photo path for trend-photo requests.")
|
||||
parser.add_argument("--analyze", action="store_true", help="Analyze assets before generating (image mode only)")
|
||||
parser.add_argument("--out", default=None, help="Save JSON output to file")
|
||||
parser.add_argument("--voice", action="store_true",
|
||||
|
|
@ -591,21 +691,39 @@ def main():
|
|||
|
||||
return
|
||||
|
||||
assets = None
|
||||
if args.analyze:
|
||||
print("Analyzing assets...", file=sys.stderr)
|
||||
assets = analyze_assets(args.input)
|
||||
print(f"Asset type: {assets.get('asset_type')} / confidence: {assets.get('confidence')}", file=sys.stderr)
|
||||
if args.format == "auto" and assets.get("recommended_format"):
|
||||
args.format = assets["recommended_format"]
|
||||
if args.platform == "auto" and assets.get("recommended_platform"):
|
||||
args.platform = assets["recommended_platform"]
|
||||
trend_mode = args.format == "trend_photo" or looks_like_trend_request(args.input)
|
||||
if trend_mode:
|
||||
print("Generating trend-photo scenario...", file=sys.stderr)
|
||||
result = build_trend_image_scenario(
|
||||
args.input,
|
||||
platform=args.platform,
|
||||
audience=args.audience,
|
||||
duration=args.duration,
|
||||
lang=args.lang,
|
||||
photo_path=args.photo,
|
||||
)
|
||||
trend_name = result.get("image_request", {}).get("trend_name")
|
||||
prompt_source = result.get("image_request", {}).get("prompt_source")
|
||||
print(f" Trend: {trend_name or 'custom llm expansion'}", file=sys.stderr)
|
||||
print(f" Prompt source: {prompt_source}", file=sys.stderr)
|
||||
if result.get("reference_image_path"):
|
||||
print(f" Reference photo: {result['reference_image_path']}", file=sys.stderr)
|
||||
else:
|
||||
assets = None
|
||||
if args.analyze:
|
||||
print("Analyzing assets...", file=sys.stderr)
|
||||
assets = analyze_assets(args.input)
|
||||
print(f"Asset type: {assets.get('asset_type')} / confidence: {assets.get('confidence')}", file=sys.stderr)
|
||||
if args.format == "auto" and assets.get("recommended_format"):
|
||||
args.format = assets["recommended_format"]
|
||||
if args.platform == "auto" and assets.get("recommended_platform"):
|
||||
args.platform = assets["recommended_platform"]
|
||||
|
||||
result = generate(args.input, args.format, args.platform,
|
||||
args.audience, args.duration, args.lang, assets)
|
||||
result = generate(args.input, args.format, args.platform,
|
||||
args.audience, args.duration, args.lang, assets)
|
||||
|
||||
if assets:
|
||||
result["asset_analysis"] = assets
|
||||
if assets:
|
||||
result["asset_analysis"] = assets
|
||||
|
||||
output = json.dumps(result, ensure_ascii=False, indent=2)
|
||||
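The routing added to `main()` can be read as a two-stage check: an explicit trend-photo phrasing wins on its own, while a known trend keyword only counts when the request also mentions a reference image. The sketch below illustrates that decision under simplifying assumptions: the hint tuples are trimmed, `KNOWN_TREND_KEYWORDS` stands in for the `trends/` catalog, and `pick_pipeline` is a hypothetical name.

```python
from __future__ import annotations

# Hypothetical, trimmed-down hint lists; the full ones are in trend_catalog.py.
TREND_REQUEST_HINTS = ("trend photo", "transform my photo", "сделай меня")
REFERENCE_IMAGE_HINTS = ("my photo", "selfie", "portrait")
KNOWN_TREND_KEYWORDS = ("anime", "photo booth")  # stand-in for the trends/ catalog


def looks_like_trend_request(text: str) -> bool:
    query = text.lower().strip()
    # Stage 1: an explicit "trend photo" phrasing is enough on its own.
    if any(hint in query for hint in TREND_REQUEST_HINTS):
        return True
    # Stage 2: a known trend keyword only counts when the request also
    # mentions a reference image.
    if not any(keyword in query for keyword in KNOWN_TREND_KEYWORDS):
        return False
    return any(hint in query for hint in REFERENCE_IMAGE_HINTS)


def pick_pipeline(text: str, fmt: str = "auto") -> str:
    """Return "trend_photo" or "storyboard" for a request."""
    if fmt == "trend_photo" or looks_like_trend_request(text):
        return "trend_photo"
    return "storyboard"
```

One consequence of the `or` in the diff is that a request matching the hints is routed to the trend path even when another `--format` value was passed explicitly; whether that is intended is not stated in the commit.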
|
||||
|
|
|
|||
|
|
@ -12,9 +12,11 @@ Usage:
|
|||
python story-gen/scripts/trend.py "vintage photo booth" --generate --size 1024x1792
|
||||
"""
|
||||
|
||||
import sys, os, json, argparse, base64, subprocess, tempfile
|
||||
import sys, os, json, argparse, base64, subprocess
|
||||
from urllib import request
|
||||
|
||||
from trend_catalog import expand_trend
|
||||
|
||||
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ('utf-8', 'utf8'):
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
|
||||
|
|
@ -22,116 +24,10 @@ API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/
|
|||
API_KEY = os.environ.get("OPENAI_API_KEY", "")
|
||||
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
|
||||
IMAGE_MODEL = os.environ.get("IMAGE_MODEL", "gpt-image-1")
|
||||
|
||||
TRENDS_DIR = os.path.join(os.path.dirname(__file__), "trends")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Trend matching
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_trends() -> list[dict]:
|
||||
"""Load all trend files. Returns list of {name, keywords, prompt}."""
|
||||
trends = []
|
||||
if not os.path.isdir(TRENDS_DIR):
|
||||
return trends
|
||||
for fname in os.listdir(TRENDS_DIR):
|
||||
if not fname.endswith(".txt"):
|
||||
continue
|
||||
path = os.path.join(TRENDS_DIR, fname)
|
||||
with open(path, encoding="utf-8") as f:
|
||||
content = f.read().strip()
|
||||
# First line: "keywords: word1, word2, ..."
|
||||
lines = content.splitlines()
|
||||
keywords = []
|
||||
prompt_lines = []
|
||||
for i, line in enumerate(lines):
|
||||
if line.startswith("keywords:"):
|
||||
raw = line[len("keywords:"):].strip()
|
||||
keywords = [k.strip().lower() for k in raw.split(",") if k.strip()]
|
||||
else:
|
||||
prompt_lines = lines[i:]
|
||||
break
|
||||
prompt = "\n".join(prompt_lines).strip()
|
||||
trends.append({
|
||||
"name": fname.replace(".txt", ""),
|
||||
"keywords": keywords,
|
||||
"prompt": prompt
|
||||
})
|
||||
return trends
|
||||
|
||||
|
||||
def match_trend(user_input: str, trends: list[dict]) -> dict | None:
|
||||
"""Return matched trend dict or None if no match."""
|
||||
query = user_input.lower()
|
||||
for trend in trends:
|
||||
for kw in trend["keywords"]:
|
||||
if kw in query:
|
||||
return trend
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LLM fallback for unknown trends
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SYSTEM_PROMPT = """You are a professional prompt engineer for AI portrait image generation.
|
||||
|
||||
The user gives you a short casual phrase describing a visual style or trend.
|
||||
Your job: expand it into a detailed, professional portrait prompt.
|
||||
|
||||
Always structure the output as follows:
|
||||
|
||||
Transform this photo into a human portrait. Use the uploaded photo for the face — preserve ALL features exactly: face shape, eyes, nose, lips, eyebrows, hair color, hairstyle.
|
||||
|
||||
Clothing: [detailed clothing description]
|
||||
|
||||
Location: [detailed background/setting description]
|
||||
|
||||
Pose & Action: [body position, gesture, eye direction]
|
||||
|
||||
Lighting: [lighting setup and atmosphere]
|
||||
|
||||
Mood: [emotional tone, keywords]
|
||||
|
||||
Technical: [camera lens, aperture, ISO, art style, quality tags]
|
||||
|
||||
Rules:
|
||||
- Write entirely in English
|
||||
- Be very specific and detailed in every section
|
||||
- The prompt must be ready to paste directly into an image generation model
|
||||
- Return ONLY the prompt text, no explanations, no markdown
|
||||
"""
|
||||
|
||||
|
||||
def expand_trend_llm(user_input: str) -> str:
|
||||
payload = json.dumps({
|
||||
"model": MODEL,
|
||||
"messages": [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": user_input}
|
||||
],
|
||||
"temperature": 0.7
|
||||
}).encode()
|
||||
req = request.Request(
|
||||
f"{API_URL}/chat/completions",
|
||||
data=payload,
|
||||
headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
|
||||
)
|
||||
with request.urlopen(req, timeout=60) as resp:
|
||||
data = json.loads(resp.read())
|
||||
return data["choices"][0]["message"]["content"].strip()
|
||||
|
||||
|
||||
def expand_trend(user_input: str) -> tuple[str, str]:
|
||||
"""Returns (prompt, source) where source is trend name or 'llm'."""
|
||||
trends = load_trends()
|
||||
matched = match_trend(user_input, trends)
|
||||
if matched:
|
||||
print(f" Matched trend: {matched['name']}", file=sys.stderr)
|
||||
return matched["prompt"], matched["name"]
|
||||
print(" No trend matched — using LLM to generate prompt", file=sys.stderr)
|
||||
return expand_trend_llm(user_input), "llm"
|
||||
|
||||
OUTPUT_DIR = os.environ.get(
|
||||
"MEDIA_SKILL_IMAGE_OUTPUT_DIR",
|
||||
os.path.join(os.path.dirname(__file__), "..", "..", "output"),
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Image generation
|
||||
|
|
@ -181,14 +77,20 @@ def main():
|
|||
sys.exit(1)
|
||||
|
||||
print(f"Processing trend request: {args.input!r}", file=sys.stderr)
|
||||
prompt, _ = expand_trend(args.input)
|
||||
prompt, source = expand_trend(
|
||||
args.input,
|
||||
api_url=API_URL,
|
||||
api_key=API_KEY,
|
||||
model=MODEL,
|
||||
)
|
||||
print(f" Prompt source: {source}", file=sys.stderr)
|
||||
|
||||
print(prompt)
|
||||
|
||||
if args.generate:
|
||||
image_bytes = generate_image(prompt, args.size)
|
||||
|
||||
out_dir = os.path.join(tempfile.gettempdir(), "story-gen-previews")
|
||||
out_dir = os.path.abspath(OUTPUT_DIR)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
safe = "".join(c if c.isalnum() or c in "-_ " else "_" for c in args.input)[:40]
|
||||
path = os.path.join(out_dir, f"trend_{safe}.png")
|
||||
|
|
|
|||
213
story-gen/scripts/trend_catalog.py
Normal file
|
|
@ -0,0 +1,213 @@
|
|||
#!/usr/bin/env python3
from __future__ import annotations

import json
import os
from functools import lru_cache
from pathlib import Path
from urllib import request

DEFAULT_API_URL = "https://llm.lambda.coredump.ru/v1"
DEFAULT_MODEL = "qwen3.5-122b"
TRENDS_DIR = Path(__file__).resolve().parent / "trends"

# Conservative auto-routing: only obvious "trend photo" asks should bypass the
# generic storyboard generator. Known trend keywords alone are not enough.
TREND_REQUEST_HINTS = (
    "trend photo",
    "trending photo",
    "portrait trend",
    "viral portrait",
    "viral photo",
    "ai photo of me",
    "transform my photo",
    "use my photo",
    "upload photo",
    "uploaded photo",
    "trend portrait",
    "трендовая фотка",
    "трендовая фото",
    "трендовую фотку",
    "трендовый портрет",
    "сделай меня",
    "по моей фотке",
    "по моему фото",
    "из моей фотки",
    "из моего фото",
    "используй мое фото",
    "используй мою фотку",
)

REFERENCE_IMAGE_HINTS = (
    "my photo",
    "my face",
    "my selfie",
    "my portrait",
    "me ",
    " me",
    "portrait",
    "selfie",
    "photo of me",
    "use photo",
    "use my",
    "transform",
    "upload",
    "мое фото",
    "моя фотка",
    "мое лицо",
    "моё фото",
    "моё лицо",
    "по фото",
    "по фотке",
    "портрет",
    "селфи",
    "меня",
)

SYSTEM_PROMPT = """You are a professional prompt engineer for AI portrait image generation.

The user gives you a short casual phrase describing a visual style or trend.
Your job: expand it into a detailed, professional portrait prompt.

Always structure the output as follows:

Transform this photo into a human portrait. Use the uploaded photo for the face — preserve ALL features exactly: face shape, eyes, nose, lips, eyebrows, hair color, hairstyle.

Clothing: [detailed clothing description]

Location: [detailed background/setting description]

Pose & Action: [body position, gesture, eye direction]

Lighting: [lighting setup and atmosphere]

Mood: [emotional tone, keywords]

Technical: [camera lens, aperture, ISO, art style, quality tags]

Rules:
- Write entirely in English
- Be very specific and detailed in every section
- The prompt must be ready to paste directly into an image generation model
- Return ONLY the prompt text, no explanations, no markdown
"""


def _normalize_query(value: str) -> str:
    return value.lower().strip()


@lru_cache(maxsize=1)
def load_trends() -> tuple[dict, ...]:
    trends: list[dict] = []
    if not TRENDS_DIR.is_dir():
        return tuple()

    for path in sorted(TRENDS_DIR.glob("*.txt")):
        content = path.read_text(encoding="utf-8").strip()
        lines = content.splitlines()
        keywords: list[str] = []
        prompt_lines: list[str] = []
        for index, line in enumerate(lines):
            if line.startswith("keywords:"):
                raw_keywords = line[len("keywords:"):].strip()
                keywords = [
                    keyword.strip().lower()
                    for keyword in raw_keywords.split(",")
                    if keyword.strip()
                ]
            else:
                prompt_lines = lines[index:]
                break

        trends.append(
            {
                "name": path.stem,
                "path": str(path),
                "keywords": keywords,
                "prompt": "\n".join(prompt_lines).strip(),
            }
        )

    return tuple(trends)


def match_trend(user_input: str, trends: tuple[dict, ...] | None = None) -> dict | None:
    query = _normalize_query(user_input)
    catalog = trends if trends is not None else load_trends()
    for trend in catalog:
        for keyword in trend["keywords"]:
            if keyword and keyword in query:
                return trend
    return None


def looks_like_trend_request(user_input: str) -> bool:
    query = _normalize_query(user_input)
    if any(hint in query for hint in TREND_REQUEST_HINTS):
        return True

    matched = match_trend(query)
    if not matched:
        return False

    return any(hint in query for hint in REFERENCE_IMAGE_HINTS)


def expand_trend_llm(
    user_input: str,
    *,
    api_url: str | None = None,
    api_key: str | None = None,
    model: str | None = None,
) -> str:
    resolved_api_url = (api_url or os.environ.get("OPENAI_BASE_URL") or DEFAULT_API_URL).rstrip("/")
    resolved_api_key = api_key or os.environ.get("OPENAI_API_KEY", "")
    resolved_model = model or os.environ.get("STORY_MODEL") or DEFAULT_MODEL

    if not resolved_api_key:
        raise RuntimeError("OPENAI_API_KEY not set; cannot expand an unknown trend via LLM")

    payload = json.dumps(
        {
            "model": resolved_model,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_input},
            ],
            "temperature": 0.7,
        }
    ).encode()

    req = request.Request(
        f"{resolved_api_url}/chat/completions",
        data=payload,
        headers={
            "Authorization": f"Bearer {resolved_api_key}",
            "Content-Type": "application/json",
        },
    )
    with request.urlopen(req, timeout=60) as resp:
        data = json.loads(resp.read())
    return data["choices"][0]["message"]["content"].strip()


def expand_trend(
    user_input: str,
    *,
    api_url: str | None = None,
    api_key: str | None = None,
    model: str | None = None,
) -> tuple[str, str]:
    matched = match_trend(user_input)
    if matched:
        return matched["prompt"], matched["name"]
    return (
        expand_trend_llm(
            user_input,
            api_url=api_url,
            api_key=api_key,
            model=model,
        ),
        "llm",
    )
|
||||
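The catalog loader implies a simple file layout for each `trends/*.txt` entry: a leading `keywords:` line with comma-separated match terms, followed by the prompt text. The sketch below parses one such file from a string. The sample content is invented for illustration and `parse_trend` is a hypothetical name; the parsing steps follow `load_trends()` above.

```python
from __future__ import annotations

# Hypothetical file content; real entries live under story-gen/scripts/trends/.
SAMPLE = """keywords: Anime, аниме, manga
Transform this photo into an anime portrait.
Lighting: soft rim light
"""


def parse_trend(text: str) -> tuple[list[str], str]:
    """Split a trend file into (keywords, prompt)."""
    lines = text.strip().splitlines()
    keywords: list[str] = []
    prompt_lines: list[str] = []
    for index, line in enumerate(lines):
        if line.startswith("keywords:"):
            raw = line[len("keywords:"):]
            # Keywords are lowercased so they can be matched against a
            # lowercased query with a plain substring test.
            keywords = [k.strip().lower() for k in raw.split(",") if k.strip()]
        else:
            # Everything from the first non-keywords line onward is the prompt.
            prompt_lines = lines[index:]
            break
    return keywords, "\n".join(prompt_lines).strip()
```

Since matching is a substring test, short keywords can match inside unrelated words, which is presumably why the routing also requires a reference-image hint before bypassing the storyboard generator.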