Merge PR #298: Make process_registry checkpoint writes atomic
Authored by aydnOktay. Companion to PR #297 (batch_runner). Applies the same atomic write pattern (temp file + fsync + os.replace) to both _write_checkpoint() and recover_from_checkpoint() in process_registry.py. Prevents checkpoint corruption on gateway crashes. Also improves error handling: bare 'pass' replaced with logger.debug(..., exc_info=True) for better debugging.
This commit is contained in:
commit
c05c60665e
1 changed files with 42 additions and 7 deletions
|
|
@ -37,6 +37,7 @@ import shlex
|
||||||
import shutil
|
import shutil
|
||||||
import signal
|
import signal
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import tempfile
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
@ -690,7 +691,7 @@ class ProcessRegistry:
|
||||||
# ----- Checkpoint (crash recovery) -----
|
# ----- Checkpoint (crash recovery) -----
|
||||||
|
|
||||||
def _write_checkpoint(self):
|
def _write_checkpoint(self):
|
||||||
"""Write running process metadata to checkpoint file."""
|
"""Write running process metadata to checkpoint file atomically."""
|
||||||
try:
|
try:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
entries = []
|
entries = []
|
||||||
|
|
@ -705,12 +706,28 @@ class ProcessRegistry:
|
||||||
"task_id": s.task_id,
|
"task_id": s.task_id,
|
||||||
"session_key": s.session_key,
|
"session_key": s.session_key,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# Atomic write: temp file + os.replace to avoid corruption on crash
|
||||||
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||||
CHECKPOINT_PATH.write_text(
|
fd, tmp_path = tempfile.mkstemp(
|
||||||
json.dumps(entries, indent=2), encoding="utf-8"
|
dir=str(CHECKPOINT_PATH.parent),
|
||||||
|
prefix='.checkpoint_',
|
||||||
|
suffix='.tmp',
|
||||||
)
|
)
|
||||||
except Exception:
|
try:
|
||||||
pass # Best-effort
|
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||||
|
json.dump(entries, f, indent=2, ensure_ascii=False)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
os.replace(tmp_path, CHECKPOINT_PATH)
|
||||||
|
except BaseException:
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Failed to write checkpoint file: %s", e, exc_info=True)
|
||||||
|
|
||||||
def recover_from_checkpoint(self) -> int:
|
def recover_from_checkpoint(self) -> int:
|
||||||
"""
|
"""
|
||||||
|
|
@ -757,10 +774,28 @@ class ProcessRegistry:
|
||||||
logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
|
logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
|
||||||
|
|
||||||
# Clear the checkpoint (will be rewritten as processes finish)
|
# Clear the checkpoint (will be rewritten as processes finish)
|
||||||
|
# Use atomic write to avoid corruption
|
||||||
try:
|
try:
|
||||||
CHECKPOINT_PATH.write_text("[]", encoding="utf-8")
|
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
fd, tmp_path = tempfile.mkstemp(
|
||||||
|
dir=str(CHECKPOINT_PATH.parent),
|
||||||
|
prefix='.checkpoint_',
|
||||||
|
suffix='.tmp',
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||||
|
f.write("[]")
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
os.replace(tmp_path, CHECKPOINT_PATH)
|
||||||
|
except BaseException:
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Could not write checkpoint file: %s", e)
|
logger.debug("Could not clear checkpoint file: %s", e, exc_info=True)
|
||||||
|
|
||||||
return recovered
|
return recovered
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue