fix merge conflict in trend.py and broken trend_catalog import in generate.py

AMEfremova 2026-04-06 23:25:45 +03:00
commit 2f9e312f33
14 changed files with 1087 additions and 179 deletions

0
.codex Normal file

1
.gitignore vendored

@ -34,3 +34,4 @@ assets/*
download-images/
output/
tests/
.venv-video-generation/


@ -24,6 +24,7 @@ Use it when the task spans several stages at once:
Treat the local modules in this repository as specialists:
- [story-gen/SKILL.md](story-gen/SKILL.md): generate the story and structured scenario brief from the normalized request and available assets.
- [image-generation/SKILL.md](image-generation/SKILL.md): generate still images from prompts through the repo-local Nano Banana helper when the heavy-assets phase needs new art or another heavy still asset instead of downloaded stills.
- [video-generation/SKILL.md](video-generation/SKILL.md): use the existing repo-specific AI video generation pipelines when the heavy-assets phase needs provider-backed generated video, marketplace promo generation, or staged narrative generation instead of ad hoc prompting.
- [download-images/SKILL.md](download-images/SKILL.md): fetch direct still-image assets into `assets/` for heavy-asset acquisition, cutouts, and overlays.
- [download-youtube-segment/SKILL.md](download-youtube-segment/SKILL.md): fetch exact source ranges from YouTube.
@ -34,6 +35,7 @@ Treat the local modules in this repository as specialists:
Known local skills in this repo:
- [SKILL.md](SKILL.md): top-level media workflow orchestrator.
- [image-generation/SKILL.md](image-generation/SKILL.md): minimal text-to-image path for generated stills and other heavy generated visual assets via `openai/gemini-2.5-flash-image`.
- [video-generation/SKILL.md](video-generation/SKILL.md): repo-specific AI video generation pipelines for generated clips, marketplace promo runs, Telegram-bot-backed generation, and microdrama/story-adaptation work during the heavy-assets phase.
- [download-images/SKILL.md](download-images/SKILL.md): download direct still-image assets into the local working set for heavy-assets acquisition.
- [download-youtube-segment/SKILL.md](download-youtube-segment/SKILL.md): download YouTube segments or frames with the helper scripts in `download-youtube-segment/scripts/`.
@ -44,6 +46,8 @@ Known local skills in this repo:
Routing reminders:
- If the heavy-assets phase specifically needs a newly generated still image or another heavy generated visual asset from a prompt, use [image-generation/SKILL.md](image-generation/SKILL.md) first instead of inventing a fresh image-generation flow.
- If the user explicitly asks only for an AI-generated still image, including requests like `сгенерируй картинку по трендам` or other trend-based image generation, treat that as a narrow image-generation task and go directly to [image-generation/SKILL.md](image-generation/SKILL.md) or the session-level `imagegen` skill instead of forcing the full media pipeline.
- If the heavy-assets phase specifically needs generated video clips and this repository's existing provider-backed generation workflows fit the task, use [video-generation/SKILL.md](video-generation/SKILL.md) first instead of inventing a fresh generation flow.
- If the request is specifically about YouTube clipping, use [download-youtube-segment/SKILL.md](download-youtube-segment/SKILL.md) first instead of falling back to ad hoc commands.
- If the request is specifically about removing the background from an image, use [remove-background/SKILL.md](remove-background/SKILL.md) first instead of ad hoc image-editing commands.
@ -71,6 +75,8 @@ If the user gives a chaotic brief, normalize it before doing expensive work.
Treat requests such as `сделай видео`, `сделай мем`, `собери ролик`, or similar end-result wording as a **full pipeline request by default**, not as permission to jump straight to montage. Only skip to a narrower module when the user explicitly asks for a single stage such as `just clip this`, `only write the script`, or `only add captions`.
If the deliverable is only a trend-based AI-generated still image, this is a single-stage exception: go straight to [image-generation/SKILL.md](image-generation/SKILL.md) or the session-level `imagegen` skill and do not require a scenario brief or the rest of the media pipeline.
Before any heavy production step, the agent must create or update a structured scenario brief under [assets/](assets/), preferably `assets/scenario.json`, and then use that file as the source of truth for later steps.
This is a hard gate:
@ -161,6 +167,8 @@ Prefer these paths in order:
4. Download exact source ranges from external video links.
5. Generate missing shots or draw new still assets only when real footage or downloadable stills do not exist or cannot achieve the needed moment.
If the missing asset is a generated still image or another heavy generated visual asset rather than a generated video clip, route that work through [image-generation/SKILL.md](image-generation/SKILL.md) before falling back to ad hoc API calls.
For source downloads:
- If the user gives only a concept for a helpful still image such as a logo, poster, reaction image, prop, sticker, glasses, clown wig, or clown nose, the heavy-assets phase may first use built-in web/image search to find a suitable asset, then save the direct image URL into the scenario brief and fetch it with [download-images/SKILL.md](download-images/SKILL.md).


@ -1,2 +0,0 @@
Используй https://www.youtube.com/watch?v=wDkztLMNK9k.
Сделай мем

115
image-generation/SKILL.md Normal file

@ -0,0 +1,115 @@
---
name: image-generation
description: Generate a still image from a text prompt through the repo-local Nano Banana helper. Use when the user wants a picture, cover, poster, product shot, meme still, concept art, or any other single generated image via the Lambda-compatible endpoint and `openai/gemini-2.5-flash-image`. Also use for запросы вроде `сгенерируй картинку`, `нарисуй`, `сделай обложку`, `сделай постер`, `сделай иллюстрацию`.
---
# Image Generation
## Overview
Use `scripts/generate-image.py` for the narrow image-generation path in this repo.
This skill is intentionally small:
- one OpenAI-compatible endpoint by default: `https://llm.lambda.coredump.ru/v1`
- one default model: `openai/gemini-2.5-flash-image`
- one required prompt source: either a direct prompt or a `story-gen` scenario
- output is saved under `output/` automatically when you do not pass a path
It is also the preferred narrow path when the heavy-assets phase needs a newly
generated still image: poster, cover, product shot, concept frame, meme still,
packshot, or any other heavy visual asset that does not come from a real source.
Do not use the larger `video-generation/` flows when the user only needs one
still image.
## Preconditions
The helper auto-loads variables from:
- `image-generation/.env`
- repo root `.env`
Shell-exported env vars still work and take priority.
Set an API key before running:
- preferred: `OPENAI_API_KEY`
- also accepted: `LAMBDA_API_KEY` or `LAMBDA_KEY`
Optional env:
- `OPENAI_BASE_URL` to override the default endpoint
- `NANOBANANA_MODEL` to override the default model
- `NANOBANANA_IMAGE_SIZE` to override the default size (`1024x1024`)
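The precedence described above (shell-exported variables first, then `image-generation/.env`, then the repo root `.env`) can be illustrated with a small sketch. It assumes the loader only fills in keys that are unset or empty; the `environ` dictionary parameter, the `demo` function, the temporary file names, and the `example/model-id` value are hypothetical and exist only to keep the example self-contained.

```python
import tempfile
from pathlib import Path


def load_env_file(path: Path, environ: dict[str, str]) -> None:
    """Copy KEY=VALUE pairs into environ without overriding non-empty values."""
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        if not key:
            continue
        # Strip one pair of matching quotes around the value.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
            value = value[1:-1]
        # A value that is already set and non-empty (e.g. shell-exported) wins.
        if environ.get(key):
            continue
        environ[key] = value


def demo() -> dict[str, str]:
    # Hypothetical stand-in for os.environ with one shell-exported key.
    env = {"OPENAI_API_KEY": "from-shell"}
    with tempfile.TemporaryDirectory() as tmp:
        skill_env = Path(tmp) / "skill.env"  # plays the role of image-generation/.env
        root_env = Path(tmp) / "root.env"    # plays the role of the repo root .env
        skill_env.write_text(
            'OPENAI_API_KEY=from-skill\n# comment\nNANOBANANA_IMAGE_SIZE="1536x1024"\n',
            encoding="utf-8",
        )
        root_env.write_text(
            "NANOBANANA_IMAGE_SIZE=512x512\nNANOBANANA_MODEL=example/model-id\n",
            encoding="utf-8",
        )
        # Files are read in priority order; earlier sources are never overwritten.
        for path in (skill_env, root_env):
            load_env_file(path, env)
    return env
```

Because files are read in priority order and existing non-empty values are never replaced, the first source to define a key wins.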
## Quick Start
Minimal local setup:
```bash
nano image-generation/.env
```
Generate with only a prompt:
```bash
python3 image-generation/scripts/generate-image.py \
"Minimal studio photo of a yellow banana on a blue background"
```
Generate to an explicit path:
```bash
python3 image-generation/scripts/generate-image.py \
"Retro sci-fi movie poster, chrome typography, neon fog" \
--output output/retro-poster.png
```
Generate at another size:
```bash
python3 image-generation/scripts/generate-image.py \
"Clean ecommerce hero shot of a beige handbag on white background" \
--size 1536x1024
```
Consume a `story-gen` scenario directly:
```bash
python3 image-generation/scripts/generate-image.py \
--scenario assets/trend-scenario.json
```
Override the reference photo from the scenario:
```bash
python3 image-generation/scripts/generate-image.py \
--scenario assets/trend-scenario.json \
--photo assets/me-retake.jpg
```
## Workflow
1. Reduce the request to one concrete image prompt or point the helper at a `story-gen` scenario.
2. If needed, put the key and defaults into `image-generation/.env`.
3. Run `scripts/generate-image.py`.
4. Let the script choose an `output/` filename unless the task needs a specific
output path.
5. Use the resulting local PNG as a normal asset for later editing,
compositing, or delivery.
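Step 4 depends on how the helper picks an output path. The following is a minimal sketch of those rules as they appear in `scripts/generate-image.py`, with two changes made for the sake of a deterministic example: the output directory and the current time are passed in as parameters (`output_dir`, `now`) instead of being read from the environment and the clock.

```python
from __future__ import annotations

import re
import unicodedata
from datetime import datetime
from pathlib import Path


def slugify_prompt(prompt: str, max_length: int = 40) -> str:
    """Reduce a prompt to a short ASCII slug; fall back to 'image' if nothing survives."""
    normalized = unicodedata.normalize("NFKD", prompt)
    ascii_prompt = normalized.encode("ascii", "ignore").decode("ascii").lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_prompt).strip("-")
    return slug[:max_length].strip("-") or "image"


def resolve_output_path(
    output_arg: str | None, prompt: str, output_dir: Path, now: datetime
) -> Path:
    if not output_arg:
        # No --output: timestamped name derived from the prompt.
        stamp = now.strftime("%Y%m%d-%H%M%S")
        return output_dir / f"nanobanana-{stamp}-{slugify_prompt(prompt)}.png"
    output_path = Path(output_arg)
    # A bare filename is placed under the default output directory.
    if not output_path.is_absolute() and len(output_path.parts) == 1:
        output_path = output_dir / output_path.name
    # A missing extension defaults to .png.
    if not output_path.suffix:
        output_path = output_path.with_suffix(".png")
    return output_path
```

One consequence worth knowing: a prompt written entirely in a non-Latin script, such as `сделай мем`, slugifies to the fallback `image`, so auto-generated filenames for such prompts differ only by timestamp.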
## Notes
- The helper writes PNG output.
- When `--output` is a bare filename, it is saved under `output/`.
- The script uses `/images/generations` for plain prompt generation.
- When a reference photo is supplied directly or via `scenario.json`, it switches to `/images/edits`.
- If the provider returns base64 image data, the script writes it directly.
- If the provider returns a temporary image URL instead, the script downloads the image
and writes the downloaded bytes, unchanged, to the same local output path.
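The endpoint choice and the two response shapes in the notes above can be condensed into a network-free sketch. It assumes an OpenAI-style response body of the form `{"data": [{"b64_json": ...}]}` or `{"data": [{"url": ...}]}`; the `fetch_url` callback is a hypothetical stand-in for the real download step so the example runs offline.

```python
import base64
from typing import Callable


def choose_endpoint(base_url: str, has_reference_photo: bool) -> str:
    """Edits endpoint when a reference photo is present, generations otherwise."""
    suffix = "/images/edits" if has_reference_photo else "/images/generations"
    return base_url.rstrip("/") + suffix


def extract_image_bytes(response_json: dict, fetch_url: Callable[[str], bytes]) -> bytes:
    """Return image bytes from inline base64 data or by fetching a temporary URL."""
    data = response_json.get("data")
    if not isinstance(data, list) or not data or not isinstance(data[0], dict):
        raise ValueError("response did not include image data")
    first = data[0]
    if first.get("b64_json"):
        # Inline payload: decode and return directly.
        return base64.b64decode(first["b64_json"])
    if first.get("url"):
        # Temporary URL: delegate the download to the caller-supplied function.
        return fetch_url(first["url"])
    raise ValueError("response contained neither b64_json nor url")
```

Inline base64 data is checked first, so a response that carries both fields never triggers a download.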
## Resource
- `scripts/generate-image.py`: minimal text-to-image helper for
`openai/gemini-2.5-flash-image`


@ -0,0 +1,4 @@
interface:
display_name: "Image Generation"
short_description: "Generate still images with Nano Banana"
default_prompt: "Use $image-generation to generate a single image from a prompt with openai/gemini-2.5-flash-image."


@ -0,0 +1,511 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import base64
import binascii
import json
import os
import re
import sys
import unicodedata
import uuid
from datetime import datetime
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen


REPO_ROOT = Path(__file__).resolve().parents[2]
SKILL_DIR = Path(__file__).resolve().parents[1]
API_KEY_ENV_NAMES = ("OPENAI_API_KEY", "LAMBDA_API_KEY", "LAMBDA_KEY")
USER_AGENT = "media-skill-image-generation/1.0"
REQUEST_TIMEOUT_SECONDS = 300
FALLBACK_BASE_URL = "https://llm.lambda.coredump.ru/v1"
FALLBACK_MODEL = "openai/gemini-2.5-flash-image"
FALLBACK_SIZE = "1024x1024"
ENV_CANDIDATE_PATHS = (SKILL_DIR / ".env", REPO_ROOT / ".env")
PROMPT_KEYS = ("prompt", "visual_prompt", "image_prompt", "action")


def die(message: str, exit_code: int = 1) -> None:
    print(f"Error: {message}", file=sys.stderr)
    raise SystemExit(exit_code)


def load_env_file(path: Path) -> None:
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        if not key:
            continue
        if value and value[0] == value[-1] and value[0] in {'"', "'"}:
            value = value[1:-1]
        existing = os.environ.get(key)
        if existing:
            continue
        if existing == "" and value == "":
            continue
        os.environ[key] = value


def load_default_env_files() -> None:
    for path in ENV_CANDIDATE_PATHS:
        if path.is_file():
            load_env_file(path)


load_default_env_files()


def get_assets_dir() -> Path:
    return Path(os.getenv("MEDIA_SKILL_ASSETS_DIR", REPO_ROOT / "assets"))


def get_default_output_dir() -> Path:
    return Path(os.getenv("MEDIA_SKILL_IMAGE_OUTPUT_DIR", REPO_ROOT / "output"))


def get_base_url() -> str:
    return os.getenv("OPENAI_BASE_URL", FALLBACK_BASE_URL).rstrip("/")


def get_default_model() -> str:
    return os.getenv("NANOBANANA_MODEL", FALLBACK_MODEL)


def get_default_size() -> str:
    return os.getenv("NANOBANANA_IMAGE_SIZE", FALLBACK_SIZE)


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Generate or edit a still image with openai/gemini-2.5-flash-image."
    )
    parser.add_argument(
        "prompt",
        nargs="?",
        help="Text prompt for the image model. Optional when --scenario is used.",
    )
    parser.add_argument(
        "--scenario",
        help="Path to scenario JSON. Uses image_request.prompt or a scene/card prompt.",
    )
    parser.add_argument(
        "--scene",
        type=int,
        default=1,
        help="1-based scene/card index when loading a prompt from scenario. Default: 1.",
    )
    parser.add_argument(
        "--photo",
        help="Reference photo path for image edits. Overrides any path stored in the scenario.",
    )
    parser.add_argument(
        "-o",
        "--output",
        help="Output path. Bare filenames are saved under output/. Defaults to an auto-generated output path.",
    )
    parser.add_argument(
        "--size",
        default=get_default_size(),
        help=f"Requested image size. Default: {get_default_size()}",
    )
    parser.add_argument(
        "--model",
        default=get_default_model(),
        help=f"Model id. Default: {get_default_model()}",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print the resolved output path and request settings without calling the API.",
    )
    return parser.parse_args(argv)


def get_api_key() -> str:
    for env_name in API_KEY_ENV_NAMES:
        value = os.getenv(env_name)
        if value:
            return value
    die(
        "missing API key; set OPENAI_API_KEY, LAMBDA_API_KEY, or LAMBDA_KEY before running"
    )


def slugify_prompt(prompt: str, max_length: int = 40) -> str:
    normalized = unicodedata.normalize("NFKD", prompt)
    ascii_prompt = normalized.encode("ascii", "ignore").decode("ascii").lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_prompt).strip("-")
    return (slug[:max_length].strip("-")) or "image"


def resolve_output_path(output_arg: str | None, prompt: str) -> Path:
    if not output_arg:
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        return get_default_output_dir() / f"nanobanana-{timestamp}-{slugify_prompt(prompt)}.png"
    output_path = Path(output_arg)
    if not output_path.is_absolute() and len(output_path.parts) == 1:
        output_path = get_default_output_dir() / output_path.name
    if not output_path.suffix:
        output_path = output_path.with_suffix(".png")
    return output_path


def ensure_parent_dir(path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)


def read_json_response(response) -> dict:
    raw = response.read()
    if not raw:
        die("API returned an empty response body")
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        die(f"API returned invalid JSON: {exc}")


def extract_error_message(body: bytes) -> str:
    if not body:
        return "empty error body"
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return body.decode("utf-8", errors="replace")
    if isinstance(parsed, dict):
        error = parsed.get("error")
        if isinstance(error, dict):
            return str(error.get("message") or parsed)
        return str(parsed)
    return str(parsed)


def post_generation_request(endpoint: str, body: bytes, headers: dict[str, str]) -> dict:
    request = Request(
        endpoint,
        data=body,
        headers=headers,
        method="POST",
    )
    try:
        with urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response:
            return read_json_response(response)
    except HTTPError as exc:
        die(f"image generation failed with HTTP {exc.code}: {extract_error_message(exc.read())}")
    except URLError as exc:
        die(f"could not reach image API: {exc.reason}")


def post_image_generation(prompt: str, model: str, size: str, api_key: str) -> dict:
    payload = {
        "model": model,
        "prompt": prompt,
        "size": size,
    }
    return post_generation_request(
        f"{get_base_url()}/images/generations",
        json.dumps(payload).encode("utf-8"),
        {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": USER_AGENT,
        },
    )


def guess_mime_type(path: Path) -> str:
    suffix = path.suffix.lower()
    if suffix in {".jpg", ".jpeg", ".jfif", ".jpe"}:
        return "image/jpeg"
    if suffix == ".png":
        return "image/png"
    if suffix == ".webp":
        return "image/webp"
    return "application/octet-stream"


def build_multipart_field(boundary: str, name: str, value: str) -> bytes:
    return (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
        f"{value}\r\n"
    ).encode("utf-8")


def post_image_edit(prompt: str, model: str, size: str, api_key: str, photo_path: Path) -> dict:
    boundary = uuid.uuid4().hex
    photo_bytes = photo_path.read_bytes()
    filename = photo_path.stem + (photo_path.suffix or ".jpg")
    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="image"; filename="{filename}"\r\n'
        f"Content-Type: {guess_mime_type(photo_path)}\r\n\r\n"
    ).encode("utf-8") + photo_bytes + b"\r\n"
    body += build_multipart_field(boundary, "prompt", prompt)
    body += build_multipart_field(boundary, "model", model)
    body += build_multipart_field(boundary, "size", size)
    body += build_multipart_field(boundary, "n", "1")
    body += f"--{boundary}--\r\n".encode("utf-8")
    return post_generation_request(
        f"{get_base_url()}/images/edits",
        body,
        {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "User-Agent": USER_AGENT,
        },
    )


def download_image_from_url(url: str) -> bytes:
    request = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response:
        body = response.read()
    if not body:
        die("provider returned an empty image download")
    return body


def extract_image_bytes(response_json: dict) -> bytes:
    if isinstance(response_json.get("error"), dict):
        die(str(response_json["error"].get("message") or response_json["error"]))
    data = response_json.get("data")
    if not isinstance(data, list) or not data:
        die("API response did not include image data")
    first_item = data[0]
    if not isinstance(first_item, dict):
        die("API response contained an unexpected image item shape")
    b64_json = first_item.get("b64_json")
    if b64_json:
        try:
            return base64.b64decode(b64_json)
        except (ValueError, binascii.Error) as exc:
            die(f"could not decode image payload: {exc}")
    url = first_item.get("url")
    if url:
        return download_image_from_url(url)
    die(f"API response did not contain b64_json or url; got keys: {sorted(first_item.keys())}")


def resolve_existing_file(
    raw_path: str,
    *,
    base_dir: Path | None = None,
    label: str,
    quiet: bool = False,
) -> Path | None:
    original = Path(raw_path).expanduser()
    candidates = [original]
    if not original.is_absolute():
        if base_dir is not None:
            candidates.append(base_dir / original)
        candidates.append(Path.cwd() / original)
    for candidate in candidates:
        resolved = candidate.resolve()
        if resolved.is_file():
            return resolved
    if quiet:
        return None
    die(f"{label} not found: {raw_path}")


def load_json_file(path: Path) -> dict:
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        die(f"scenario not found: {path}")
    except json.JSONDecodeError as exc:
        die(f"scenario is not valid JSON: {exc}")
    if not isinstance(payload, dict):
        die("scenario root must be a JSON object")
    return payload


def extract_prompt_from_mapping(mapping: dict) -> tuple[str | None, str | None]:
    for key in PROMPT_KEYS:
        value = mapping.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip(), key
    return None, None


def select_index(item_number: int, total_items: int, label: str) -> int:
    if item_number < 1:
        die(f"{label} index must be >= 1")
    index = item_number - 1
    if index >= total_items:
        die(f"{label} {item_number} is out of range; scenario has {total_items} {label}s")
    return index


def extract_prompt_from_scenario(path: Path, item_number: int) -> tuple[str, str, dict]:
    payload = load_json_file(path)
    image_request = payload.get("image_request")
    if isinstance(image_request, dict):
        prompt, key = extract_prompt_from_mapping(image_request)
        if prompt:
            return prompt, f"scenario:image_request.{key}", payload
    scenes = payload.get("scenes")
    if isinstance(scenes, list) and scenes:
        index = select_index(item_number, len(scenes), "scene")
        scene = scenes[index]
        if not isinstance(scene, dict):
            die(f"scene {item_number} must be a JSON object")
        prompt, key = extract_prompt_from_mapping(scene)
        if prompt:
            return prompt, f"scenario:scene[{item_number}].{key}", payload
        die(f"scene {item_number} does not contain a supported prompt field")
    cards = payload.get("cards")
    if isinstance(cards, list) and cards:
        index = select_index(item_number, len(cards), "card")
        card = cards[index]
        if not isinstance(card, dict):
            die(f"card {item_number} must be a JSON object")
        prompt, key = extract_prompt_from_mapping(card)
        if prompt:
            return prompt, f"scenario:card[{item_number}].{key}", payload
        die(f"card {item_number} does not contain a supported prompt field")
    prompt, key = extract_prompt_from_mapping(payload)
    if prompt:
        return prompt, f"scenario:{key}", payload
    die("scenario does not contain image_request.prompt, scene visual_prompt, or image_prompt")


def scenario_requires_reference_image(payload: dict) -> bool:
    image_request = payload.get("image_request")
    if isinstance(image_request, dict):
        required = image_request.get("reference_image_required")
        if isinstance(required, bool):
            return required
    required = payload.get("reference_image_required")
    return bool(required)


def resolve_reference_image(
    photo_arg: str | None,
    scenario_payload: dict | None,
    scenario_path: Path | None,
) -> Path | None:
    if photo_arg:
        return resolve_existing_file(photo_arg, label="reference photo")
    if not scenario_payload:
        return None
    candidate_paths: list[str] = []
    image_request = scenario_payload.get("image_request")
    if isinstance(image_request, dict):
        reference_path = image_request.get("reference_image_path")
        if isinstance(reference_path, str) and reference_path.strip():
            candidate_paths.append(reference_path.strip())
    top_level_reference = scenario_payload.get("reference_image_path")
    if isinstance(top_level_reference, str) and top_level_reference.strip():
        candidate_paths.append(top_level_reference.strip())
    base_dir = scenario_path.parent if scenario_path else None
    for raw_path in candidate_paths:
        resolved = resolve_existing_file(
            raw_path,
            base_dir=base_dir,
            label="reference photo",
            quiet=True,
        )
        if resolved:
            return resolved
    return None


def resolve_prompt_and_context(
    args: argparse.Namespace,
) -> tuple[str, str, dict | None, Path | None]:
    scenario_payload = None
    scenario_path = None
    scenario_prompt = None
    scenario_prompt_source = None
    if args.scenario:
        scenario_path = resolve_existing_file(args.scenario, label="scenario")
        scenario_prompt, scenario_prompt_source, scenario_payload = extract_prompt_from_scenario(
            scenario_path,
            args.scene,
        )
    if args.prompt:
        return args.prompt, "cli:prompt", scenario_payload, scenario_path
    if scenario_prompt:
        return scenario_prompt, scenario_prompt_source, scenario_payload, scenario_path
    die("provide a prompt or --scenario")


def main(argv: list[str]) -> int:
    args = parse_args(argv)
    prompt, prompt_source, scenario_payload, scenario_path = resolve_prompt_and_context(args)
    output_path = resolve_output_path(args.output, prompt)
    reference_photo = resolve_reference_image(args.photo, scenario_payload, scenario_path)
    reference_required = scenario_requires_reference_image(scenario_payload or {})
    if reference_required and reference_photo is None:
        die(
            "scenario requires a reference photo; pass --photo or include reference_image_path in the scenario"
        )
    request_mode = "edit" if reference_photo else "generate"
    endpoint = f"{get_base_url()}/images/edits" if reference_photo else f"{get_base_url()}/images/generations"
    if args.dry_run:
        print(f"endpoint={endpoint}")
        print(f"model={args.model}")
        print(f"size={args.size}")
        print(f"output={output_path}")
        print(f"prompt_source={prompt_source}")
        print(f"request_mode={request_mode}")
        if reference_photo:
            print(f"photo={reference_photo}")
        return 0
    api_key = get_api_key()
    if reference_photo:
        response_json = post_image_edit(prompt, args.model, args.size, api_key, reference_photo)
    else:
        response_json = post_image_generation(prompt, args.model, args.size, api_key)
    image_bytes = extract_image_bytes(response_json)
    ensure_parent_dir(output_path)
    output_path.write_bytes(image_bytes)
    print(f"saved_to={output_path}")
    print(f"model={args.model}")
    print(f"size={args.size}")
    print(f"prompt_source={prompt_source}")
    print(f"request_mode={request_mode}")
    if reference_photo:
        print(f"photo={reference_photo}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))


@ -39,6 +39,11 @@ Needs env:
### `--mode image` (default)
Generates a storyboard scenario for image/visual generation. Each scene has a `visual_prompt` (English) ready for gpt-image-1.5 or veo-3.1.
For trend-photo asks such as anime portrait, studio headshot, USSR postcard,
photo booth, aged self, flowers in hair, and the other curated portrait trends
stored under `scripts/trends/`, `generate.py` can also emit a **single-image
scenario** that downstream image generation can consume directly.
### `--mode video`
Generates a full shooting script for real video production. Each scene has:
- `timecode` — cumulative start time `HH:MM:SS`
@ -54,11 +59,12 @@ Generates a full shooting script for real video production. Each scene has:
| Parameter | Values | Description |
|-----------|--------|-------------|
| `--mode` | `image`, `video` | `image`: visual storyboard; `video`: full shooting script with voiceover |
| `--format` | `wb_ad`, `reels`, `viral`, `long`, `postcard`, `educational`, `auto` | Video format (image mode only) |
| `--format` | `wb_ad`, `reels`, `viral`, `long`, `postcard`, `educational`, `trend_photo`, `auto` | Video format (image mode only) |
| `--platform` | `tiktok`, `instagram`, `wb`, `youtube`, `vk`, `auto` | Target platform |
| `--audience` | any text | Target audience description |
| `--duration` | seconds | Target duration |
| `--lang` | `ru`, `en`, `de`, `auto` | Language for voiceover and captions |
| `--photo` | filepath | Reference photo path for trend-photo scenarios |
| `--analyze` | flag | Analyze assets before generating (image mode only) |
| `--out` | filepath | Save JSON to file (video mode also saves `_voiceover.txt`) |
| `--voice` | flag | After script generation, immediately run voice synthesis (video mode + `--out` required) |
@ -94,6 +100,14 @@ python3 {baseDir}/scripts/generate.py \
"Анекдот про программиста и кофе" \
--format viral --platform tiktok --lang en
# Curated trend-photo scenario for downstream image generation
python3 {baseDir}/scripts/generate.py \
"Сделай меня в стиле аниме" \
--format trend_photo --photo assets/me.jpg \
--out assets/trend-scenario.json
# Then hand the JSON to image-generation:
# python3 image-generation/scripts/generate-image.py --scenario assets/trend-scenario.json
# Long educational video shooting script
python3 {baseDir}/scripts/generate.py \
"How to choose your first bicycle" \
@ -105,7 +119,7 @@ python3 {baseDir}/scripts/generate.py \
```json
{
"title": "video title",
"format": "wb_ad|reels|viral|long|postcard|educational",
"format": "wb_ad|reels|viral|long|postcard|educational|trend_photo",
"platform": "tiktok|instagram|wb|youtube|vk",
"language": "ru|en|...",
"duration_sec": 30,
@ -122,6 +136,11 @@ python3 {baseDir}/scripts/generate.py \
"caption": "on-screen text in target language"
}
],
"image_request": {
"prompt": "single image prompt for downstream image-generation",
"reference_image_required": true,
"reference_image_path": "/abs/path/to/photo.jpg"
},
"storyboard_grid_prompt": "NxN storyboard grid — all scenes as one image. null if no recurring subject.",
"music_mood": "upbeat|calm|dramatic|funny|inspirational",
"style_notes": "overall style and delivery notes",
@ -156,6 +175,7 @@ python3 {baseDir}/scripts/generate.py \
**Image mode** output feeds into:
- `visual_prompt` → image generation (`gpt-image-1.5`) or video (`veo-3.1`)
- `image_request.prompt` + `reference_image_path` → `image-generation/scripts/generate-image.py` for trend-photo edits
- `voiceover` → TTS (`Pocket-TTS` or `ElevenLabs`)
- `caption` + `duration_sec` → ffmpeg montage ([../ffmpeg-editing/SKILL.md](../ffmpeg-editing/SKILL.md))
- Full JSON → orchestrator ([../SKILL.md](../SKILL.md))
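When a scenario carries both `scenes` and an `image_request`, the downstream image helper has to pick one prompt. The sketch below mirrors the lookup order used by `generate-image.py` in this commit: `image_request` first, then the selected scene, then the selected card, then top-level keys. It is simplified to raise exceptions instead of exiting; the function names `first_prompt` and `pick_prompt` are illustrative, not part of the repository.

```python
from __future__ import annotations

# Prompt field names, in the order the helper checks them.
PROMPT_KEYS = ("prompt", "visual_prompt", "image_prompt", "action")


def first_prompt(mapping: dict) -> str | None:
    """Return the first non-empty prompt-like string in the mapping."""
    for key in PROMPT_KEYS:
        value = mapping.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


def pick_prompt(scenario: dict, item_number: int = 1) -> str | None:
    # 1. A dedicated image_request block wins over everything else.
    image_request = scenario.get("image_request")
    if isinstance(image_request, dict):
        prompt = first_prompt(image_request)
        if prompt:
            return prompt
    # 2. Otherwise use the 1-based scene (or, failing that, card) index.
    for list_key in ("scenes", "cards"):
        items = scenario.get(list_key)
        if isinstance(items, list) and items:
            if not 1 <= item_number <= len(items):
                raise IndexError(f"{list_key} index {item_number} is out of range")
            item = items[item_number - 1]
            prompt = first_prompt(item) if isinstance(item, dict) else None
            if prompt is None:
                raise ValueError(f"{list_key} item {item_number} has no prompt field")
            return prompt
    # 3. Finally fall back to prompt keys at the top level.
    return first_prompt(scenario)
```

A trend-photo scenario therefore always resolves to `image_request.prompt`, regardless of which `--scene` index is requested.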


@ -15,16 +15,26 @@ Usage:
python story-gen/preview.py "идея" --scene 2
"""
import sys, os, json, argparse, base64, subprocess, tempfile
import sys, os, json, argparse, base64, subprocess
from urllib import request
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ('utf-8', 'utf8'):
sys.stdout.reconfigure(encoding='utf-8')
SCRIPTS_DIR = os.path.join(os.path.dirname(__file__), "scripts")
if SCRIPTS_DIR not in sys.path:
sys.path.insert(0, SCRIPTS_DIR)
from trend_catalog import expand_trend as expand_trend_prompt
API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/v1")
API_KEY = os.environ.get("OPENAI_API_KEY", "")
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
IMAGE_MODEL = os.environ.get("IMAGE_MODEL", "gpt-image-1")
OUTPUT_DIR = os.environ.get(
"MEDIA_SKILL_IMAGE_OUTPUT_DIR",
os.path.join(os.path.dirname(__file__), "..", "output"),
)
# ---------------------------------------------------------------------------
# Step 1: classify the request
@ -91,82 +101,16 @@ def generate_scenario(user_input: str, fmt="auto", platform="auto") -> dict:
# ---------------------------------------------------------------------------
# Step 2b: trend — portrait prompt expansion (file-based + LLM fallback)
# Step 2b: trend — portrait prompt expansion (shared catalog + LLM fallback)
# ---------------------------------------------------------------------------
TRENDS_DIR = os.path.join(os.path.dirname(__file__), "scripts", "trends")
TREND_FALLBACK_PROMPT = """You are a professional prompt engineer for AI portrait image generation.
The user gives you a short phrase describing a visual style or trend.
Expand it into a detailed professional portrait prompt.
Structure:
Transform this photo into a human portrait. Use the uploaded photo for the face preserve ALL features exactly: face shape, eyes, nose, lips, eyebrows, hair color, hairstyle.
Clothing: [detailed clothing]
Location: [detailed background/setting]
Pose & Action: [body position, gesture, eye direction]
Lighting: [lighting setup and atmosphere]
Mood: [emotional tone, keywords]
Technical: [lens, aperture, ISO, art style, quality tags]
Rules:
- Write entirely in English
- Be very specific and detailed
- Return ONLY the prompt text, no explanations
"""
def _load_trends() -> list[dict]:
trends = []
if not os.path.isdir(TRENDS_DIR):
return trends
for fname in os.listdir(TRENDS_DIR):
if not fname.endswith(".txt"):
continue
with open(os.path.join(TRENDS_DIR, fname), encoding="utf-8") as f:
content = f.read().strip()
lines = content.splitlines()
keywords = []
prompt_lines = []
for i, line in enumerate(lines):
if line.startswith("keywords:"):
keywords = [k.strip().lower() for k in line[len("keywords:"):].split(",") if k.strip()]
else:
prompt_lines = lines[i:]
break
trends.append({
"name": fname.replace(".txt", ""),
"keywords": keywords,
"prompt": "\n".join(prompt_lines).strip()
})
return trends
def expand_trend(user_input: str) -> str:
query = user_input.lower()
for trend in _load_trends():
if any(kw in query for kw in trend["keywords"]):
print(f" Matched trend: {trend['name']}", file=sys.stderr)
return trend["prompt"]
print(" No trend matched — using LLM to generate prompt", file=sys.stderr)
payload = json.dumps({
"model": MODEL,
"messages": [
{"role": "system", "content": TREND_FALLBACK_PROMPT},
{"role": "user", "content": user_input}
],
"temperature": 0.7
}).encode()
req = request.Request(
f"{API_URL}/chat/completions",
data=payload,
headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
def expand_trend(user_input: str) -> tuple[str, str]:
return expand_trend_prompt(
user_input,
api_url=API_URL,
api_key=API_KEY,
model=MODEL,
)
with request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
return data["choices"][0]["message"]["content"].strip()
# ---------------------------------------------------------------------------
@ -291,7 +235,7 @@ def open_image(path: str):
def save_and_open(image_bytes: bytes, label: str) -> str:
out_dir = os.path.join(tempfile.gettempdir(), "story-gen-previews")
out_dir = os.path.abspath(OUTPUT_DIR)
os.makedirs(out_dir, exist_ok=True)
safe = "".join(c if c.isalnum() or c in "-_ " else "_" for c in label)[:50].strip()
path = os.path.join(out_dir, f"{safe}.png")
@ -442,7 +386,8 @@ def main():
sys.exit(1)
else:
print(" Expanding trend prompt...", file=sys.stderr)
prompt = expand_trend(args.input)
prompt, source = expand_trend(args.input)
print(f" Prompt source: {source}", file=sys.stderr)
print(f"\nPrompt:\n{prompt}\n", file=sys.stderr)
image_bytes = generate_image(prompt, args.size, photo_path=args.photo)
save_and_open(image_bytes, f"trend_{args.input[:30]}")
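
Reviewer note: `save_and_open` builds the output filename from a user-supplied label. The sketch below isolates that sanitization step so its behavior is easier to check. The helper name `safe_label` and the `limit` parameter are hypothetical and are not part of this commit.

```python
def safe_label(label: str, limit: int = 50) -> str:
    """Sketch of the filename sanitization used in save_and_open.

    Keeps alphanumeric characters (str.isalnum is Unicode-aware, so
    Cyrillic input survives), plus '-', '_' and spaces; everything else
    becomes '_'. The result is truncated, then stripped of edge whitespace.
    """
    cleaned = "".join(c if c.isalnum() or c in "-_ " else "_" for c in label)
    return cleaned[:limit].strip()
```

Because the truncation happens before `.strip()`, the result can be shorter than `limit` when the cut lands on a space.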


@ -10,6 +10,8 @@ from pathlib import Path
from urllib import request, error
import _env as _ # loads .env from repo root
from trend import expand_trend, load_trends, match_trend
# Fix Windows console encoding (cp1251 can't handle ₽, emoji, etc.)
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ('utf-8', 'utf8'):
sys.stdout.reconfigure(encoding='utf-8')
@ -18,6 +20,7 @@ API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/v1")
API_KEY = os.environ.get("OPENAI_API_KEY", "")
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
DEFAULT_OUTPUT_PATH = Path(__file__).resolve().parents[2] / "assets" / "scenario.json"
DEFAULT_TREND_PLATFORM = "instagram"
SYSTEM_PROMPT = """You are a professional storyboard creator for image-based video production.
@ -53,7 +56,7 @@ Rules for storyboard_grid_prompt:
Response format:
{
"title": "video title",
"format": "wb_ad|reels|viral|long|postcard|educational",
"format": "wb_ad|reels|viral|long|postcard|educational|trend_photo",
"platform": "tiktok|instagram|wb|youtube|vk",
"language": "ru|en|...",
"duration_sec": 30,
@ -463,6 +466,101 @@ def generate(input_text, format_hint="auto", platform="auto",
return json.loads(content.strip())
def resolve_reference_photo(photo_path: str | None) -> str | None:
if not photo_path:
return None
resolved = Path(photo_path).expanduser()
if not resolved.is_absolute():
resolved = (Path.cwd() / resolved).resolve()
if not resolved.is_file():
print(f"Error: photo not found: {photo_path}", file=sys.stderr)
sys.exit(1)
return str(resolved)
def build_trend_image_scenario(
input_text: str,
*,
platform: str = "auto",
audience: str = "",
duration: int | None = None,
lang: str = "auto",
photo_path: str | None = None,
) -> dict:
try:
prompt, prompt_source = expand_trend(
input_text,
api_url=API_URL,
api_key=API_KEY,
model=MODEL,
)
except RuntimeError as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)
matched = match_trend(input_text)
resolved_photo = resolve_reference_photo(photo_path)
resolved_platform = DEFAULT_TREND_PLATFORM if platform == "auto" else platform
resolved_language = lang
trend_name = matched["name"] if matched else None
pretty_name = (trend_name or input_text).replace("_", " ").strip()
asset_analysis = {
"asset_type": "trend_photo_request",
"extracted_info": {
"subject": "single portrait transformation from a reference image",
"key_features": [pretty_name] if pretty_name else [],
"tone": "trendy",
"existing_visuals": "user reference photo" if resolved_photo else "no local reference photo attached",
"gaps": [] if resolved_photo else ["reference image missing for face-preserving edit"],
},
"recommended_format": "trend_photo",
"recommended_platform": resolved_platform,
"confidence": "high" if trend_name else "medium",
}
return {
"title": f"Trend portrait — {pretty_name}",
"format": "trend_photo",
"platform": resolved_platform,
"language": resolved_language,
"duration_sec": duration or 1,
"hook": "Single AI-generated trend portrait from a reference image.",
"target_audience": audience or "Social media users responding to AI portrait trends.",
"content_restrictions": "Single still portrait image. Preserve the reference face exactly when a photo is supplied.",
"scenes": [
{
"id": 1,
"duration_sec": duration or 1,
"visual_prompt": prompt,
"visual_type": "image",
"voiceover": "",
"caption": "",
}
],
"image_request": {
"prompt": prompt,
"mode": "edit" if resolved_photo else "edit_required",
"size_hint": "1024x1024",
"reference_image_required": True,
"reference_image_path": resolved_photo,
"prompt_source": prompt_source,
"trend_name": trend_name,
"original_request": input_text,
},
"reference_image_required": True,
"reference_image_path": resolved_photo,
"storyboard_grid_prompt": None,
"music_mood": "inspirational",
"style_notes": "Single-scene trend portrait scenario for downstream image generation.",
"asset_analysis": asset_analysis,
}
def generate_video(input_text, platform="auto", audience="", duration=None, lang="auto"):
"""Generate a full video shooting script with timecoded voiceover and action descriptions."""
if not API_KEY:
@ -524,12 +622,14 @@ def main():
choices=["image", "video"],
help="image: storyboard for image/visual generation; video: full shooting script with voiceover")
parser.add_argument("--format", default="auto",
choices=["auto","wb_ad","reels","viral","long","postcard","educational"])
choices=["auto","wb_ad","reels","viral","long","postcard","educational","trend_photo"])
parser.add_argument("--platform", default="auto",
choices=["auto","tiktok","instagram","wb","youtube","vk"])
parser.add_argument("--audience", default="", help="Target audience description")
parser.add_argument("--duration", type=int, default=None, help="Target duration in seconds")
parser.add_argument("--lang", default="auto", help="Output language: ru, en, de, auto")
parser.add_argument("--photo", default=None,
help="Reference photo path for trend-photo requests.")
parser.add_argument("--analyze", action="store_true", help="Analyze assets before generating (image mode only)")
parser.add_argument("--out", default=None, help="Save JSON output to file")
parser.add_argument("--voice", action="store_true",
@ -592,21 +692,39 @@ def main():
return
assets = None
if args.analyze:
print("Analyzing assets...", file=sys.stderr)
assets = analyze_assets(args.input)
print(f"Asset type: {assets.get('asset_type')} / confidence: {assets.get('confidence')}", file=sys.stderr)
if args.format == "auto" and assets.get("recommended_format"):
args.format = assets["recommended_format"]
if args.platform == "auto" and assets.get("recommended_platform"):
args.platform = assets["recommended_platform"]
trend_mode = args.format == "trend_photo" or looks_like_trend_request(args.input)
if trend_mode:
print("Generating trend-photo scenario...", file=sys.stderr)
result = build_trend_image_scenario(
args.input,
platform=args.platform,
audience=args.audience,
duration=args.duration,
lang=args.lang,
photo_path=args.photo,
)
trend_name = result.get("image_request", {}).get("trend_name")
prompt_source = result.get("image_request", {}).get("prompt_source")
print(f" Trend: {trend_name or 'custom llm expansion'}", file=sys.stderr)
print(f" Prompt source: {prompt_source}", file=sys.stderr)
if result.get("reference_image_path"):
print(f" Reference photo: {result['reference_image_path']}", file=sys.stderr)
else:
assets = None
if args.analyze:
print("Analyzing assets...", file=sys.stderr)
assets = analyze_assets(args.input)
print(f"Asset type: {assets.get('asset_type')} / confidence: {assets.get('confidence')}", file=sys.stderr)
if args.format == "auto" and assets.get("recommended_format"):
args.format = assets["recommended_format"]
if args.platform == "auto" and assets.get("recommended_platform"):
args.platform = assets["recommended_platform"]
result = generate(args.input, args.format, args.platform,
args.audience, args.duration, args.lang, assets)
result = generate(args.input, args.format, args.platform,
args.audience, args.duration, args.lang, assets)
if assets:
result["asset_analysis"] = assets
if assets:
result["asset_analysis"] = assets
output = json.dumps(result, ensure_ascii=False, indent=2)
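
Reviewer note: the trend-photo scenario carries an `image_request` block whose `mode` is `"edit"` when a reference photo was resolved and `"edit_required"` otherwise. The sketch below shows one way a downstream step might branch on that field. The function `plan_image_step` and the action names it returns are assumptions for illustration and do not exist in the repository.

```python
def plan_image_step(scenario: dict) -> dict:
    """Hypothetical consumer of the scenario JSON produced for trend photos."""
    request = scenario.get("image_request") or {}
    # Only trend-photo scenarios carry an image_request block.
    if scenario.get("format") != "trend_photo" or not request:
        return {"action": "skip"}
    reference = request.get("reference_image_path")
    if request.get("mode") == "edit" and reference:
        return {
            "action": "edit",
            "prompt": request["prompt"],
            "reference": reference,
            "size": request.get("size_hint", "1024x1024"),
        }
    # mode == "edit_required": the prompt is ready but no photo was supplied.
    return {"action": "ask_for_photo", "prompt": request["prompt"]}
```

Keeping the "photo missing" case as an explicit action, rather than failing, matches the way the scenario records the gap in `asset_analysis` instead of aborting.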


@ -4,15 +4,17 @@ trend.py — portrait trend prompt enhancer.
Checks if the user's request matches one of the known trends (stored in trends/ folder).
If matched, returns the curated prompt directly.
If not matched, asks the LLM to generate a prompt based on the request.
If not matched, asks the LLM to generate a prompt based on live TikTok trends context.
Usage:
python story-gen/scripts/trend.py "аниме"
python story-gen/scripts/trend.py "я в средневековье" --generate
python story-gen/scripts/trend.py "vintage photo booth" --generate --size 1024x1792
python story-gen/scripts/trend.py --list-trends
python story-gen/scripts/trend.py --update-trends
"""
import sys, os, json, argparse, base64, subprocess, tempfile
import sys, os, json, argparse, base64, subprocess
from urllib import request
from datetime import datetime, timezone, timedelta
import _env as _ # loads .env from repo root
@ -20,20 +22,21 @@ import _env as _ # loads .env from repo root
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ('utf-8', 'utf8'):
sys.stdout.reconfigure(encoding='utf-8')
API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/v1")
API_KEY = os.environ.get("OPENAI_API_KEY", "")
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
IMAGE_MODEL = os.environ.get("IMAGE_MODEL", "gpt-image-1")
API_URL = os.environ.get("OPENAI_BASE_URL", "https://llm.lambda.coredump.ru/v1")
API_KEY = os.environ.get("OPENAI_API_KEY", "")
MODEL = os.environ.get("STORY_MODEL", "qwen3.5-122b")
IMAGE_MODEL = os.environ.get("IMAGE_MODEL", "gpt-image-1")
OUTPUT_DIR = os.environ.get("MEDIA_SKILL_IMAGE_OUTPUT_DIR",
os.path.join(os.path.dirname(__file__), "..", "..", "output"))
TRENDS_DIR = os.path.join(os.path.dirname(__file__), "trends")
TRENDS_CACHE = os.path.join(os.path.dirname(__file__), "trends_cache.json")
# ---------------------------------------------------------------------------
# Trend matching
# Static trend files (trends/*.txt)
# ---------------------------------------------------------------------------
def load_trends() -> list[dict]:
"""Load all trend files. Returns list of {name, keywords, prompt}."""
trends = []
if not os.path.isdir(TRENDS_DIR):
return trends
@ -43,10 +46,8 @@ def load_trends() -> list[dict]:
path = os.path.join(TRENDS_DIR, fname)
with open(path, encoding="utf-8") as f:
content = f.read().strip()
# First line: "keywords: word1, word2, ..."
lines = content.splitlines()
keywords = []
prompt_lines = []
keywords, prompt_lines = [], []
for i, line in enumerate(lines):
if line.startswith("keywords:"):
raw = line[len("keywords:"):].strip()
@ -54,17 +55,15 @@ def load_trends() -> list[dict]:
else:
prompt_lines = lines[i:]
break
prompt = "\n".join(prompt_lines).strip()
trends.append({
"name": fname.replace(".txt", ""),
"keywords": keywords,
"prompt": prompt
"prompt": "\n".join(prompt_lines).strip()
})
return trends
def match_trend(user_input: str, trends: list[dict]) -> dict | None:
"""Return matched trend dict or None if no match."""
query = user_input.lower()
for trend in trends:
for kw in trend["keywords"]:
@ -72,17 +71,11 @@ def match_trend(user_input: str, trends: list[dict]) -> dict | None:
return trend
return None
# ---------------------------------------------------------------------------
# LLM fallback for unknown trends
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Live TikTok trends cache
# ---------------------------------------------------------------------------
def load_live_trends() -> list[dict]:
"""Load cached TikTok trends. Returns empty list if cache missing or stale (>25h)."""
if not os.path.exists(TRENDS_CACHE):
return []
with open(TRENDS_CACHE, encoding="utf-8") as f:
@ -91,30 +84,30 @@ def load_live_trends() -> list[dict]:
if updated_at:
age = datetime.now(timezone.utc) - datetime.fromisoformat(updated_at)
if age > timedelta(hours=25):
print(" Note: trends cache is older than 25h, consider running trend_collector.py",
file=sys.stderr)
return cache.get("trends", [])
print(" Note: trends cache is older than 25h, run --update-trends", file=sys.stderr)
return cache.get("video_trends", [])
def build_trends_context(trends: list[dict]) -> str:
"""Format live trends into a context block for the system prompt."""
if not trends:
return ""
lines = ["Current TikTok trends (use these to make the prompt more relevant):"]
for t in trends[:6]: # top 6 trends
name = t.get("trend_name", "")
fmt = t.get("content_format", "")
visual = t.get("visual_style", "")
hook = t.get("hook_pattern", "")
kw = ", ".join(t.get("prompt_keywords", []))
sounds = ", ".join(t.get("top_sounds", []))
lines = ["Current TikTok trends (use to make the prompt more relevant):"]
for t in trends[:6]:
name = t.get("trend_name", "")
fmt = t.get("content_format", "")
visual = t.get("visual_style", "")
hook = t.get("hook_pattern", "") or (t.get("hook_examples") or [""])[0]
kw = ", ".join(t.get("prompt_keywords", []))
sounds = ", ".join(t.get("top_sounds", []))
lines.append(
f"- [{fmt}] {name}: visual style — {visual}. "
f"Hook: {hook}. Keywords: {kw}."
+ (f" Trending sounds: {sounds}." if sounds else "")
f"- [{fmt}] {name}: visual — {visual}. Hook: {hook}. Keywords: {kw}."
+ (f" Sounds: {sounds}." if sounds else "")
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# LLM prompt expansion
# ---------------------------------------------------------------------------
SYSTEM_PROMPT_BASE = """You are a professional prompt engineer for AI portrait image generation.
@ -147,31 +140,31 @@ Rules:
"""
def build_system_prompt() -> str:
def expand_trend_llm(user_input: str) -> str:
trends = load_live_trends()
ctx = build_trends_context(trends)
if ctx:
print(f" Using {len(trends)} live TikTok trends as context", file=sys.stderr)
return SYSTEM_PROMPT_BASE.format(trends_context=ctx if ctx else "")
system = SYSTEM_PROMPT_BASE.format(trends_context=ctx if ctx else "")
def expand_trend_llm(user_input: str) -> str:
payload = json.dumps({
"model": MODEL,
"messages": [
{"role": "system", "content": build_system_prompt()},
{"role": "system", "content": system},
{"role": "user", "content": user_input}
],
"temperature": 0.7
"temperature": 0.7,
"max_tokens": 4000,
}).encode()
req = request.Request(
f"{API_URL}/chat/completions",
data=payload,
headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
)
with request.urlopen(req, timeout=60) as resp:
with request.urlopen(req, timeout=120) as resp:
data = json.loads(resp.read())
return data["choices"][0]["message"]["content"].strip()
msg = data["choices"][0]["message"]
return (msg.get("content") or msg.get("reasoning_content") or "").strip()
def expand_trend(user_input: str) -> tuple[str, str]:
@ -181,10 +174,9 @@ def expand_trend(user_input: str) -> tuple[str, str]:
if matched:
print(f" Matched trend: {matched['name']}", file=sys.stderr)
return matched["prompt"], matched["name"]
print(" No trend matched — using LLM to generate prompt", file=sys.stderr)
print(" No trend matched — using LLM", file=sys.stderr)
return expand_trend_llm(user_input), "llm"
# ---------------------------------------------------------------------------
# Image generation
# ---------------------------------------------------------------------------
@ -218,6 +210,9 @@ def open_image(path: str):
else:
subprocess.run(["xdg-open", path])
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Portrait trend prompt enhancer")
@ -232,34 +227,24 @@ def main():
help="Run trend_collector.py to refresh trends cache")
args = parser.parse_args()
# --list-trends: show cached trends
if args.list_trends:
trends = load_live_trends()
if not trends:
print("No cached trends found. Run with --update-trends first.")
print("No cached trends. Run: python story-gen/scripts/trend.py --update-trends")
return
cache_path = TRENDS_CACHE
cache_mtime = ""
if os.path.exists(cache_path):
import datetime as dt
mtime = os.path.getmtime(cache_path)
cache_mtime = dt.datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")
print(f"TikTok Trends (updated: {cache_mtime})\n")
mtime = datetime.fromtimestamp(os.path.getmtime(TRENDS_CACHE)).strftime("%Y-%m-%d %H:%M")
print(f"TikTok Trends (updated: {mtime})\n")
for i, t in enumerate(trends, 1):
print(f"{i}. [{t.get('content_format','?')}] {t.get('trend_name','?')}")
print(f" Topic: {t.get('topic','?')}")
print(f"{i}. [{t.get('category','?')}] {t.get('trend_name','?')}")
print(f" Visual: {t.get('visual_style','?')}")
print(f" Hook: {t.get('hook_pattern','?')}")
print(f" Keywords: {', '.join(t.get('prompt_keywords', []))}")
print(f" Avg plays: {t.get('avg_plays', 0):,}")
print(f" Hooks: {t.get('hook_examples', [])}")
print(f" Sounds: {t.get('top_sounds', [])}")
print()
return
# --update-trends: run collector
if args.update_trends:
collector = os.path.join(os.path.dirname(__file__), "trend_collector.py")
import subprocess as sp
result = sp.run([sys.executable, collector], capture_output=False)
result = subprocess.run([sys.executable, collector])
sys.exit(result.returncode)
if not args.input:
@ -270,15 +255,14 @@ def main():
print("Error: OPENAI_API_KEY not set", file=sys.stderr)
sys.exit(1)
print(f"Processing trend request: {args.input!r}", file=sys.stderr)
prompt, _ = expand_trend(args.input)
print(f"Processing: {args.input!r}", file=sys.stderr)
prompt, source = expand_trend(args.input)
print(f" Source: {source}", file=sys.stderr)
print(prompt)
if args.generate:
image_bytes = generate_image(prompt, args.size)
out_dir = os.path.join(tempfile.gettempdir(), "story-gen-previews")
out_dir = os.path.abspath(OUTPUT_DIR)
os.makedirs(out_dir, exist_ok=True)
safe = "".join(c if c.isalnum() or c in "-_ " else "_" for c in args.input)[:40]
path = os.path.join(out_dir, f"trend_{safe}.png")


@ -0,0 +1,213 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
from functools import lru_cache
from pathlib import Path
from urllib import request
DEFAULT_API_URL = "https://llm.lambda.coredump.ru/v1"
DEFAULT_MODEL = "qwen3.5-122b"
TRENDS_DIR = Path(__file__).resolve().parent / "trends"
# Conservative auto-routing: only obvious "trend photo" asks should bypass the
# generic storyboard generator. Known trend keywords alone are not enough.
TREND_REQUEST_HINTS = (
"trend photo",
"trending photo",
"portrait trend",
"viral portrait",
"viral photo",
"ai photo of me",
"transform my photo",
"use my photo",
"upload photo",
"uploaded photo",
"trend portrait",
"трендовая фотка",
"трендовая фото",
"трендовую фотку",
"трендовый портрет",
"сделай меня",
"по моей фотке",
"по моему фото",
"из моей фотки",
"из моего фото",
"используй мое фото",
"используй мою фотку",
)
REFERENCE_IMAGE_HINTS = (
"my photo",
"my face",
"my selfie",
"my portrait",
"me ",
" me",
"portrait",
"selfie",
"photo of me",
"use photo",
"use my",
"transform",
"upload",
"мое фото",
"моя фотка",
"мое лицо",
"моё фото",
"моё лицо",
"по фото",
"по фотке",
"портрет",
"селфи",
"меня",
)
SYSTEM_PROMPT = """You are a professional prompt engineer for AI portrait image generation.
The user gives you a short casual phrase describing a visual style or trend.
Your job: expand it into a detailed, professional portrait prompt.
Always structure the output as follows:
Transform this photo into a human portrait. Use the uploaded photo for the face and preserve ALL features exactly: face shape, eyes, nose, lips, eyebrows, hair color, hairstyle.
Clothing: [detailed clothing description]
Location: [detailed background/setting description]
Pose & Action: [body position, gesture, eye direction]
Lighting: [lighting setup and atmosphere]
Mood: [emotional tone, keywords]
Technical: [camera lens, aperture, ISO, art style, quality tags]
Rules:
- Write entirely in English
- Be very specific and detailed in every section
- The prompt must be ready to paste directly into an image generation model
- Return ONLY the prompt text, no explanations, no markdown
"""
def _normalize_query(value: str) -> str:
return value.lower().strip()
@lru_cache(maxsize=1)
def load_trends() -> tuple[dict, ...]:
trends: list[dict] = []
if not TRENDS_DIR.is_dir():
return tuple()
for path in sorted(TRENDS_DIR.glob("*.txt")):
content = path.read_text(encoding="utf-8").strip()
lines = content.splitlines()
keywords: list[str] = []
prompt_lines: list[str] = []
for index, line in enumerate(lines):
if line.startswith("keywords:"):
raw_keywords = line[len("keywords:"):].strip()
keywords = [
keyword.strip().lower()
for keyword in raw_keywords.split(",")
if keyword.strip()
]
else:
prompt_lines = lines[index:]
break
trends.append(
{
"name": path.stem,
"path": str(path),
"keywords": keywords,
"prompt": "\n".join(prompt_lines).strip(),
}
)
return tuple(trends)
def match_trend(user_input: str, trends: tuple[dict, ...] | None = None) -> dict | None:
query = _normalize_query(user_input)
catalog = trends if trends is not None else load_trends()
for trend in catalog:
for keyword in trend["keywords"]:
if keyword and keyword in query:
return trend
return None
def looks_like_trend_request(user_input: str) -> bool:
query = _normalize_query(user_input)
if any(hint in query for hint in TREND_REQUEST_HINTS):
return True
matched = match_trend(query)
if not matched:
return False
return any(hint in query for hint in REFERENCE_IMAGE_HINTS)
def expand_trend_llm(
user_input: str,
*,
api_url: str | None = None,
api_key: str | None = None,
model: str | None = None,
) -> str:
resolved_api_url = (api_url or os.environ.get("OPENAI_BASE_URL") or DEFAULT_API_URL).rstrip("/")
resolved_api_key = api_key or os.environ.get("OPENAI_API_KEY", "")
resolved_model = model or os.environ.get("STORY_MODEL") or DEFAULT_MODEL
if not resolved_api_key:
raise RuntimeError("OPENAI_API_KEY not set; cannot expand an unknown trend via LLM")
payload = json.dumps(
{
"model": resolved_model,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_input},
],
"temperature": 0.7,
}
).encode()
req = request.Request(
f"{resolved_api_url}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {resolved_api_key}",
"Content-Type": "application/json",
},
)
with request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
return data["choices"][0]["message"]["content"].strip()
def expand_trend(
user_input: str,
*,
api_url: str | None = None,
api_key: str | None = None,
model: str | None = None,
) -> tuple[str, str]:
matched = match_trend(user_input)
if matched:
return matched["prompt"], matched["name"]
return (
expand_trend_llm(
user_input,
api_url=api_url,
api_key=api_key,
model=model,
),
"llm",
)


@ -7,21 +7,12 @@ die() {
exit 1
}
usage() {
cat <<'EOF'
Usage:
replace_audio.sh <manifest_file> <video_file> <output_video_file>
Example:
replace_audio.sh voice/segments/segments.txt rendered_video.mp4 final_output.mp4
EOF
}
require_command() {
command -v "$1" >/dev/null 2>&1 || die "missing required command: $1"
}
[ $# -eq 3 ] || { usage; exit 1; }
[ $# -eq 3 ] || { echo "Usage: $0 <manifest_file> <video_file> <output_file>"; exit 1; }
MANIFEST="$1"
VIDEO_IN="$2"
VIDEO_OUT="$3"
@ -33,13 +24,13 @@ require_command ffprobe
TEMP_LIST="$(mktemp)"
trap 'rm -f "$TEMP_LIST"' EXIT
# 2. Read the manifest, sort by time, and create list.txt for ffmpeg
# ffmpeg concat expects files listed sequentially, without timecodes.
# We also check that the audio files exist.
# 2. Read the manifest and extract only the audio file paths
echo "Building a temporary list of audio files from the manifest..."
while IFS= read -r line; do
[[ -z "$line" ]] && continue
# Extract the second column (path to the audio file)
audio_file=$(echo "$line" | awk '{print $2}')
if [ -f "$audio_file" ]; then
# Escape special characters in the path for ffmpeg concat
printf "file '%s'\n" "$(echo "$audio_file" | sed "s/'/'\\\\''/g")" >> "$TEMP_LIST"
@ -62,9 +53,7 @@ ffmpeg -y -f concat -safe 0 -i "$TEMP_LIST" -c copy "$TEMP_AUDIO" || die "Оши
AUDIO_DURATION=$(ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "$TEMP_AUDIO")
echo "Voiceover duration: $AUDIO_DURATION seconds"
# 5. Overlay the audio on the video (trim or pad with silence to the video length)
# -shortest trims to the shortest stream (in our case, most likely the audio)
# To keep the video from being cut, use the apad filter, which adds silence if the audio is shorter.
# 5. Overlay the audio on the video
echo "Overlaying audio on the video..."
ffmpeg -y -i "$VIDEO_IN" -i "$TEMP_AUDIO" \
-filter_complex "[1:a]apad[aud]" \


@ -1,3 +1,5 @@
#pip install edge-tts
#python voice/tts_generate.py assets/scenario.json
import argparse
import asyncio
import json