Add OpenAI Codex provider runtime and responses integration (without .agent/PLANS.md)

This commit is contained in:
George Pickett 2026-02-25 18:20:38 -08:00
parent e3cb957a10
commit 609b19b630
19 changed files with 1713 additions and 145 deletions

View file

@ -30,6 +30,7 @@ import re
import sys
import time
import threading
from types import SimpleNamespace
import uuid
from typing import List, Dict, Any, Optional
from openai import OpenAI
@ -95,6 +96,8 @@ class AIAgent:
self,
base_url: str = None,
api_key: str = None,
provider: str = None,
api_mode: str = None,
model: str = "anthropic/claude-opus-4.6", # OpenRouter format
max_iterations: int = 60, # Default tool-calling iterations
tool_delay: float = 1.0,
@ -127,6 +130,8 @@ class AIAgent:
Args:
base_url (str): Base URL for the model API (optional)
api_key (str): API key for authentication (optional, uses env var if not provided)
provider (str): Provider identifier (optional; used for telemetry/routing hints)
api_mode (str): API mode override: "chat_completions" or "codex_responses"
model (str): Model name to use (default: "anthropic/claude-opus-4.6")
max_iterations (int): Maximum number of tool calling iterations (default: 60)
tool_delay (float): Delay between tool calls in seconds (default: 1.0)
@ -172,6 +177,17 @@ class AIAgent:
# Store effective base URL for feature detection (prompt caching, reasoning, etc.)
# When no base_url is provided, the client defaults to OpenRouter, so reflect that here.
self.base_url = base_url or OPENROUTER_BASE_URL
provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None
self.provider = provider_name or "openrouter"
if api_mode in {"chat_completions", "codex_responses"}:
self.api_mode = api_mode
elif self.provider == "openai-codex":
self.api_mode = "codex_responses"
elif (provider_name is None) and "chatgpt.com/backend-api/codex" in self.base_url.lower():
self.api_mode = "codex_responses"
self.provider = "openai-codex"
else:
self.api_mode = "chat_completions"
self.tool_progress_callback = tool_progress_callback
self.clarify_callback = clarify_callback
self._last_reported_tool = None # Track for "new tool" mode
@ -1122,6 +1138,220 @@ class AIAgent:
if self._memory_store:
self._memory_store.load_from_disk()
def _responses_tools(self, tools: Optional[List[Dict[str, Any]]] = None) -> Optional[List[Dict[str, Any]]]:
"""Convert chat-completions tool schemas to Responses function-tool schemas."""
source_tools = tools if tools is not None else self.tools
if not source_tools:
return None
converted: List[Dict[str, Any]] = []
for item in source_tools:
fn = item.get("function", {}) if isinstance(item, dict) else {}
name = fn.get("name")
if not isinstance(name, str) or not name.strip():
continue
converted.append({
"type": "function",
"name": name,
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {"type": "object", "properties": {}}),
})
return converted or None
def _chat_messages_to_responses_input(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Convert internal chat-style messages to Responses input items."""
items: List[Dict[str, Any]] = []
for msg in messages:
if not isinstance(msg, dict):
continue
role = msg.get("role")
if role == "system":
continue
if role in {"user", "assistant"}:
content = msg.get("content", "")
content_text = str(content) if content is not None else ""
if role == "assistant":
if content_text.strip():
items.append({"role": "assistant", "content": content_text})
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if not isinstance(tc, dict):
continue
fn = tc.get("function", {})
fn_name = fn.get("name")
if not isinstance(fn_name, str) or not fn_name.strip():
continue
call_id = tc.get("id") or tc.get("call_id")
if not isinstance(call_id, str) or not call_id.strip():
call_id = f"call_{uuid.uuid4().hex[:12]}"
arguments = fn.get("arguments", "{}")
if isinstance(arguments, dict):
arguments = json.dumps(arguments, ensure_ascii=False)
elif not isinstance(arguments, str):
arguments = str(arguments)
arguments = arguments.strip() or "{}"
items.append({
"type": "function_call",
"id": call_id,
"call_id": call_id,
"name": fn_name,
"arguments": arguments,
})
continue
items.append({"role": role, "content": content_text})
continue
if role == "tool":
call_id = msg.get("tool_call_id")
if not isinstance(call_id, str) or not call_id.strip():
continue
items.append({
"type": "function_call_output",
"call_id": call_id,
"output": str(msg.get("content", "") or ""),
})
return items
def _extract_responses_message_text(self, item: Any) -> str:
"""Extract assistant text from a Responses message output item."""
content = getattr(item, "content", None)
if not isinstance(content, list):
return ""
chunks: List[str] = []
for part in content:
ptype = getattr(part, "type", None)
if ptype not in {"output_text", "text"}:
continue
text = getattr(part, "text", None)
if isinstance(text, str) and text:
chunks.append(text)
return "".join(chunks).strip()
def _extract_responses_reasoning_text(self, item: Any) -> str:
"""Extract a compact reasoning text from a Responses reasoning item."""
summary = getattr(item, "summary", None)
if isinstance(summary, list):
chunks: List[str] = []
for part in summary:
text = getattr(part, "text", None)
if isinstance(text, str) and text:
chunks.append(text)
if chunks:
return "\n".join(chunks).strip()
text = getattr(item, "text", None)
if isinstance(text, str) and text:
return text.strip()
return ""
def _normalize_codex_response(self, response: Any) -> tuple[Any, str]:
"""Normalize a Responses API object to an assistant_message-like object."""
output = getattr(response, "output", None)
if not isinstance(output, list) or not output:
raise RuntimeError("Responses API returned no output items")
response_status = getattr(response, "status", None)
if isinstance(response_status, str):
response_status = response_status.strip().lower()
else:
response_status = None
if response_status in {"failed", "cancelled"}:
error_obj = getattr(response, "error", None)
if isinstance(error_obj, dict):
error_msg = error_obj.get("message") or str(error_obj)
else:
error_msg = str(error_obj) if error_obj else f"Responses API returned status '{response_status}'"
raise RuntimeError(error_msg)
content_parts: List[str] = []
reasoning_parts: List[str] = []
tool_calls: List[Any] = []
has_incomplete_items = response_status in {"queued", "in_progress", "incomplete"}
for item in output:
item_type = getattr(item, "type", None)
item_status = getattr(item, "status", None)
if isinstance(item_status, str):
item_status = item_status.strip().lower()
else:
item_status = None
if item_status in {"queued", "in_progress", "incomplete"}:
has_incomplete_items = True
if item_type == "message":
message_text = self._extract_responses_message_text(item)
if message_text:
content_parts.append(message_text)
elif item_type == "reasoning":
reasoning_text = self._extract_responses_reasoning_text(item)
if reasoning_text:
reasoning_parts.append(reasoning_text)
elif item_type == "function_call":
if item_status in {"queued", "in_progress", "incomplete"}:
continue
fn_name = getattr(item, "name", "") or ""
arguments = getattr(item, "arguments", "{}")
if not isinstance(arguments, str):
arguments = str(arguments)
call_id = getattr(item, "call_id", None) or getattr(item, "id", None) or f"call_{uuid.uuid4().hex[:12]}"
tool_calls.append(SimpleNamespace(
id=call_id,
type="function",
function=SimpleNamespace(name=fn_name, arguments=arguments),
))
elif item_type == "custom_tool_call":
fn_name = getattr(item, "name", "") or ""
arguments = getattr(item, "input", "{}")
if not isinstance(arguments, str):
arguments = str(arguments)
call_id = getattr(item, "call_id", None) or getattr(item, "id", None) or f"call_{uuid.uuid4().hex[:12]}"
tool_calls.append(SimpleNamespace(
id=call_id,
type="function",
function=SimpleNamespace(name=fn_name, arguments=arguments),
))
final_text = "\n".join([p for p in content_parts if p]).strip()
if not final_text and hasattr(response, "output_text"):
out_text = getattr(response, "output_text", "")
if isinstance(out_text, str):
final_text = out_text.strip()
assistant_message = SimpleNamespace(
content=final_text,
tool_calls=tool_calls,
reasoning="\n\n".join(reasoning_parts).strip() if reasoning_parts else None,
reasoning_content=None,
reasoning_details=None,
)
if tool_calls:
finish_reason = "tool_calls"
elif has_incomplete_items:
finish_reason = "incomplete"
else:
finish_reason = "stop"
return assistant_message, finish_reason
def _run_codex_stream(self, api_kwargs: dict):
"""Execute one streaming Responses API request and return the final response."""
with self.client.responses.stream(**api_kwargs) as stream:
for _ in stream:
pass
return stream.get_final_response()
def _interruptible_api_call(self, api_kwargs: dict):
"""
Run the API call in a background thread so the main conversation loop
@ -1135,7 +1365,10 @@ class AIAgent:
def _call():
try:
result["response"] = self.client.chat.completions.create(**api_kwargs)
if self.api_mode == "codex_responses":
result["response"] = self._run_codex_stream(api_kwargs)
else:
result["response"] = self.client.chat.completions.create(**api_kwargs)
except Exception as e:
result["error"] = e
@ -1160,7 +1393,24 @@ class AIAgent:
return result["response"]
def _build_api_kwargs(self, api_messages: list) -> dict:
"""Build the keyword arguments dict for the chat completions API call."""
"""Build the keyword arguments dict for the active API mode."""
if self.api_mode == "codex_responses":
instructions = ""
payload_messages = api_messages
if api_messages and api_messages[0].get("role") == "system":
instructions = str(api_messages[0].get("content") or "").strip()
payload_messages = api_messages[1:]
if not instructions:
instructions = DEFAULT_AGENT_IDENTITY
return {
"model": self.model,
"instructions": instructions,
"input": self._chat_messages_to_responses_input(payload_messages),
"tools": self._responses_tools(),
"store": False,
}
provider_preferences = {}
if self.providers_allowed:
provider_preferences["only"] = self.providers_allowed
@ -1308,36 +1558,43 @@ class AIAgent:
messages.pop() # remove flush msg
return
api_kwargs = {
"model": self.model,
"messages": api_messages,
"tools": [memory_tool_def],
"temperature": 0.3,
"max_tokens": 1024,
}
if self.api_mode == "codex_responses":
codex_kwargs = self._build_api_kwargs(api_messages)
codex_kwargs["tools"] = self._responses_tools([memory_tool_def])
response = self._run_codex_stream(codex_kwargs)
assistant_message, _ = self._normalize_codex_response(response)
else:
api_kwargs = {
"model": self.model,
"messages": api_messages,
"tools": [memory_tool_def],
"temperature": 0.3,
"max_tokens": 1024,
}
response = self.client.chat.completions.create(**api_kwargs, timeout=30.0)
if not response.choices:
assistant_message = None
else:
assistant_message = response.choices[0].message
response = self.client.chat.completions.create(**api_kwargs, timeout=30.0)
if response.choices:
assistant_message = response.choices[0].message
if assistant_message.tool_calls:
# Execute only memory tool calls
for tc in assistant_message.tool_calls:
if tc.function.name == "memory":
try:
args = json.loads(tc.function.arguments)
from tools.memory_tool import memory_tool as _memory_tool
result = _memory_tool(
action=args.get("action"),
target=args.get("target", "memory"),
content=args.get("content"),
old_text=args.get("old_text"),
store=self._memory_store,
)
if not self.quiet_mode:
print(f" 🧠 Memory flush: saved to {args.get('target', 'memory')}")
except Exception as e:
logger.debug("Memory flush tool call failed: %s", e)
if assistant_message and assistant_message.tool_calls:
# Execute only memory tool calls
for tc in assistant_message.tool_calls:
if tc.function.name == "memory":
try:
args = json.loads(tc.function.arguments)
from tools.memory_tool import memory_tool as _memory_tool
_memory_tool(
action=args.get("action"),
target=args.get("target", "memory"),
content=args.get("content"),
old_text=args.get("old_text"),
store=self._memory_store,
)
if not self.quiet_mode:
print(f" 🧠 Memory flush: saved to {args.get('target', 'memory')}")
except Exception as e:
logger.debug("Memory flush tool call failed: %s", e)
except Exception as e:
logger.debug("Memory flush API call failed: %s", e)
finally:
@ -1628,24 +1885,37 @@ class AIAgent:
if _is_nous:
summary_extra_body["tags"] = ["product=hermes-agent"]
summary_kwargs = {
"model": self.model,
"messages": api_messages,
}
if self.max_tokens is not None:
summary_kwargs["max_tokens"] = self.max_tokens
if summary_extra_body:
summary_kwargs["extra_body"] = summary_extra_body
summary_response = self.client.chat.completions.create(**summary_kwargs)
if summary_response.choices and summary_response.choices[0].message.content:
final_response = summary_response.choices[0].message.content
if self.api_mode == "codex_responses":
summary_kwargs = self._build_api_kwargs(api_messages)
summary_kwargs["tools"] = None
summary_response = self._run_codex_stream(summary_kwargs)
assistant_message, _ = self._normalize_codex_response(summary_response)
final_response = assistant_message.content or ""
if "<think>" in final_response:
final_response = re.sub(r'<think>.*?</think>\s*', '', final_response, flags=re.DOTALL).strip()
messages.append({"role": "assistant", "content": final_response})
if final_response:
messages.append({"role": "assistant", "content": final_response})
else:
final_response = "I reached the iteration limit and couldn't generate a summary."
else:
final_response = "I reached the iteration limit and couldn't generate a summary."
summary_kwargs = {
"model": self.model,
"messages": api_messages,
}
if self.max_tokens is not None:
summary_kwargs["max_tokens"] = self.max_tokens
if summary_extra_body:
summary_kwargs["extra_body"] = summary_extra_body
summary_response = self.client.chat.completions.create(**summary_kwargs)
if summary_response.choices and summary_response.choices[0].message.content:
final_response = summary_response.choices[0].message.content
if "<think>" in final_response:
final_response = re.sub(r'<think>.*?</think>\s*', '', final_response, flags=re.DOTALL).strip()
messages.append({"role": "assistant", "content": final_response})
else:
final_response = "I reached the iteration limit and couldn't generate a summary."
except Exception as e:
logging.warning(f"Failed to get summary response: {e}")
@ -1848,6 +2118,8 @@ class AIAgent:
retry_count = 0
max_retries = 6 # Increased to allow longer backoff periods
finish_reason = "stop"
while retry_count <= max_retries:
try:
api_kwargs = self._build_api_kwargs(api_messages)
@ -1873,8 +2145,33 @@ class AIAgent:
resp_model = getattr(response, 'model', 'N/A') if response else 'N/A'
logging.debug(f"API Response received - Model: {resp_model}, Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}")
# Validate response has valid choices before proceeding
if response is None or not hasattr(response, 'choices') or response.choices is None or len(response.choices) == 0:
# Validate response shape before proceeding
response_invalid = False
error_details = []
if self.api_mode == "codex_responses":
output_items = getattr(response, "output", None) if response is not None else None
if response is None:
response_invalid = True
error_details.append("response is None")
elif not isinstance(output_items, list):
response_invalid = True
error_details.append("response.output is not a list")
elif len(output_items) == 0:
response_invalid = True
error_details.append("response.output is empty")
else:
if response is None or not hasattr(response, 'choices') or response.choices is None or len(response.choices) == 0:
response_invalid = True
if response is None:
error_details.append("response is None")
elif not hasattr(response, 'choices'):
error_details.append("response has no 'choices' attribute")
elif response.choices is None:
error_details.append("response.choices is None")
else:
error_details.append("response.choices is empty")
if response_invalid:
# Stop spinner before printing error messages
if thinking_spinner:
thinking_spinner.stop(f"(´;ω;`) oops, retrying...")
@ -1882,15 +2179,6 @@ class AIAgent:
# This is often rate limiting or provider returning malformed response
retry_count += 1
error_details = []
if response is None:
error_details.append("response is None")
elif not hasattr(response, 'choices'):
error_details.append("response has no 'choices' attribute")
elif response.choices is None:
error_details.append("response.choices is None")
else:
error_details.append("response.choices is empty")
# Check for error field in response (some providers include this)
error_msg = "Unknown"
@ -1927,7 +2215,7 @@ class AIAgent:
"messages": messages,
"completed": False,
"api_calls": api_call_count,
"error": f"Invalid API response (choices is None/empty). Likely rate limited by provider.",
"error": "Invalid API response shape. Likely rate limited or malformed provider response.",
"failed": True # Mark as failure for filtering
}
@ -1953,7 +2241,20 @@ class AIAgent:
continue # Retry the API call
# Check finish_reason before proceeding
finish_reason = response.choices[0].finish_reason
if self.api_mode == "codex_responses":
status = getattr(response, "status", None)
incomplete_details = getattr(response, "incomplete_details", None)
incomplete_reason = None
if isinstance(incomplete_details, dict):
incomplete_reason = incomplete_details.get("reason")
else:
incomplete_reason = getattr(incomplete_details, "reason", None)
if status == "incomplete" and incomplete_reason in {"max_output_tokens", "length"}:
finish_reason = "length"
else:
finish_reason = "stop"
else:
finish_reason = response.choices[0].finish_reason
# Handle "length" finish_reason - response was truncated
if finish_reason == "length":
@ -1990,10 +2291,21 @@ class AIAgent:
# Track actual token usage from response for context management
if hasattr(response, 'usage') and response.usage:
if self.api_mode == "codex_responses":
prompt_tokens = getattr(response.usage, 'input_tokens', 0) or 0
completion_tokens = getattr(response.usage, 'output_tokens', 0) or 0
total_tokens = (
getattr(response.usage, 'total_tokens', None)
or (prompt_tokens + completion_tokens)
)
else:
prompt_tokens = getattr(response.usage, 'prompt_tokens', 0) or 0
completion_tokens = getattr(response.usage, 'completion_tokens', 0) or 0
total_tokens = getattr(response.usage, 'total_tokens', 0) or 0
usage_dict = {
"prompt_tokens": getattr(response.usage, 'prompt_tokens', 0),
"completion_tokens": getattr(response.usage, 'completion_tokens', 0),
"total_tokens": getattr(response.usage, 'total_tokens', 0),
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
}
self.context_compressor.update_from_response(usage_dict)
@ -2145,7 +2457,10 @@ class AIAgent:
break
try:
assistant_message = response.choices[0].message
if self.api_mode == "codex_responses":
assistant_message, finish_reason = self._normalize_codex_response(response)
else:
assistant_message = response.choices[0].message
# Handle assistant response
if assistant_message.content and not self.quiet_mode:
@ -2185,6 +2500,48 @@ class AIAgent:
# Reset incomplete scratchpad counter on clean response
if hasattr(self, '_incomplete_scratchpad_retries'):
self._incomplete_scratchpad_retries = 0
if self.api_mode == "codex_responses" and finish_reason == "incomplete":
if not hasattr(self, "_codex_incomplete_retries"):
self._codex_incomplete_retries = 0
self._codex_incomplete_retries += 1
interim_msg = self._build_assistant_message(assistant_message, finish_reason)
interim_has_content = bool(interim_msg.get("content", "").strip())
interim_has_reasoning = bool(interim_msg.get("reasoning", "").strip()) if isinstance(interim_msg.get("reasoning"), str) else False
if interim_has_content or interim_has_reasoning:
last_msg = messages[-1] if messages else None
duplicate_interim = (
isinstance(last_msg, dict)
and last_msg.get("role") == "assistant"
and last_msg.get("finish_reason") == "incomplete"
and (last_msg.get("content") or "") == (interim_msg.get("content") or "")
and (last_msg.get("reasoning") or "") == (interim_msg.get("reasoning") or "")
)
if not duplicate_interim:
messages.append(interim_msg)
self._log_msg_to_db(interim_msg)
if self._codex_incomplete_retries < 3:
if not self.quiet_mode:
print(f"{self.log_prefix}↻ Codex response incomplete; continuing turn ({self._codex_incomplete_retries}/3)")
self._session_messages = messages
self._save_session_log(messages)
continue
self._codex_incomplete_retries = 0
self._persist_session(messages, conversation_history)
return {
"final_response": None,
"messages": messages,
"api_calls": api_call_count,
"completed": False,
"partial": True,
"error": "Codex response remained incomplete after 3 continuation attempts",
}
elif hasattr(self, "_codex_incomplete_retries"):
self._codex_incomplete_retries = 0
# Check for tool calls
if assistant_message.tool_calls: