feat: compress cron management into one tool

This commit is contained in:
teknium1 2026-03-14 12:21:50 -07:00
parent c6cc92295c
commit df5c61b37c
15 changed files with 574 additions and 397 deletions

View file

@ -16,6 +16,8 @@ from cron.jobs import (
get_job,
list_jobs,
update_job,
pause_job,
resume_job,
remove_job,
mark_job_run,
get_due_jobs,
@ -233,14 +235,18 @@ class TestUpdateJob:
job = create_job(prompt="Daily report", schedule="every 1h")
assert job["schedule"]["kind"] == "interval"
assert job["schedule"]["minutes"] == 60
old_next_run = job["next_run_at"]
new_schedule = parse_schedule("every 2h")
updated = update_job(job["id"], {"schedule": new_schedule})
updated = update_job(job["id"], {"schedule": new_schedule, "schedule_display": new_schedule["display"]})
assert updated is not None
assert updated["schedule"]["kind"] == "interval"
assert updated["schedule"]["minutes"] == 120
assert updated["schedule_display"] == "every 120m"
assert updated["next_run_at"] != old_next_run
# Verify persisted to disk
fetched = get_job(job["id"])
assert fetched["schedule"]["minutes"] == 120
assert fetched["schedule_display"] == "every 120m"
def test_update_enable_disable(self, tmp_cron_dir):
job = create_job(prompt="Toggle me", schedule="every 1h")
@ -255,6 +261,26 @@ class TestUpdateJob:
assert result is None
class TestPauseResumeJob:
def test_pause_sets_state(self, tmp_cron_dir):
job = create_job(prompt="Pause me", schedule="every 1h")
paused = pause_job(job["id"], reason="user paused")
assert paused is not None
assert paused["enabled"] is False
assert paused["state"] == "paused"
assert paused["paused_reason"] == "user paused"
def test_resume_reenables_job(self, tmp_cron_dir):
job = create_job(prompt="Resume me", schedule="every 1h")
pause_job(job["id"], reason="user paused")
resumed = resume_job(job["id"])
assert resumed is not None
assert resumed["enabled"] is True
assert resumed["state"] == "scheduled"
assert resumed["paused_at"] is None
assert resumed["paused_reason"] is None
class TestMarkJobRun:
def test_increments_completed(self, tmp_cron_dir):
job = create_job(prompt="Test", schedule="every 1h")