Merge pull request #1303 from NousResearch/hermes/hermes-aa653753

feat(skills): integrate skills.sh as a hub source
Authored by Teknium on 2026-03-14 09:48:18 -07:00; committed by GitHub
commit 681f1068ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 1413 additions and 47 deletions


@@ -26,6 +26,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse, urlunparse
import httpx
import yaml
@@ -69,6 +70,7 @@ class SkillMeta:
repo: Optional[str] = None
path: Optional[str] = None
tags: List[str] = field(default_factory=list)
extra: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -79,6 +81,7 @@ class SkillBundle:
source: str
identifier: str
trust_level: str
metadata: Dict[str, Any] = field(default_factory=dict)
# ---------------------------------------------------------------------------
@@ -497,6 +500,643 @@ class GitHubSource(SkillSource):
return {}
# ---------------------------------------------------------------------------
# Well-known Agent Skills endpoint source adapter
# ---------------------------------------------------------------------------
class WellKnownSkillSource(SkillSource):
"""Read skills from a domain exposing /.well-known/skills/index.json."""
BASE_PATH = "/.well-known/skills"
def source_id(self) -> str:
return "well-known"
def trust_level_for(self, identifier: str) -> str:
return "community"
def search(self, query: str, limit: int = 10) -> List[SkillMeta]:
index_url = self._query_to_index_url(query)
if not index_url:
return []
parsed = self._parse_index(index_url)
if not parsed:
return []
results: List[SkillMeta] = []
for entry in parsed["skills"][:limit]:
name = entry.get("name")
if not isinstance(name, str) or not name:
continue
description = entry.get("description", "")
files = entry.get("files", ["SKILL.md"])
results.append(SkillMeta(
name=name,
description=str(description),
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], name),
trust_level="community",
path=name,
extra={
"index_url": parsed["index_url"],
"base_url": parsed["base_url"],
"files": files if isinstance(files, list) else ["SKILL.md"],
},
))
return results
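
For orientation, a minimal index payload that search() can consume (a sketch; per entry only "name" is required, and "files" falls back to ["SKILL.md"]):

# Hypothetical index.json served from https://example.com/.well-known/skills/
example_index = {
    "skills": [
        {
            "name": "pdf-tools",
            "description": "Extract and transform PDF content",
            "files": ["SKILL.md", "scripts/extract.py"],
        },
    ],
}
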
def inspect(self, identifier: str) -> Optional[SkillMeta]:
parsed = self._parse_identifier(identifier)
if not parsed:
return None
entry = self._index_entry(parsed["index_url"], parsed["skill_name"])
if not entry:
return None
skill_md = self._fetch_text(f"{parsed['skill_url']}/SKILL.md")
if skill_md is None:
return None
fm = GitHubSource._parse_frontmatter_quick(skill_md)
description = str(fm.get("description") or entry.get("description") or "")
name = str(fm.get("name") or parsed["skill_name"])
return SkillMeta(
name=name,
description=description,
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], parsed["skill_name"]),
trust_level="community",
path=parsed["skill_name"],
extra={
"index_url": parsed["index_url"],
"base_url": parsed["base_url"],
"files": entry.get("files", ["SKILL.md"]),
"endpoint": parsed["skill_url"],
},
)
def fetch(self, identifier: str) -> Optional[SkillBundle]:
parsed = self._parse_identifier(identifier)
if not parsed:
return None
entry = self._index_entry(parsed["index_url"], parsed["skill_name"])
if not entry:
return None
files = entry.get("files", ["SKILL.md"])
if not isinstance(files, list) or not files:
files = ["SKILL.md"]
downloaded: Dict[str, str] = {}
for rel_path in files:
if not isinstance(rel_path, str) or not rel_path:
continue
text = self._fetch_text(f"{parsed['skill_url']}/{rel_path}")
if text is None:
return None
downloaded[rel_path] = text
if "SKILL.md" not in downloaded:
return None
return SkillBundle(
name=parsed["skill_name"],
files=downloaded,
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], parsed["skill_name"]),
trust_level="community",
metadata={
"index_url": parsed["index_url"],
"base_url": parsed["base_url"],
"endpoint": parsed["skill_url"],
"files": files,
},
)
def _query_to_index_url(self, query: str) -> Optional[str]:
query = query.strip()
if not query.startswith(("http://", "https://")):
return None
if query.endswith("/index.json"):
return query
if f"{self.BASE_PATH}/" in query:
base_url = query.split(f"{self.BASE_PATH}/", 1)[0] + self.BASE_PATH
return f"{base_url}/index.json"
return query.rstrip("/") + f"{self.BASE_PATH}/index.json"
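
A few illustrative inputs and outputs for _query_to_index_url (hypothetical domain, tracing the branches above):

# "https://example.com"                                -> "https://example.com/.well-known/skills/index.json"
# "https://example.com/.well-known/skills/index.json"  -> returned unchanged
# "https://example.com/.well-known/skills/pdf-tools"   -> "https://example.com/.well-known/skills/index.json"
# "example.com" (no scheme)                            -> None
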
def _parse_identifier(self, identifier: str) -> Optional[dict]:
raw = identifier[len("well-known:"):] if identifier.startswith("well-known:") else identifier
if not raw.startswith(("http://", "https://")):
return None
parsed_url = urlparse(raw)
clean_url = urlunparse(parsed_url._replace(fragment=""))
fragment = parsed_url.fragment
if clean_url.endswith("/index.json"):
if not fragment:
return None
base_url = clean_url[:-len("/index.json")]
skill_name = fragment
skill_url = f"{base_url}/{skill_name}"
return {
"index_url": clean_url,
"base_url": base_url,
"skill_name": skill_name,
"skill_url": skill_url,
}
if clean_url.endswith("/SKILL.md"):
skill_url = clean_url[:-len("/SKILL.md")]
else:
skill_url = clean_url.rstrip("/")
if f"{self.BASE_PATH}/" not in skill_url:
return None
base_url, skill_name = skill_url.rsplit("/", 1)
return {
"index_url": f"{base_url}/index.json",
"base_url": base_url,
"skill_name": skill_name,
"skill_url": skill_url,
}
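
_parse_identifier accepts three equivalent shapes for the same skill (hypothetical domain shown):

# well-known:https://example.com/.well-known/skills/index.json#pdf-tools
# well-known:https://example.com/.well-known/skills/pdf-tools/SKILL.md
# well-known:https://example.com/.well-known/skills/pdf-tools
# Each resolves to skill_name "pdf-tools",
# skill_url  "https://example.com/.well-known/skills/pdf-tools", and
# index_url  "https://example.com/.well-known/skills/index.json".
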
def _parse_index(self, index_url: str) -> Optional[dict]:
cache_key = f"well_known_index_{hashlib.md5(index_url.encode()).hexdigest()}"
cached = _read_index_cache(cache_key)
if isinstance(cached, dict) and isinstance(cached.get("skills"), list):
return cached
try:
resp = httpx.get(index_url, timeout=20, follow_redirects=True)
if resp.status_code != 200:
return None
data = resp.json()
except (httpx.HTTPError, json.JSONDecodeError):
return None
skills = data.get("skills", []) if isinstance(data, dict) else []
if not isinstance(skills, list):
return None
parsed = {
"index_url": index_url,
"base_url": index_url[:-len("/index.json")],
"skills": skills,
}
_write_index_cache(cache_key, parsed)
return parsed
def _index_entry(self, index_url: str, skill_name: str) -> Optional[dict]:
parsed = self._parse_index(index_url)
if not parsed:
return None
for entry in parsed["skills"]:
if isinstance(entry, dict) and entry.get("name") == skill_name:
return entry
return None
@staticmethod
def _fetch_text(url: str) -> Optional[str]:
try:
resp = httpx.get(url, timeout=20, follow_redirects=True)
if resp.status_code == 200:
return resp.text
except httpx.HTTPError:
return None
return None
@staticmethod
def _wrap_identifier(base_url: str, skill_name: str) -> str:
return f"well-known:{base_url.rstrip('/')}/{skill_name}"
# ---------------------------------------------------------------------------
# skills.sh source adapter
# ---------------------------------------------------------------------------
class SkillsShSource(SkillSource):
"""Discover skills via skills.sh and fetch content from the underlying GitHub repo."""
BASE_URL = "https://skills.sh"
SEARCH_URL = f"{BASE_URL}/api/search"
_SKILL_LINK_RE = re.compile(r'href=["\']/(?P<id>(?!agents/|_next/|api/)[^"\'/]+/[^"\'/]+/[^"\'/]+)["\']')
_INSTALL_CMD_RE = re.compile(
r'npx\s+skills\s+add\s+(?P<repo>https?://github\.com/[^\s<]+|[^\s<]+)'
r'(?:\s+--skill\s+(?P<skill>[^\s<]+))?',
re.IGNORECASE,
)
_PAGE_H1_RE = re.compile(r'<h1[^>]*>(?P<title>.*?)</h1>', re.IGNORECASE | re.DOTALL)
_PROSE_H1_RE = re.compile(
r'<div[^>]*class=["\'][^"\']*prose[^"\']*["\'][^>]*>.*?<h1[^>]*>(?P<title>.*?)</h1>',
re.IGNORECASE | re.DOTALL,
)
_PROSE_P_RE = re.compile(
r'<div[^>]*class=["\'][^"\']*prose[^"\']*["\'][^>]*>.*?<p[^>]*>(?P<body>.*?)</p>',
re.IGNORECASE | re.DOTALL,
)
_WEEKLY_INSTALLS_RE = re.compile(r'Weekly Installs.*?children\\":\\"(?P<count>[0-9.,Kk]+)\\"', re.DOTALL)
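
The kinds of page fragments these patterns are written against (an assumption about skills.sh's rendered markup, not a stable contract):

# _SKILL_LINK_RE matches:   href="/acme/toolkit/pdf-tools"
# _INSTALL_CMD_RE matches:  npx skills add https://github.com/acme/toolkit --skill pdf-tools
# _WEEKLY_INSTALLS_RE targets serialized flight data such as:
#   ...Weekly Installs...children\":\"1.2K\"...
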
def __init__(self, auth: GitHubAuth):
self.auth = auth
self.github = GitHubSource(auth=auth)
def source_id(self) -> str:
return "skills-sh"
def trust_level_for(self, identifier: str) -> str:
return self.github.trust_level_for(self._normalize_identifier(identifier))
def search(self, query: str, limit: int = 10) -> List[SkillMeta]:
if not query.strip():
return self._featured_skills(limit)
cache_key = f"skills_sh_search_{hashlib.md5(f'{query}|{limit}'.encode()).hexdigest()}"
cached = _read_index_cache(cache_key)
if cached is not None:
return [SkillMeta(**item) for item in cached][:limit]
try:
resp = httpx.get(
self.SEARCH_URL,
params={"q": query, "limit": limit},
timeout=20,
)
if resp.status_code != 200:
return []
data = resp.json()
except (httpx.HTTPError, json.JSONDecodeError):
return []
items = data.get("skills", []) if isinstance(data, dict) else []
if not isinstance(items, list):
return []
results: List[SkillMeta] = []
for item in items[:limit]:
meta = self._meta_from_search_item(item)
if meta:
results.append(meta)
_write_index_cache(cache_key, [_skill_meta_to_dict(item) for item in results])
return results
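
The response shape assumed by search() (field names inferred from _meta_from_search_item below; values are illustrative):

example_response = {
    "skills": [
        {
            "id": "acme/toolkit/pdf-tools",  # owner/repo/skill-path
            "source": "acme/toolkit",        # fallback when "id" is missing
            "skillId": "pdf-tools",          # fallback when "id" is missing
            "name": "pdf-tools",
            "installs": 1234,
        },
    ],
}
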
def fetch(self, identifier: str) -> Optional[SkillBundle]:
canonical = self._normalize_identifier(identifier)
detail = self._fetch_detail_page(canonical)
for candidate in self._candidate_identifiers(canonical):
bundle = self.github.fetch(candidate)
if bundle:
bundle.source = "skills.sh"
bundle.identifier = self._wrap_identifier(canonical)
bundle.metadata.update(self._detail_to_metadata(canonical, detail))
return bundle
resolved = self._discover_identifier(canonical, detail=detail)
if resolved:
bundle = self.github.fetch(resolved)
if bundle:
bundle.source = "skills.sh"
bundle.identifier = self._wrap_identifier(canonical)
bundle.metadata.update(self._detail_to_metadata(canonical, detail))
return bundle
return None
def inspect(self, identifier: str) -> Optional[SkillMeta]:
canonical = self._normalize_identifier(identifier)
detail: Optional[dict] = None
for candidate in self._candidate_identifiers(canonical):
meta = self.github.inspect(candidate)
if meta:
detail = self._fetch_detail_page(canonical)
return self._finalize_inspect_meta(meta, canonical, detail)
detail = self._fetch_detail_page(canonical)
resolved = self._discover_identifier(canonical, detail=detail)
if resolved:
meta = self.github.inspect(resolved)
if meta:
return self._finalize_inspect_meta(meta, canonical, detail)
return None
def _featured_skills(self, limit: int) -> List[SkillMeta]:
cache_key = "skills_sh_featured"
cached = _read_index_cache(cache_key)
if cached is not None:
return [SkillMeta(**item) for item in cached][:limit]
try:
resp = httpx.get(self.BASE_URL, timeout=20)
if resp.status_code != 200:
return []
except httpx.HTTPError:
return []
seen: set[str] = set()
results: List[SkillMeta] = []
for match in self._SKILL_LINK_RE.finditer(resp.text):
canonical = match.group("id")
if canonical in seen:
continue
seen.add(canonical)
parts = canonical.split("/", 2)
if len(parts) < 3:
continue
repo = f"{parts[0]}/{parts[1]}"
skill_path = parts[2]
results.append(SkillMeta(
name=skill_path.split("/")[-1],
description=f"Featured on skills.sh from {repo}",
source="skills.sh",
identifier=self._wrap_identifier(canonical),
trust_level=self.github.trust_level_for(canonical),
repo=repo,
path=skill_path,
))
if len(results) >= limit:
break
_write_index_cache(cache_key, [_skill_meta_to_dict(item) for item in results])
return results
def _meta_from_search_item(self, item: dict) -> Optional[SkillMeta]:
if not isinstance(item, dict):
return None
canonical = item.get("id")
repo = item.get("source")
skill_path = item.get("skillId")
if not isinstance(canonical, str) or canonical.count("/") < 2:
if not (isinstance(repo, str) and isinstance(skill_path, str)):
return None
canonical = f"{repo}/{skill_path}"
parts = canonical.split("/", 2)
if len(parts) < 3:
return None
repo = f"{parts[0]}/{parts[1]}"
skill_path = parts[2]
installs = item.get("installs")
installs_label = f" · {int(installs):,} installs" if isinstance(installs, int) else ""
return SkillMeta(
name=str(item.get("name") or skill_path.split("/")[-1]),
description=f"Indexed by skills.sh from {repo}{installs_label}",
source="skills.sh",
identifier=self._wrap_identifier(canonical),
trust_level=self.github.trust_level_for(canonical),
repo=repo,
path=skill_path,
extra={
"installs": installs,
"detail_url": f"{self.BASE_URL}/{canonical}",
"repo_url": f"https://github.com/{repo}",
},
)
def _fetch_detail_page(self, identifier: str) -> Optional[dict]:
cache_key = f"skills_sh_detail_{hashlib.md5(identifier.encode()).hexdigest()}"
cached = _read_index_cache(cache_key)
if isinstance(cached, dict):
return cached
try:
resp = httpx.get(f"{self.BASE_URL}/{identifier}", timeout=20)
if resp.status_code != 200:
return None
except httpx.HTTPError:
return None
detail = self._parse_detail_page(identifier, resp.text)
if detail:
_write_index_cache(cache_key, detail)
return detail
def _parse_detail_page(self, identifier: str, html: str) -> Optional[dict]:
parts = identifier.split("/", 2)
if len(parts) < 3:
return None
default_repo = f"{parts[0]}/{parts[1]}"
skill_token = parts[2]
repo = default_repo
install_skill = skill_token
install_command = None
install_match = self._INSTALL_CMD_RE.search(html)
if install_match:
install_command = install_match.group(0).strip()
repo_value = (install_match.group("repo") or "").strip()
install_skill = (install_match.group("skill") or install_skill).strip()
repo = self._extract_repo_slug(repo_value) or repo
page_title = self._extract_first_match(self._PAGE_H1_RE, html)
body_title = self._extract_first_match(self._PROSE_H1_RE, html)
body_summary = self._extract_first_match(self._PROSE_P_RE, html)
weekly_installs = self._extract_weekly_installs(html)
security_audits = self._extract_security_audits(html, identifier)
return {
"repo": repo,
"install_skill": install_skill,
"page_title": page_title,
"body_title": body_title,
"body_summary": body_summary,
"weekly_installs": weekly_installs,
"install_command": install_command,
"repo_url": f"https://github.com/{repo}",
"detail_url": f"{self.BASE_URL}/{identifier}",
"security_audits": security_audits,
}
def _discover_identifier(self, identifier: str, detail: Optional[dict] = None) -> Optional[str]:
parts = identifier.split("/", 2)
if len(parts) < 3:
return None
default_repo = f"{parts[0]}/{parts[1]}"
repo = detail.get("repo", default_repo) if isinstance(detail, dict) else default_repo
skill_token = parts[2]
tokens = [skill_token]
if isinstance(detail, dict):
tokens.extend([
detail.get("install_skill", ""),
detail.get("page_title", ""),
detail.get("body_title", ""),
])
for base_path in ("skills/", ".agents/skills/", ".claude/skills/"):
try:
skills = self.github._list_skills_in_repo(repo, base_path)
except Exception:
continue
for meta in skills:
if self._matches_skill_tokens(meta, tokens):
return meta.identifier
return None
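
Sketch of the fallback resolution, under a hypothetical repo:

# For "acme/toolkit/pdf_tools" with no directly resolvable candidate path:
#   tokens  = ["pdf_tools", <install --skill value>, <page <h1>>, <prose <h1>>]
#   scanned = skills/, .agents/skills/, .claude/skills/ in acme/toolkit
# The first listed skill whose name/path variants intersect the token
# variants (see _token_variants below) wins.
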
def _finalize_inspect_meta(self, meta: SkillMeta, canonical: str, detail: Optional[dict]) -> SkillMeta:
meta.source = "skills.sh"
meta.identifier = self._wrap_identifier(canonical)
meta.trust_level = self.trust_level_for(canonical)
merged_extra = dict(meta.extra)
merged_extra.update(self._detail_to_metadata(canonical, detail))
meta.extra = merged_extra
if isinstance(detail, dict):
body_summary = detail.get("body_summary")
weekly_installs = detail.get("weekly_installs")
if body_summary:
meta.description = body_summary
elif meta.description and weekly_installs:
meta.description = f"{meta.description} · {weekly_installs} weekly installs on skills.sh"
return meta
@classmethod
def _matches_skill_tokens(cls, meta: SkillMeta, skill_tokens: List[str]) -> bool:
candidates = set()
candidates.update(cls._token_variants(meta.name))
candidates.update(cls._token_variants(meta.path))
candidates.update(cls._token_variants(meta.identifier.split("/", 2)[-1] if meta.identifier else None))
for token in skill_tokens:
variants = cls._token_variants(token)
if variants & candidates:
return True
return False
@staticmethod
def _token_variants(value: Optional[str]) -> set[str]:
if not value:
return set()
plain = SkillsShSource._strip_html(str(value)).strip().strip("/").lower()
if not plain:
return set()
base = plain.split("/")[-1]
sanitized = re.sub(r'[^a-z0-9/_-]+', '-', plain).strip('-')
sanitized_base = sanitized.split("/")[-1] if sanitized else ""
slash_tail = plain.split("/")[-1]
slash_tail_clean = slash_tail.lstrip('@')
slash_tail_clean = slash_tail_clean.split('/')[-1]
variants = {
plain,
plain.replace("_", "-"),
plain.replace("/", "-"),
base,
base.replace("_", "-"),
base.replace("/", "-"),
sanitized,
sanitized.replace("/", "-") if sanitized else "",
sanitized_base,
slash_tail_clean,
slash_tail_clean.replace("_", "-"),
}
return {v for v in variants if v}
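
A worked example of the variant expansion (traced from the code above):

SkillsShSource._token_variants("Skills/PDF_Tools")
# -> {"skills/pdf_tools", "skills/pdf-tools", "skills-pdf_tools",
#     "pdf_tools", "pdf-tools"}
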
@staticmethod
def _extract_repo_slug(repo_value: str) -> Optional[str]:
repo_value = repo_value.strip()
if repo_value.startswith("https://github.com/"):
repo_value = repo_value[len("https://github.com/"):]
repo_value = repo_value.strip("/")
parts = repo_value.split("/")
if len(parts) >= 2:
return f"{parts[0]}/{parts[1]}"
return None
@staticmethod
def _extract_first_match(pattern: re.Pattern, text: str) -> Optional[str]:
match = pattern.search(text)
if not match:
return None
value = next((group for group in match.groups() if group), None)
if value is None:
return None
return SkillsShSource._strip_html(value).strip() or None
def _detail_to_metadata(self, canonical: str, detail: Optional[dict]) -> Dict[str, Any]:
parts = canonical.split("/", 2)
repo = f"{parts[0]}/{parts[1]}" if len(parts) >= 2 else ""
metadata = {
"detail_url": f"{self.BASE_URL}/{canonical}",
}
if repo:
metadata["repo_url"] = f"https://github.com/{repo}"
if isinstance(detail, dict):
for key in ("weekly_installs", "install_command", "repo_url", "detail_url", "security_audits"):
value = detail.get(key)
if value:
metadata[key] = value
return metadata
@staticmethod
def _extract_weekly_installs(html: str) -> Optional[str]:
match = SkillsShSource._WEEKLY_INSTALLS_RE.search(html)
if not match:
return None
return match.group("count")
@staticmethod
def _extract_security_audits(html: str, identifier: str) -> Dict[str, str]:
audits: Dict[str, str] = {}
for audit in ("agent-trust-hub", "socket", "snyk"):
idx = html.find(f"/security/{audit}")
if idx == -1:
continue
window = html[idx:idx + 500]
match = re.search(r'(Pass|Warn|Fail)', window, re.IGNORECASE)
if match:
audits[audit] = match.group(1).title()
return audits
@staticmethod
def _strip_html(value: str) -> str:
return re.sub(r'<[^>]+>', '', value)
@staticmethod
def _normalize_identifier(identifier: str) -> str:
if identifier.startswith("skills-sh/"):
return identifier[len("skills-sh/"):]
if identifier.startswith("skills.sh/"):
return identifier[len("skills.sh/"):]
return identifier
@staticmethod
def _candidate_identifiers(identifier: str) -> List[str]:
parts = identifier.split("/", 2)
if len(parts) < 3:
return [identifier]
repo = f"{parts[0]}/{parts[1]}"
skill_path = parts[2].lstrip("/")
candidates = [
f"{repo}/{skill_path}",
f"{repo}/skills/{skill_path}",
f"{repo}/.agents/skills/{skill_path}",
f"{repo}/.claude/skills/{skill_path}",
]
seen = set()
deduped: List[str] = []
for candidate in candidates:
if candidate not in seen:
seen.add(candidate)
deduped.append(candidate)
return deduped
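
A worked example of the candidate expansion (hypothetical repo):

SkillsShSource._candidate_identifiers("acme/toolkit/pdf-tools")
# -> ["acme/toolkit/pdf-tools",
#     "acme/toolkit/skills/pdf-tools",
#     "acme/toolkit/.agents/skills/pdf-tools",
#     "acme/toolkit/.claude/skills/pdf-tools"]
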
@staticmethod
def _wrap_identifier(identifier: str) -> str:
return f"skills-sh/{identifier}"
# ---------------------------------------------------------------------------
# ClawHub source adapter
# ---------------------------------------------------------------------------
@@ -1213,6 +1853,7 @@ def _skill_meta_to_dict(meta: SkillMeta) -> dict:
"repo": meta.repo,
"path": meta.path,
"tags": meta.tags,
"extra": meta.extra,
}
@@ -1248,6 +1889,7 @@ class HubLockFile:
skill_hash: str,
install_path: str,
files: List[str],
metadata: Optional[Dict[str, Any]] = None,
) -> None:
data = self.load()
data["installed"][name] = {
@@ -1258,6 +1900,7 @@
"content_hash": skill_hash,
"install_path": install_path,
"files": files,
"metadata": metadata or {},
"installed_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
}
@@ -1412,6 +2055,7 @@ def install_from_quarantine(
skill_hash=content_hash(install_dir),
install_path=str(install_dir.relative_to(SKILLS_DIR)),
files=list(bundle.files.keys()),
metadata=bundle.metadata,
)
append_audit_log(
@@ -1440,6 +2084,78 @@ def uninstall_skill(skill_name: str) -> Tuple[bool, str]:
return True, f"Uninstalled '{skill_name}' from {entry['install_path']}"
def bundle_content_hash(bundle: SkillBundle) -> str:
"""Compute a deterministic hash for an in-memory skill bundle."""
h = hashlib.sha256()
for rel_path in sorted(bundle.files):
h.update(bundle.files[rel_path].encode("utf-8"))
return f"sha256:{h.hexdigest()[:16]}"
def _source_matches(source: SkillSource, source_name: str) -> bool:
aliases = {
"skills.sh": "skills-sh",
}
normalized = aliases.get(source_name, source_name)
return source.source_id() == normalized
def check_for_skill_updates(
name: Optional[str] = None,
*,
lock: Optional[HubLockFile] = None,
sources: Optional[List[SkillSource]] = None,
auth: Optional[GitHubAuth] = None,
) -> List[dict]:
"""Check installed hub skills for upstream changes."""
lock = lock or HubLockFile()
installed = lock.list_installed()
if name:
installed = [entry for entry in installed if entry.get("name") == name]
if sources is None:
sources = create_source_router(auth=auth)
results: List[dict] = []
for entry in installed:
identifier = entry.get("identifier", "")
source_name = entry.get("source", "")
candidate_sources = [src for src in sources if _source_matches(src, source_name)] or sources
bundle = None
for src in candidate_sources:
try:
bundle = src.fetch(identifier)
except Exception:
bundle = None
if bundle:
break
if not bundle:
results.append({
"name": entry.get("name", ""),
"identifier": identifier,
"source": source_name,
"status": "unavailable",
})
continue
current_hash = entry.get("content_hash", "")
latest_hash = bundle_content_hash(bundle)
status = "up_to_date" if current_hash == latest_hash else "update_available"
results.append({
"name": entry.get("name", ""),
"identifier": identifier,
"source": source_name,
"status": status,
"current_hash": current_hash,
"latest_hash": latest_hash,
"bundle": bundle,
})
return results
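
Usage sketch: report which installed hub skills have drifted from upstream:

for result in check_for_skill_updates():
    if result["status"] == "update_available":
        print(f"{result['name']}: {result['current_hash']} -> {result['latest_hash']}")
    elif result["status"] == "unavailable":
        print(f"{result['name']}: source '{result['source']}' could not be reached")
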
def create_source_router(auth: Optional[GitHubAuth] = None) -> List[SkillSource]:
"""
Create all configured source adapters.
@@ -1453,6 +2169,8 @@ def create_source_router(auth: Optional[GitHubAuth] = None) -> List[SkillSource]:
sources: List[SkillSource] = [
OptionalSkillSource(), # Official optional skills (highest priority)
SkillsShSource(auth=auth),
WellKnownSkillSource(),
GitHubSource(auth=auth, extra_taps=extra_taps),
ClawHubSource(),
ClaudeMarketplaceSource(auth=auth),