fix: persist google oauth pkce for headless auth

Store the pending OAuth state and code verifier between --auth-url and --auth-code so the manual headless flow can reuse Flow.fetch_token() without disabling PKCE.
This commit is contained in:
teknium1 2026-03-14 22:11:34 -07:00
parent 30120f05a6
commit 4524cddc72
3 changed files with 274 additions and 14 deletions

View file

@ -102,7 +102,9 @@ This prints a URL. **Send the URL to the user** and tell them:
### Step 4: Exchange the code
The user will paste back either a URL like `http://localhost:1/?code=4/0A...&scope=...`
or just the code string. Either works:
or just the code string. Either works. The `--auth-url` step stores a temporary
pending OAuth session locally so `--auth-code` can complete the PKCE exchange
later, even on headless systems:
```bash
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
@ -119,6 +121,7 @@ Should print `AUTHENTICATED`. Setup is complete — token refreshes automaticall
### Notes
- Token is stored at `~/.hermes/google_token.json` and auto-refreshes.
- Pending OAuth session state/verifier are stored temporarily at `~/.hermes/google_oauth_pending.json` until exchange completes.
- To revoke: `$GSETUP --revoke`
## Usage

View file

@ -31,6 +31,7 @@ from pathlib import Path
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
TOKEN_PATH = HERMES_HOME / "google_token.json"
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json"
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
@ -141,6 +142,58 @@ def store_client_secret(path: str):
print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}")
def _save_pending_auth(*, state: str, code_verifier: str):
"""Persist the OAuth session bits needed for a later token exchange."""
PENDING_AUTH_PATH.write_text(
json.dumps(
{
"state": state,
"code_verifier": code_verifier,
"redirect_uri": REDIRECT_URI,
},
indent=2,
)
)
def _load_pending_auth() -> dict:
"""Load the pending OAuth session created by get_auth_url()."""
if not PENDING_AUTH_PATH.exists():
print("ERROR: No pending OAuth session found. Run --auth-url first.")
sys.exit(1)
try:
data = json.loads(PENDING_AUTH_PATH.read_text())
except Exception as e:
print(f"ERROR: Could not read pending OAuth session: {e}")
print("Run --auth-url again to start a fresh OAuth session.")
sys.exit(1)
if not data.get("state") or not data.get("code_verifier"):
print("ERROR: Pending OAuth session is missing PKCE data.")
print("Run --auth-url again to start a fresh OAuth session.")
sys.exit(1)
return data
def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]:
"""Accept either a raw auth code or the full redirect URL pasted by the user."""
if not code_or_url.startswith("http"):
return code_or_url, None
from urllib.parse import parse_qs, urlparse
parsed = urlparse(code_or_url)
params = parse_qs(parsed.query)
if "code" not in params:
print("ERROR: No 'code' parameter found in URL.")
sys.exit(1)
state = params.get("state", [None])[0]
return params["code"][0], state
def get_auth_url():
"""Print the OAuth authorization URL. User visits this in a browser."""
if not CLIENT_SECRET_PATH.exists():
@ -154,11 +207,13 @@ def get_auth_url():
str(CLIENT_SECRET_PATH),
scopes=SCOPES,
redirect_uri=REDIRECT_URI,
autogenerate_code_verifier=True,
)
auth_url, _ = flow.authorization_url(
auth_url, state = flow.authorization_url(
access_type="offline",
prompt="consent",
)
_save_pending_auth(state=state, code_verifier=flow.code_verifier)
# Print just the URL so the agent can extract it cleanly
print(auth_url)
@ -169,26 +224,23 @@ def exchange_auth_code(code: str):
print("ERROR: No client secret stored. Run --client-secret first.")
sys.exit(1)
pending_auth = _load_pending_auth()
code, returned_state = _extract_code_and_state(code)
if returned_state and returned_state != pending_auth["state"]:
print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.")
sys.exit(1)
_ensure_deps()
from google_auth_oauthlib.flow import Flow
flow = Flow.from_client_secrets_file(
str(CLIENT_SECRET_PATH),
scopes=SCOPES,
redirect_uri=REDIRECT_URI,
redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI),
state=pending_auth["state"],
code_verifier=pending_auth["code_verifier"],
)
# The code might come as a full redirect URL or just the code itself
if code.startswith("http"):
# Extract code from redirect URL: http://localhost:1/?code=CODE&scope=...
from urllib.parse import urlparse, parse_qs
parsed = urlparse(code)
params = parse_qs(parsed.query)
if "code" not in params:
print("ERROR: No 'code' parameter found in URL.")
sys.exit(1)
code = params["code"][0]
try:
flow.fetch_token(code=code)
except Exception as e:
@ -198,6 +250,7 @@ def exchange_auth_code(code: str):
creds = flow.credentials
TOKEN_PATH.write_text(creds.to_json())
PENDING_AUTH_PATH.unlink(missing_ok=True)
print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
@ -229,6 +282,7 @@ def revoke():
print(f"Remote revocation failed (token may already be invalid): {e}")
TOKEN_PATH.unlink(missing_ok=True)
PENDING_AUTH_PATH.unlink(missing_ok=True)
print(f"Deleted {TOKEN_PATH}")