Harden Codex auth refresh and responses compatibility

This commit is contained in:
George Pickett 2026-02-25 19:27:54 -08:00
parent 91bdb9eb2d
commit 74c662b63a
9 changed files with 996 additions and 22 deletions

View file

@ -20,6 +20,7 @@ import logging
import os
import shutil
import stat
import base64
import subprocess
import time
import webbrowser
@ -58,6 +59,9 @@ DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token"
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
# =============================================================================
@ -380,6 +384,27 @@ def _optional_base_url(value: Any) -> Optional[str]:
return cleaned if cleaned else None
def _decode_jwt_claims(token: Any) -> Dict[str, Any]:
if not isinstance(token, str) or token.count(".") != 2:
return {}
payload = token.split(".")[1]
payload += "=" * ((4 - len(payload) % 4) % 4)
try:
raw = base64.urlsafe_b64decode(payload.encode("utf-8"))
claims = json.loads(raw.decode("utf-8"))
except Exception:
return {}
return claims if isinstance(claims, dict) else {}
def _codex_access_token_is_expiring(access_token: Any, skew_seconds: int) -> bool:
claims = _decode_jwt_claims(access_token)
exp = claims.get("exp")
if not isinstance(exp, (int, float)):
return False
return float(exp) <= (time.time() + max(0, int(skew_seconds)))
# =============================================================================
# SSH / remote session detection
# =============================================================================
@ -405,6 +430,39 @@ def _codex_auth_file_path() -> Path:
return resolve_codex_home_path() / "auth.json"
def _codex_auth_lock_path(auth_path: Path) -> Path:
return auth_path.with_suffix(auth_path.suffix + ".lock")
@contextmanager
def _codex_auth_file_lock(
auth_path: Path,
timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS,
):
lock_path = _codex_auth_lock_path(auth_path)
lock_path.parent.mkdir(parents=True, exist_ok=True)
with lock_path.open("a+") as lock_file:
if fcntl is None:
yield
return
deadline = time.time() + max(1.0, timeout_seconds)
while True:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() >= deadline:
raise TimeoutError(f"Timed out waiting for Codex auth lock: {lock_path}")
time.sleep(0.05)
try:
yield
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def read_codex_auth_file() -> Dict[str, Any]:
"""Read and validate Codex auth.json shape."""
codex_home = resolve_codex_home_path()
@ -469,11 +527,172 @@ def read_codex_auth_file() -> Dict[str, Any]:
}
def resolve_codex_runtime_credentials() -> Dict[str, Any]:
def _persist_codex_auth_payload(
auth_path: Path,
payload: Dict[str, Any],
*,
lock_held: bool = False,
) -> None:
auth_path.parent.mkdir(parents=True, exist_ok=True)
def _write() -> None:
serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"
tmp_path = auth_path.parent / f".{auth_path.name}.{os.getpid()}.{time.time_ns()}.tmp"
try:
with tmp_path.open("w", encoding="utf-8") as tmp_file:
tmp_file.write(serialized)
tmp_file.flush()
os.fsync(tmp_file.fileno())
os.replace(tmp_path, auth_path)
finally:
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
try:
auth_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
except OSError:
pass
if lock_held:
_write()
return
with _codex_auth_file_lock(auth_path):
_write()
def _refresh_codex_auth_tokens(
*,
payload: Dict[str, Any],
auth_path: Path,
timeout_seconds: float,
lock_held: bool = False,
) -> Dict[str, Any]:
tokens = payload.get("tokens")
if not isinstance(tokens, dict):
raise AuthError(
"Codex auth file is missing a valid 'tokens' object.",
provider="openai-codex",
code="codex_auth_invalid_shape",
relogin_required=True,
)
refresh_token = tokens.get("refresh_token")
if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError(
"Codex auth file is missing tokens.refresh_token.",
provider="openai-codex",
code="codex_auth_missing_refresh_token",
relogin_required=True,
)
timeout = httpx.Timeout(max(5.0, float(timeout_seconds)))
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}) as client:
response = client.post(
CODEX_OAUTH_TOKEN_URL,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CODEX_OAUTH_CLIENT_ID,
},
)
if response.status_code != 200:
code = "codex_refresh_failed"
message = f"Codex token refresh failed with status {response.status_code}."
relogin_required = False
try:
err = response.json()
if isinstance(err, dict):
err_code = err.get("error")
if isinstance(err_code, str) and err_code.strip():
code = err_code.strip()
err_desc = err.get("error_description") or err.get("message")
if isinstance(err_desc, str) and err_desc.strip():
message = f"Codex token refresh failed: {err_desc.strip()}"
except Exception:
pass
if code in {"invalid_grant", "invalid_token", "invalid_request"}:
relogin_required = True
raise AuthError(
message,
provider="openai-codex",
code=code,
relogin_required=relogin_required,
)
try:
refresh_payload = response.json()
except Exception as exc:
raise AuthError(
"Codex token refresh returned invalid JSON.",
provider="openai-codex",
code="codex_refresh_invalid_json",
relogin_required=True,
) from exc
access_token = refresh_payload.get("access_token")
if not isinstance(access_token, str) or not access_token.strip():
raise AuthError(
"Codex token refresh response was missing access_token.",
provider="openai-codex",
code="codex_refresh_missing_access_token",
relogin_required=True,
)
updated_tokens = dict(tokens)
updated_tokens["access_token"] = access_token.strip()
next_refresh = refresh_payload.get("refresh_token")
if isinstance(next_refresh, str) and next_refresh.strip():
updated_tokens["refresh_token"] = next_refresh.strip()
payload["tokens"] = updated_tokens
payload["last_refresh"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
_persist_codex_auth_payload(auth_path, payload, lock_held=lock_held)
return updated_tokens
def resolve_codex_runtime_credentials(
*,
force_refresh: bool = False,
refresh_if_expiring: bool = True,
refresh_skew_seconds: int = CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
) -> Dict[str, Any]:
"""Resolve runtime credentials from Codex CLI auth state."""
data = read_codex_auth_file()
payload = data["payload"]
tokens = data["tokens"]
tokens = dict(data["tokens"])
auth_path = data["auth_path"]
access_token = str(tokens.get("access_token", "") or "").strip()
refresh_timeout_seconds = float(os.getenv("HERMES_CODEX_REFRESH_TIMEOUT_SECONDS", "20"))
should_refresh = bool(force_refresh)
if (not should_refresh) and refresh_if_expiring:
should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh:
lock_timeout = max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0)
with _codex_auth_file_lock(auth_path, timeout_seconds=lock_timeout):
data = read_codex_auth_file()
payload = data["payload"]
tokens = dict(data["tokens"])
access_token = str(tokens.get("access_token", "") or "").strip()
should_refresh = bool(force_refresh)
if (not should_refresh) and refresh_if_expiring:
should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh:
tokens = _refresh_codex_auth_tokens(
payload=payload,
auth_path=auth_path,
timeout_seconds=refresh_timeout_seconds,
lock_held=True,
)
access_token = str(tokens.get("access_token", "") or "").strip()
base_url = (
os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/")
or DEFAULT_CODEX_BASE_URL
@ -482,11 +701,11 @@ def resolve_codex_runtime_credentials() -> Dict[str, Any]:
return {
"provider": "openai-codex",
"base_url": base_url,
"api_key": tokens["access_token"],
"api_key": access_token,
"source": "codex-auth-json",
"last_refresh": payload.get("last_refresh"),
"auth_mode": payload.get("auth_mode"),
"auth_file": str(data["auth_path"]),
"auth_file": str(auth_path),
"codex_home": str(data["codex_home"]),
}

View file

@ -0,0 +1,91 @@
"""Codex model discovery from local Codex CLI cache/config."""
from __future__ import annotations
import json
from pathlib import Path
from typing import List, Optional
from hermes_cli.auth import resolve_codex_home_path
DEFAULT_CODEX_MODELS: List[str] = [
"gpt-5-codex",
"gpt-5.3-codex",
"gpt-5.2-codex",
"gpt-5.1-codex",
]
def _read_default_model(codex_home: Path) -> Optional[str]:
config_path = codex_home / "config.toml"
if not config_path.exists():
return None
try:
import tomllib
except Exception:
return None
try:
payload = tomllib.loads(config_path.read_text(encoding="utf-8"))
except Exception:
return None
model = payload.get("model") if isinstance(payload, dict) else None
if isinstance(model, str) and model.strip():
return model.strip()
return None
def _read_cache_models(codex_home: Path) -> List[str]:
cache_path = codex_home / "models_cache.json"
if not cache_path.exists():
return []
try:
raw = json.loads(cache_path.read_text(encoding="utf-8"))
except Exception:
return []
entries = raw.get("models") if isinstance(raw, dict) else None
sortable = []
if isinstance(entries, list):
for item in entries:
if not isinstance(item, dict):
continue
slug = item.get("slug")
if not isinstance(slug, str) or not slug.strip():
continue
slug = slug.strip()
if "codex" not in slug.lower():
continue
if item.get("supported_in_api") is False:
continue
visibility = item.get("visibility")
if isinstance(visibility, str) and visibility.strip().lower() == "hidden":
continue
priority = item.get("priority")
rank = int(priority) if isinstance(priority, (int, float)) else 10_000
sortable.append((rank, slug))
sortable.sort(key=lambda item: (item[0], item[1]))
deduped: List[str] = []
for _, slug in sortable:
if slug not in deduped:
deduped.append(slug)
return deduped
def get_codex_model_ids() -> List[str]:
codex_home = resolve_codex_home_path()
ordered: List[str] = []
default_model = _read_default_model(codex_home)
if default_model:
ordered.append(default_model)
for model_id in _read_cache_models(codex_home):
if model_id not in ordered:
ordered.append(model_id)
for model_id in DEFAULT_CODEX_MODELS:
if model_id not in ordered:
ordered.append(model_id)
return ordered

View file

@ -385,6 +385,7 @@ def _model_flow_openai_codex(config, current_model=""):
_update_config_for_provider, _login_openai_codex,
PROVIDER_REGISTRY, DEFAULT_CODEX_BASE_URL,
)
from hermes_cli.codex_models import get_codex_model_ids
from hermes_cli.config import get_env_value, save_env_value
import argparse
@ -402,14 +403,7 @@ def _model_flow_openai_codex(config, current_model=""):
print(f"Login failed: {exc}")
return
# Codex models are not discoverable through /models with this auth path,
# so provide curated IDs with custom fallback.
codex_models = [
"gpt-5-codex",
"gpt-5.3-codex",
"gpt-5.2-codex",
"gpt-5.1-codex",
]
codex_models = get_codex_model_ids()
selected = _prompt_model_selection(codex_models, current_model=current_model)
if selected:

View file

@ -826,12 +826,8 @@ def run_setup_wizard(args):
save_env_value("LLM_MODEL", custom)
# else: keep current
elif selected_provider == "openai-codex":
codex_models = [
"gpt-5-codex",
"gpt-5.3-codex",
"gpt-5.2-codex",
"gpt-5.1-codex",
]
from hermes_cli.codex_models import get_codex_model_ids
codex_models = get_codex_model_ids()
model_choices = [f"{m}" for m in codex_models]
model_choices.append("Custom model")
model_choices.append(f"Keep current ({current_model})")