feat: use endpoint metadata for custom model context and pricing (#1906)

* perf: cache base_url.lower() via property, consolidate triple load_config(), hoist set constant

run_agent.py:
- Add base_url property that auto-caches _base_url_lower on every
  assignment, eliminating 12+ redundant .lower() calls per API cycle
  across __init__, _build_api_kwargs, _supports_reasoning_extra_body,
  and the main conversation loop
- Consolidate three separate load_config() disk reads in __init__
  (memory, skills, compression) into a single call, reusing the
  result dict for all three config sections

model_tools.py:
- Hoist _READ_SEARCH_TOOLS set to module level (was rebuilt inside
  handle_function_call on every tool invocation)

* Use endpoint metadata for custom model context and pricing

---------

Co-authored-by: kshitij <82637225+kshitijk4poor@users.noreply.github.com>
This commit is contained in:
Teknium 2026-03-18 03:04:07 -07:00 committed by GitHub
parent 11f029c311
commit a2440f72f6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 375 additions and 49 deletions

View file

@ -10,6 +10,7 @@ import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
import requests
import yaml
@ -21,6 +22,9 @@ logger = logging.getLogger(__name__)
_model_metadata_cache: Dict[str, Dict[str, Any]] = {}
_model_metadata_cache_time: float = 0
_MODEL_CACHE_TTL = 3600
_endpoint_model_metadata_cache: Dict[str, Dict[str, Dict[str, Any]]] = {}
_endpoint_model_metadata_cache_time: Dict[str, float] = {}
_ENDPOINT_MODEL_CACHE_TTL = 300
# Descending tiers for context length probing when the model is unknown.
# We start high and step down on context-length errors until one works.
@ -123,6 +127,128 @@ DEFAULT_CONTEXT_LENGTHS = {
"qwen-vl-max": 32768,
}
_CONTEXT_LENGTH_KEYS = (
"context_length",
"context_window",
"max_context_length",
"max_position_embeddings",
"max_model_len",
"max_input_tokens",
"max_sequence_length",
"max_seq_len",
)
_MAX_COMPLETION_KEYS = (
"max_completion_tokens",
"max_output_tokens",
"max_tokens",
)
def _normalize_base_url(base_url: str) -> str:
return (base_url or "").strip().rstrip("/")
def _is_openrouter_base_url(base_url: str) -> bool:
return "openrouter.ai" in _normalize_base_url(base_url).lower()
def _is_custom_endpoint(base_url: str) -> bool:
normalized = _normalize_base_url(base_url)
return bool(normalized) and not _is_openrouter_base_url(normalized)
def _is_known_provider_base_url(base_url: str) -> bool:
normalized = _normalize_base_url(base_url)
if not normalized:
return False
parsed = urlparse(normalized if "://" in normalized else f"https://{normalized}")
host = parsed.netloc.lower() or parsed.path.lower()
known_hosts = (
"api.openai.com",
"chatgpt.com",
"api.anthropic.com",
"api.z.ai",
"api.moonshot.ai",
"api.kimi.com",
"api.minimax",
)
return any(known_host in host for known_host in known_hosts)
def _iter_nested_dicts(value: Any):
if isinstance(value, dict):
yield value
for nested in value.values():
yield from _iter_nested_dicts(nested)
elif isinstance(value, list):
for item in value:
yield from _iter_nested_dicts(item)
def _coerce_reasonable_int(value: Any, minimum: int = 1024, maximum: int = 10_000_000) -> Optional[int]:
try:
if isinstance(value, bool):
return None
if isinstance(value, str):
value = value.strip().replace(",", "")
result = int(value)
except (TypeError, ValueError):
return None
if minimum <= result <= maximum:
return result
return None
def _extract_first_int(payload: Dict[str, Any], keys: tuple[str, ...]) -> Optional[int]:
keyset = {key.lower() for key in keys}
for mapping in _iter_nested_dicts(payload):
for key, value in mapping.items():
if str(key).lower() not in keyset:
continue
coerced = _coerce_reasonable_int(value)
if coerced is not None:
return coerced
return None
def _extract_context_length(payload: Dict[str, Any]) -> Optional[int]:
return _extract_first_int(payload, _CONTEXT_LENGTH_KEYS)
def _extract_max_completion_tokens(payload: Dict[str, Any]) -> Optional[int]:
return _extract_first_int(payload, _MAX_COMPLETION_KEYS)
def _extract_pricing(payload: Dict[str, Any]) -> Dict[str, Any]:
alias_map = {
"prompt": ("prompt", "input", "input_cost_per_token", "prompt_token_cost"),
"completion": ("completion", "output", "output_cost_per_token", "completion_token_cost"),
"request": ("request", "request_cost"),
"cache_read": ("cache_read", "cached_prompt", "input_cache_read", "cache_read_cost_per_token"),
"cache_write": ("cache_write", "cache_creation", "input_cache_write", "cache_write_cost_per_token"),
}
for mapping in _iter_nested_dicts(payload):
normalized = {str(key).lower(): value for key, value in mapping.items()}
if not any(any(alias in normalized for alias in aliases) for aliases in alias_map.values()):
continue
pricing: Dict[str, Any] = {}
for target, aliases in alias_map.items():
for alias in aliases:
if alias in normalized and normalized[alias] not in (None, ""):
pricing[target] = normalized[alias]
break
if pricing:
return pricing
return {}
def _add_model_aliases(cache: Dict[str, Dict[str, Any]], model_id: str, entry: Dict[str, Any]) -> None:
cache[model_id] = entry
if "/" in model_id:
bare_model = model_id.split("/", 1)[1]
cache.setdefault(bare_model, entry)
def fetch_model_metadata(force_refresh: bool = False) -> Dict[str, Dict[str, Any]]:
"""Fetch model metadata from OpenRouter (cached for 1 hour)."""
@ -139,15 +265,16 @@ def fetch_model_metadata(force_refresh: bool = False) -> Dict[str, Dict[str, Any
cache = {}
for model in data.get("data", []):
model_id = model.get("id", "")
cache[model_id] = {
entry = {
"context_length": model.get("context_length", 128000),
"max_completion_tokens": model.get("top_provider", {}).get("max_completion_tokens", 4096),
"name": model.get("name", model_id),
"pricing": model.get("pricing", {}),
}
_add_model_aliases(cache, model_id, entry)
canonical = model.get("canonical_slug", "")
if canonical and canonical != model_id:
cache[canonical] = cache[model_id]
_add_model_aliases(cache, canonical, entry)
_model_metadata_cache = cache
_model_metadata_cache_time = time.time()
@ -159,6 +286,75 @@ def fetch_model_metadata(force_refresh: bool = False) -> Dict[str, Dict[str, Any
return _model_metadata_cache or {}
def fetch_endpoint_model_metadata(
base_url: str,
api_key: str = "",
force_refresh: bool = False,
) -> Dict[str, Dict[str, Any]]:
"""Fetch model metadata from an OpenAI-compatible ``/models`` endpoint.
This is used for explicit custom endpoints where hardcoded global model-name
defaults are unreliable. Results are cached in memory per base URL.
"""
normalized = _normalize_base_url(base_url)
if not normalized or _is_openrouter_base_url(normalized):
return {}
if not force_refresh:
cached = _endpoint_model_metadata_cache.get(normalized)
cached_at = _endpoint_model_metadata_cache_time.get(normalized, 0)
if cached is not None and (time.time() - cached_at) < _ENDPOINT_MODEL_CACHE_TTL:
return cached
candidates = [normalized]
if normalized.endswith("/v1"):
alternate = normalized[:-3].rstrip("/")
else:
alternate = normalized + "/v1"
if alternate and alternate not in candidates:
candidates.append(alternate)
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
last_error: Optional[Exception] = None
for candidate in candidates:
url = candidate.rstrip("/") + "/models"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
payload = response.json()
cache: Dict[str, Dict[str, Any]] = {}
for model in payload.get("data", []):
if not isinstance(model, dict):
continue
model_id = model.get("id")
if not model_id:
continue
entry: Dict[str, Any] = {"name": model.get("name", model_id)}
context_length = _extract_context_length(model)
if context_length is not None:
entry["context_length"] = context_length
max_completion_tokens = _extract_max_completion_tokens(model)
if max_completion_tokens is not None:
entry["max_completion_tokens"] = max_completion_tokens
pricing = _extract_pricing(model)
if pricing:
entry["pricing"] = pricing
_add_model_aliases(cache, model_id, entry)
_endpoint_model_metadata_cache[normalized] = cache
_endpoint_model_metadata_cache_time[normalized] = time.time()
return cache
except Exception as exc:
last_error = exc
if last_error:
logger.debug("Failed to fetch model metadata from %s/models: %s", normalized, last_error)
_endpoint_model_metadata_cache[normalized] = {}
_endpoint_model_metadata_cache_time[normalized] = time.time()
return {}
def _get_context_cache_path() -> Path:
"""Return path to the persistent context length cache file."""
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
@ -243,14 +439,15 @@ def parse_context_limit_from_error(error_msg: str) -> Optional[int]:
return None
def get_model_context_length(model: str, base_url: str = "") -> int:
def get_model_context_length(model: str, base_url: str = "", api_key: str = "") -> int:
"""Get the context length for a model.
Resolution order:
1. Persistent cache (previously discovered via probing)
2. OpenRouter API metadata
3. Hardcoded DEFAULT_CONTEXT_LENGTHS (fuzzy match)
4. First probe tier (2M) will be narrowed on first context error
2. Active endpoint metadata (/models for explicit custom endpoints)
3. OpenRouter API metadata
4. Hardcoded DEFAULT_CONTEXT_LENGTHS (fuzzy match for hosted routes only)
5. First probe tier (2M) will be narrowed on first context error
"""
# 1. Check persistent cache (model+provider)
if base_url:
@ -258,19 +455,31 @@ def get_model_context_length(model: str, base_url: str = "") -> int:
if cached is not None:
return cached
# 2. OpenRouter API metadata
# 2. Active endpoint metadata for explicit custom routes
if _is_custom_endpoint(base_url):
endpoint_metadata = fetch_endpoint_model_metadata(base_url, api_key=api_key)
if model in endpoint_metadata:
context_length = endpoint_metadata[model].get("context_length")
if isinstance(context_length, int):
return context_length
if not _is_known_provider_base_url(base_url):
# Explicit third-party endpoints should not borrow fuzzy global
# defaults from unrelated providers with similarly named models.
return CONTEXT_PROBE_TIERS[0]
# 3. OpenRouter API metadata
metadata = fetch_model_metadata()
if model in metadata:
return metadata[model].get("context_length", 128000)
# 3. Hardcoded defaults (fuzzy match — longest key first for specificity)
# 4. Hardcoded defaults (fuzzy match — longest key first for specificity)
for default_model, length in sorted(
DEFAULT_CONTEXT_LENGTHS.items(), key=lambda x: len(x[0]), reverse=True
):
if default_model in model or model in default_model:
return length
# 4. Unknown model — start at highest probe tier
# 5. Unknown model — start at highest probe tier
return CONTEXT_PROBE_TIERS[0]