Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5550,6 +5550,19 @@ def _auto_decompose_tick() -> int:
"kanban dispatcher: embedded in gateway (interval=%.1fs)", interval
)
while self._running:
try:
# Reap zombie children before per-board work so a board DB
# failure cannot block cleanup of unrelated workers.
pids = await asyncio.to_thread(_kb.reap_worker_zombies)
if pids:
logger.info(
"kanban dispatcher: reaped %d zombie worker(s), pids=%s",
len(pids),
pids,
)
except Exception:
logger.exception("kanban dispatcher: zombie reaper failed")

try:
if auto_decompose_enabled:
await asyncio.to_thread(_auto_decompose_tick)
Expand Down
59 changes: 27 additions & 32 deletions hermes_cli/kanban_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4169,6 +4169,30 @@ def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
return ("unknown", None)


def reap_worker_zombies() -> "list[int]":
    """Reap every exited child of this process without blocking.

    Calls ``os.waitpid(-1, os.WNOHANG)`` in a loop until no exited child is
    left, handing each ``(pid, status)`` pair to ``_record_worker_exit``.

    Returns:
        The PIDs reaped by this call, in the order they were collected.
        Empty when there are no children, when none has exited yet, or on
        Windows (``os.name == "nt"``), where this function is a no-op.

    This is best-effort cleanup: ``Exception`` subclasses are logged rather
    than propagated, so a failure here cannot take down the caller's tick.
    """
    if os.name == "nt":
        return []

    # Function-scope import so the block does not depend on the module's
    # top-level import list.
    import logging

    log = logging.getLogger(__name__)
    reaped: "list[int]" = []
    try:
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # This process has no children at all.
                break
            if pid == 0:
                # Children exist, but none of them has exited yet.
                break
            # The child is already gone from the process table at this
            # point. A failure while recording its status must neither hide
            # the pid from the caller nor stop us from reaping the rest.
            try:
                _record_worker_exit(pid, status)
            except Exception:
                log.exception(
                    "kanban: failed to record exit status for worker pid=%s", pid
                )
            reaped.append(pid)
    except Exception:
        # Unexpected waitpid failure: report what was reaped so far.
        log.exception("kanban: zombie reaper aborted early")
    return reaped


def _pid_alive(pid: Optional[int]) -> bool:
"""Return True if ``pid`` is still running on this host.

Expand Down Expand Up @@ -5125,38 +5149,9 @@ def dispatch_once(
``board`` pins workspace/log/db resolution for this tick to a specific
board. When omitted, the current-board resolution chain is used.
"""
# Reap zombie children from previously spawned workers.
# The gateway-embedded dispatcher is the parent of every worker spawned
# via _default_spawn (start_new_session=True only detaches the
# controlling tty, not the parent). Without an explicit waitpid, each
# completed worker becomes a <defunct> entry that lingers until gateway
# exit. WNOHANG keeps this non-blocking; ChildProcessError means no
# children to reap. Bounded: at most one tick's worth of completions
# can be in <defunct> at once.
#
# We also record the exit status keyed by pid, so
# ``detect_crashed_workers`` can distinguish a worker that exited
# cleanly without calling ``kanban_complete`` / ``kanban_block``
# (protocol violation — auto-block) from a real crash (OOM killer,
# SIGKILL, non-zero exit — existing counter behavior).
#
# Windows has no zombies / no os.WNOHANG — subprocess.Popen handles
# are freed when the Python object is garbage-collected or .wait() is
# called explicitly. The kanban dispatcher discards the Popen handle
# after spawn (``_default_spawn`` → abandon), so on Windows there's
# nothing to reap here — skip the whole block.
if os.name != "nt":
try:
while True:
try:
_pid, _status = os.waitpid(-1, os.WNOHANG)
except ChildProcessError:
break
if _pid == 0:
break
_record_worker_exit(_pid, _status)
except Exception:
pass
# Reap zombie children from previously spawned workers. See
# reap_worker_zombies() for the full rationale.
reap_worker_zombies()

result = DispatchResult()
result.reclaimed = release_stale_claims(conn)
Expand Down
1 change: 1 addition & 0 deletions scripts/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"dafeng@DafengdeMacBook-Pro.local": "WorldWriter",
"schepers.zander1@gmail.com": "Strontvod",
"anadi.jaggia@gmail.com": "Jaggia",
"steveonjava@gmail.com": "steveonjava",
"32201324+simpolism@users.noreply.github.com": "simpolism",
"simpolism@gmail.com": "simpolism",
"jake@nousresearch.com": "simpolism",
Expand Down
154 changes: 154 additions & 0 deletions tests/hermes_cli/test_kanban_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3339,3 +3339,157 @@ def test_maybe_emit_scratch_tip_skips_non_scratch_workspaces(kanban_home, caplog
).fetchall()
assert "tip_scratch_workspace" not in [e["kind"] for e in events]



# ---------------------------------------------------------------------------
# reap_worker_zombies() tests
# ---------------------------------------------------------------------------


def test_reap_worker_zombies_returns_count():
    """Every PID yielded by ``os.waitpid`` ends up in the returned list, in order."""
    from unittest.mock import patch

    # Three exited children, then pid 0 forever ("nothing left to reap").
    pending = iter([12345, 67890, 11111])

    def fake_waitpid(pid, flags):
        return next(pending, 0), 0

    with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid), patch(
        "hermes_cli.kanban_db._record_worker_exit"
    ):
        reaped = kb.reap_worker_zombies()

    assert reaped == [12345, 67890, 11111]


def test_reap_worker_zombies_noop_on_windows(monkeypatch):
    """With ``os.name`` set to ``"nt"`` the reaper returns ``[]`` and never calls ``os.waitpid``."""
    from unittest.mock import patch

    # Pretend to be on Windows for the duration of this test.
    monkeypatch.setattr("hermes_cli.kanban_db.os.name", "nt")

    with patch("hermes_cli.kanban_db.os.waitpid") as waitpid_spy:
        outcome = kb.reap_worker_zombies()

    waitpid_spy.assert_not_called()
    assert outcome == []


def test_reap_worker_zombies_noop_no_children():
    """A ``ChildProcessError`` from ``os.waitpid`` (no children) yields ``[]`` without raising."""
    from unittest.mock import patch

    no_children = patch(
        "hermes_cli.kanban_db.os.waitpid", side_effect=ChildProcessError
    )
    with no_children:
        outcome = kb.reap_worker_zombies()

    assert outcome == []


def test_reap_worker_zombies_records_exit_status():
    """Each reaped ``(pid, status)`` pair is forwarded to ``_record_worker_exit``."""
    from unittest.mock import patch

    recorded = []
    # One exited child, then ``(0, 0)`` on every later call.
    results = iter([(12345, 0)])

    def fake_waitpid(pid, flags):
        return next(results, (0, 0))

    def remember(reaped_pid, status):
        recorded.append((reaped_pid, status))

    with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid), patch(
        "hermes_cli.kanban_db._record_worker_exit", side_effect=remember
    ):
        kb.reap_worker_zombies()

    assert recorded == [(12345, 0)]


def test_reap_worker_zombies_handles_waitpid_os_error():
    """A generic ``OSError`` raised by ``os.waitpid`` is swallowed and ``[]`` is returned."""
    from unittest.mock import patch

    failing_waitpid = patch(
        "hermes_cli.kanban_db.os.waitpid", side_effect=OSError("test error")
    )
    with failing_waitpid:
        outcome = kb.reap_worker_zombies()

    assert outcome == []


def test_zombie_reaper_runs_despite_board_connect_failure():
    """The reaper still collects zombies after a (simulated) board failure.

    NOTE(review): the board failure is simulated by raising and catching
    ``sqlite3.OperationalError`` inline; no dispatcher code is exercised here.
    """
    from unittest.mock import patch

    # Two exited children, then pid 0 forever.
    pending = iter([12345, 67890])

    def fake_waitpid(pid, flags):
        return next(pending, 0), 0

    with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid), patch(
        "hermes_cli.kanban_db._record_worker_exit"
    ):
        # Simulate a board tick failure before reaping
        try:
            raise sqlite3.OperationalError("disk I/O error")
        except sqlite3.OperationalError:
            pass

        # Reaper still runs independently
        reaped = kb.reap_worker_zombies()

    assert reaped == [12345, 67890]


def test_zombie_reaper_survives_all_boards_failing():
    """Five consecutive reaper calls with two zombies each collect ten PIDs in total."""
    from unittest.mock import patch

    def scripted_waitpid(zombie_pids):
        """Build a ``waitpid`` fake that yields *zombie_pids* once, then pid 0."""
        queue = iter(zombie_pids)

        def fake_waitpid(pid, flags):
            return next(queue, 0), 0

        return fake_waitpid

    # 5 ticks, 2 zombies per tick = 10 total
    reaped_per_tick = []
    for tick in range(5):
        zombies = [tick * 100 + 1, tick * 100 + 2]
        with patch(
            "hermes_cli.kanban_db.os.waitpid", side_effect=scripted_waitpid(zombies)
        ), patch("hermes_cli.kanban_db._record_worker_exit"):
            reaped_per_tick.append(len(kb.reap_worker_zombies()))

    assert sum(reaped_per_tick) == 10


def test_dispatch_once_still_reaps_via_extracted_fn(kanban_home):
    """``reap_worker_zombies()`` reaps a single zombie with ``os.name`` forced to ``"posix"``.

    NOTE(review): despite the test name, ``dispatch_once`` is never called
    here; only the extracted helper is exercised. The ``kanban_home`` fixture
    is requested but not referenced in the body. Consider routing this
    through ``dispatch_once`` itself.
    """
    from unittest.mock import patch

    # One exited child, then ``(0, 0)`` on every later call.
    results = iter([(99999, 0)])

    def fake_waitpid(pid, flags):
        return next(results, (0, 0))

    with patch("hermes_cli.kanban_db.os.waitpid", side_effect=fake_waitpid), patch(
        "hermes_cli.kanban_db._record_worker_exit"
    ), patch("hermes_cli.kanban_db.os.name", "posix"):
        reaped = kb.reap_worker_zombies()

    assert reaped == [99999]