fix: dynamic grace window for missed cron job catch-up

Replace hardcoded 120-second grace period with a dynamic window that
scales with the job's scheduling frequency (half the period, clamped
to [120s, 2h]). Daily jobs now catch up if missed by up to 2 hours
instead of being silently skipped after just 2 minutes.
This commit is contained in:
Teknium 2026-03-22 04:04:24 -07:00
parent 189214a69d
commit 21ffadc2a6
No known key found for this signature in database
2 changed files with 50 additions and 10 deletions

View file

@ -248,6 +248,38 @@ def _recoverable_oneshot_run_at(
return None
def _compute_grace_seconds(schedule: dict) -> int:
"""Compute how late a job can be and still catch up instead of fast-forwarding.
Uses half the schedule period, clamped between 120 seconds and 2 hours.
This ensures daily jobs can catch up if missed by up to 2 hours,
while frequent jobs (every 5-10 min) still fast-forward quickly.
"""
MIN_GRACE = 120
MAX_GRACE = 7200 # 2 hours
kind = schedule.get("kind")
if kind == "interval":
period_seconds = schedule.get("minutes", 1) * 60
grace = period_seconds // 2
return max(MIN_GRACE, min(grace, MAX_GRACE))
if kind == "cron" and HAS_CRONITER:
try:
now = _hermes_now()
cron = croniter(schedule["expr"], now)
first = cron.get_next(datetime)
second = cron.get_next(datetime)
period_seconds = int((second - first).total_seconds())
grace = period_seconds // 2
return max(MIN_GRACE, min(grace, MAX_GRACE))
except Exception:
pass
return MIN_GRACE
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
"""
Compute the next run time for a schedule.
@ -610,16 +642,18 @@ def get_due_jobs() -> List[Dict[str, Any]]:
# For recurring jobs, check if the scheduled time is stale
# (gateway was down and missed the window). Fast-forward to
# the next future occurrence instead of firing a stale run.
if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > 120:
# More than 2 minutes late — this is a missed run, not a current one.
# Recompute next_run_at to the next future occurrence.
grace = _compute_grace_seconds(schedule)
if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > grace:
# Job is past its catch-up grace window — this is a stale missed run.
# Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m.
new_next = compute_next_run(schedule, now.isoformat())
if new_next:
logger.info(
"Job '%s' missed its scheduled time (%s). "
"Job '%s' missed its scheduled time (%s, grace=%ds). "
"Fast-forwarding to next run: %s",
job.get("name", job["id"]),
next_run,
grace,
new_next,
)
# Update the job in storage