diff --git a/skills/productivity/google-workspace/SKILL.md b/skills/productivity/google-workspace/SKILL.md index 77374d2e..00d91de9 100644 --- a/skills/productivity/google-workspace/SKILL.md +++ b/skills/productivity/google-workspace/SKILL.md @@ -102,7 +102,9 @@ This prints a URL. **Send the URL to the user** and tell them: ### Step 4: Exchange the code The user will paste back either a URL like `http://localhost:1/?code=4/0A...&scope=...` -or just the code string. Either works: +or just the code string. Either works. The `--auth-url` step stores a temporary +pending OAuth session locally so `--auth-code` can complete the PKCE exchange +later, even on headless systems: ```bash $GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED" @@ -119,6 +121,7 @@ Should print `AUTHENTICATED`. Setup is complete — token refreshes automaticall ### Notes - Token is stored at `~/.hermes/google_token.json` and auto-refreshes. +- Pending OAuth session state/verifier are stored temporarily at `~/.hermes/google_oauth_pending.json` until exchange completes. - To revoke: `$GSETUP --revoke` ## Usage diff --git a/skills/productivity/google-workspace/scripts/setup.py b/skills/productivity/google-workspace/scripts/setup.py index 44a5a097..14f9c6bf 100644 --- a/skills/productivity/google-workspace/scripts/setup.py +++ b/skills/productivity/google-workspace/scripts/setup.py @@ -31,6 +31,7 @@ from pathlib import Path HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) TOKEN_PATH = HERMES_HOME / "google_token.json" CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json" +PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json" SCOPES = [ "https://www.googleapis.com/auth/gmail.readonly", @@ -141,6 +142,58 @@ def store_client_secret(path: str): print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}") +def _save_pending_auth(*, state: str, code_verifier: str): + """Persist the OAuth session bits needed for a later token exchange.""" + PENDING_AUTH_PATH.write_text( + json.dumps( + { + "state": state, + "code_verifier": code_verifier, + "redirect_uri": REDIRECT_URI, + }, + indent=2, + ) + ) + + +def _load_pending_auth() -> dict: + """Load the pending OAuth session created by get_auth_url().""" + if not PENDING_AUTH_PATH.exists(): + print("ERROR: No pending OAuth session found. Run --auth-url first.") + sys.exit(1) + + try: + data = json.loads(PENDING_AUTH_PATH.read_text()) + except Exception as e: + print(f"ERROR: Could not read pending OAuth session: {e}") + print("Run --auth-url again to start a fresh OAuth session.") + sys.exit(1) + + if not data.get("state") or not data.get("code_verifier"): + print("ERROR: Pending OAuth session is missing PKCE data.") + print("Run --auth-url again to start a fresh OAuth session.") + sys.exit(1) + + return data + + +def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]: + """Accept either a raw auth code or the full redirect URL pasted by the user.""" + if not code_or_url.startswith("http"): + return code_or_url, None + + from urllib.parse import parse_qs, urlparse + + parsed = urlparse(code_or_url) + params = parse_qs(parsed.query) + if "code" not in params: + print("ERROR: No 'code' parameter found in URL.") + sys.exit(1) + + state = params.get("state", [None])[0] + return params["code"][0], state + + def get_auth_url(): """Print the OAuth authorization URL. User visits this in a browser.""" if not CLIENT_SECRET_PATH.exists(): @@ -154,11 +207,13 @@ def get_auth_url(): str(CLIENT_SECRET_PATH), scopes=SCOPES, redirect_uri=REDIRECT_URI, + autogenerate_code_verifier=True, ) - auth_url, _ = flow.authorization_url( + auth_url, state = flow.authorization_url( access_type="offline", prompt="consent", ) + _save_pending_auth(state=state, code_verifier=flow.code_verifier) # Print just the URL so the agent can extract it cleanly print(auth_url) @@ -169,26 +224,23 @@ def exchange_auth_code(code: str): print("ERROR: No client secret stored. Run --client-secret first.") sys.exit(1) + pending_auth = _load_pending_auth() + code, returned_state = _extract_code_and_state(code) + if returned_state and returned_state != pending_auth["state"]: + print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.") + sys.exit(1) + _ensure_deps() from google_auth_oauthlib.flow import Flow flow = Flow.from_client_secrets_file( str(CLIENT_SECRET_PATH), scopes=SCOPES, - redirect_uri=REDIRECT_URI, + redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI), + state=pending_auth["state"], + code_verifier=pending_auth["code_verifier"], ) - # The code might come as a full redirect URL or just the code itself - if code.startswith("http"): - # Extract code from redirect URL: http://localhost:1/?code=CODE&scope=... - from urllib.parse import urlparse, parse_qs - parsed = urlparse(code) - params = parse_qs(parsed.query) - if "code" not in params: - print("ERROR: No 'code' parameter found in URL.") - sys.exit(1) - code = params["code"][0] - try: flow.fetch_token(code=code) except Exception as e: @@ -198,6 +250,7 @@ def exchange_auth_code(code: str): creds = flow.credentials TOKEN_PATH.write_text(creds.to_json()) + PENDING_AUTH_PATH.unlink(missing_ok=True) print(f"OK: Authenticated. Token saved to {TOKEN_PATH}") @@ -229,6 +282,7 @@ def revoke(): print(f"Remote revocation failed (token may already be invalid): {e}") TOKEN_PATH.unlink(missing_ok=True) + PENDING_AUTH_PATH.unlink(missing_ok=True) print(f"Deleted {TOKEN_PATH}") diff --git a/tests/skills/test_google_oauth_setup.py b/tests/skills/test_google_oauth_setup.py new file mode 100644 index 00000000..361bb7e2 --- /dev/null +++ b/tests/skills/test_google_oauth_setup.py @@ -0,0 +1,203 @@ +"""Regression tests for Google Workspace OAuth setup. + +These tests cover the headless/manual auth-code flow where the browser step and +code exchange happen in separate process invocations. +""" + +import importlib.util +import json +import sys +import types +from pathlib import Path + +import pytest + + +SCRIPT_PATH = ( + Path(__file__).resolve().parents[2] + / "skills/productivity/google-workspace/scripts/setup.py" +) + + +class FakeCredentials: + def __init__(self, payload=None): + self._payload = payload or { + "token": "access-token", + "refresh_token": "refresh-token", + "token_uri": "https://oauth2.googleapis.com/token", + "client_id": "client-id", + "client_secret": "client-secret", + "scopes": ["scope-a"], + } + + def to_json(self): + return json.dumps(self._payload) + + +class FakeFlow: + created = [] + default_state = "generated-state" + default_verifier = "generated-code-verifier" + credentials_payload = None + fetch_error = None + + def __init__( + self, + client_secrets_file, + scopes, + *, + redirect_uri=None, + state=None, + code_verifier=None, + autogenerate_code_verifier=False, + ): + self.client_secrets_file = client_secrets_file + self.scopes = scopes + self.redirect_uri = redirect_uri + self.state = state + self.code_verifier = code_verifier + self.autogenerate_code_verifier = autogenerate_code_verifier + self.authorization_kwargs = None + self.fetch_token_calls = [] + self.credentials = FakeCredentials(self.credentials_payload) + + if autogenerate_code_verifier and not self.code_verifier: + self.code_verifier = self.default_verifier + if not self.state: + self.state = self.default_state + + @classmethod + def reset(cls): + cls.created = [] + cls.default_state = "generated-state" + cls.default_verifier = "generated-code-verifier" + cls.credentials_payload = None + cls.fetch_error = None + + @classmethod + def from_client_secrets_file(cls, client_secrets_file, scopes, **kwargs): + inst = cls(client_secrets_file, scopes, **kwargs) + cls.created.append(inst) + return inst + + def authorization_url(self, **kwargs): + self.authorization_kwargs = kwargs + return f"https://auth.example/authorize?state={self.state}", self.state + + def fetch_token(self, **kwargs): + self.fetch_token_calls.append(kwargs) + if self.fetch_error: + raise self.fetch_error + + +@pytest.fixture +def setup_module(monkeypatch, tmp_path): + FakeFlow.reset() + + google_auth_module = types.ModuleType("google_auth_oauthlib") + flow_module = types.ModuleType("google_auth_oauthlib.flow") + flow_module.Flow = FakeFlow + google_auth_module.flow = flow_module + monkeypatch.setitem(sys.modules, "google_auth_oauthlib", google_auth_module) + monkeypatch.setitem(sys.modules, "google_auth_oauthlib.flow", flow_module) + + spec = importlib.util.spec_from_file_location("google_workspace_setup_test", SCRIPT_PATH) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + + monkeypatch.setattr(module, "_ensure_deps", lambda: None) + monkeypatch.setattr(module, "CLIENT_SECRET_PATH", tmp_path / "google_client_secret.json") + monkeypatch.setattr(module, "TOKEN_PATH", tmp_path / "google_token.json") + monkeypatch.setattr(module, "PENDING_AUTH_PATH", tmp_path / "google_oauth_pending.json", raising=False) + + client_secret = { + "installed": { + "client_id": "client-id", + "client_secret": "client-secret", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + } + } + module.CLIENT_SECRET_PATH.write_text(json.dumps(client_secret)) + return module + + +class TestGetAuthUrl: + def test_persists_state_and_code_verifier_for_later_exchange(self, setup_module, capsys): + setup_module.get_auth_url() + + out = capsys.readouterr().out.strip() + assert out == "https://auth.example/authorize?state=generated-state" + + saved = json.loads(setup_module.PENDING_AUTH_PATH.read_text()) + assert saved["state"] == "generated-state" + assert saved["code_verifier"] == "generated-code-verifier" + + flow = FakeFlow.created[-1] + assert flow.autogenerate_code_verifier is True + assert flow.authorization_kwargs == {"access_type": "offline", "prompt": "consent"} + + +class TestExchangeAuthCode: + def test_reuses_saved_pkce_material_for_plain_code(self, setup_module): + setup_module.PENDING_AUTH_PATH.write_text( + json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"}) + ) + + setup_module.exchange_auth_code("4/test-auth-code") + + flow = FakeFlow.created[-1] + assert flow.state == "saved-state" + assert flow.code_verifier == "saved-verifier" + assert flow.fetch_token_calls == [{"code": "4/test-auth-code"}] + assert json.loads(setup_module.TOKEN_PATH.read_text())["token"] == "access-token" + assert not setup_module.PENDING_AUTH_PATH.exists() + + def test_extracts_code_from_redirect_url_and_checks_state(self, setup_module): + setup_module.PENDING_AUTH_PATH.write_text( + json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"}) + ) + + setup_module.exchange_auth_code( + "http://localhost:1/?code=4/extracted-code&state=saved-state&scope=gmail" + ) + + flow = FakeFlow.created[-1] + assert flow.fetch_token_calls == [{"code": "4/extracted-code"}] + + def test_rejects_state_mismatch(self, setup_module, capsys): + setup_module.PENDING_AUTH_PATH.write_text( + json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"}) + ) + + with pytest.raises(SystemExit): + setup_module.exchange_auth_code( + "http://localhost:1/?code=4/extracted-code&state=wrong-state" + ) + + out = capsys.readouterr().out + assert "state mismatch" in out.lower() + assert not setup_module.TOKEN_PATH.exists() + + def test_requires_pending_auth_session(self, setup_module, capsys): + with pytest.raises(SystemExit): + setup_module.exchange_auth_code("4/test-auth-code") + + out = capsys.readouterr().out + assert "run --auth-url first" in out.lower() + assert not setup_module.TOKEN_PATH.exists() + + def test_keeps_pending_auth_session_when_exchange_fails(self, setup_module, capsys): + setup_module.PENDING_AUTH_PATH.write_text( + json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"}) + ) + FakeFlow.fetch_error = Exception("invalid_grant: Missing code verifier") + + with pytest.raises(SystemExit): + setup_module.exchange_auth_code("4/test-auth-code") + + out = capsys.readouterr().out + assert "token exchange failed" in out.lower() + assert setup_module.PENDING_AUTH_PATH.exists() + assert not setup_module.TOKEN_PATH.exists()