Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ida_pro_mcp/idalib_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def idb_open(
init_hexrays: Annotated[bool, "Initialize Hex-Rays decompiler after open"] = True,
idle_ttl_sec: Annotated[
int,
"Minimum idle TTL in seconds before the headless worker self-exits.",
"Minimum idle TTL in seconds before the headless worker self-exits. "
"Use 0 (or any value <= 0) to keep the worker resident forever.",
] = 600,
preferred_session_id: Annotated[
str,
Expand Down
93 changes: 83 additions & 10 deletions src/ida_pro_mcp/idalib_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,26 @@
"idb_open",
"idb_list",
}
WORKER_TCP_HEALTH_TIMEOUT_SEC = 0.5
WORKER_RPC_HEALTH_TIMEOUT_SEC = 2.0
# Health-probe timeouts. A busy worker (mid auto-analysis / decompile) runs
# single-threaded and cannot answer a JSON-RPC ping until it yields, so a tight
# RPC timeout misclassifies "busy" as "dead" and gets the worker reaped. These
# default generously and are env-overridable for slow hosts / huge databases.
WORKER_TCP_HEALTH_TIMEOUT_SEC = float(os.environ.get("IDA_MCP_HEALTH_TCP_TIMEOUT", "2.0"))
WORKER_RPC_HEALTH_TIMEOUT_SEC = float(os.environ.get("IDA_MCP_HEALTH_RPC_TIMEOUT", "10.0"))

# When a health probe fails but the worker process is still alive, retry instead
# of reaping it immediately: a busy worker is not a dead worker. A worker is
# reaped right away only when its OS process has actually exited; a live worker
# is reaped only after staying unresponsive past WORKER_WEDGED_GRACE_SEC.
WORKER_HEALTH_RETRIES = int(os.environ.get("IDA_MCP_HEALTH_RETRIES", "3"))
WORKER_HEALTH_RETRY_BACKOFF_SEC = float(os.environ.get("IDA_MCP_HEALTH_RETRY_BACKOFF", "1.0"))

# Grace window (seconds) before a live-but-unresponsive worker is declared
# wedged and reaped. A worker that keeps failing health probes for longer than
# this — while its process is still alive — is treated as permanently stuck
# (e.g. an analysis loop) rather than transiently busy, so it does not pin a
# worker slot forever. Set IDA_MCP_WEDGED_GRACE_SEC=0 (or any value <= 0) to
# never reap a live worker (pure keep-forever behavior for long-lived
# single-user setups).
WORKER_WEDGED_GRACE_SEC = float(os.environ.get("IDA_MCP_WEDGED_GRACE_SEC", "300"))

# Upper bound (seconds) on how long a worker may take to open and auto-analyze a
# binary before it is reaped and an error is returned. A malformed or hostile
Expand Down Expand Up @@ -153,6 +171,11 @@ class WorkerSession:
owned: bool = True
pid: int | None = None
last_warmup: dict[str, Any] | None = None
# time.monotonic() of the first consecutive failed health probe while the
# process was still alive, or None when last seen healthy. Used to tell a
# transiently-busy worker (keep) from a permanently wedged one (reap after
# a grace window). See resolve_session().
unhealthy_since: float | None = None

def to_dict(self) -> IdalibSessionInfo:
return {
Expand Down Expand Up @@ -341,10 +364,13 @@ def _take_schema_worker_for_session(self) -> WorkerSession | None:
return None

def _prune_dead_worker_sessions_locked(self) -> None:
# Prune only workers whose OS process has actually exited. A live but
# busy worker fails the reachability probe yet must NOT be torn down —
# see resolve_session() for the same busy-vs-dead distinction.
stale_session_ids = [
session.session_id
for session in self.sessions.values()
if session.backend == "worker" and session.owned and not self._session_is_reachable(session)
if session.backend == "worker" and session.owned and not session.is_alive()
]
for session_id in stale_session_ids:
stale = self._unregister_session_locked(session_id)
Expand Down Expand Up @@ -887,16 +913,62 @@ def resolve_session(self, database: str) -> WorkerSession:
session = self.peek_session(database)
if self._session_is_reachable(session):
session.last_accessed = datetime.now()
session.unhealthy_since = None # recovered — clear wedged tracking
return session
if session.backend == "gui":
return self._reopen_gui_session_headless(session)

# A failed probe does not mean the worker is dead — a worker that is busy
# running auto-analysis or a long decompile is single-threaded and cannot
# answer a ping until it yields. Reaping it here destroys live sessions.
# Only treat the worker as gone once its OS process has actually exited;
# while the process is alive, retry the probe with backoff.
for attempt in range(max(0, WORKER_HEALTH_RETRIES)):
if not session.is_alive():
break # process really exited — stop retrying, reap below
if WORKER_HEALTH_RETRY_BACKOFF_SEC > 0:
time.sleep(WORKER_HEALTH_RETRY_BACKOFF_SEC)
if self._session_is_reachable(session):
session.last_accessed = datetime.now()
session.unhealthy_since = None # recovered — clear tracking
return session

session_id = session.session_id
with self._lock:
current = self.sessions.get(session_id)
if current is session:
self._unregister_session_locked(session_id)
self._terminate_worker(session)
raise RuntimeError(f"Worker for session '{session_id}' is not reachable")

# Process actually exited: genuinely dead, reap it.
if not session.is_alive():
with self._lock:
current = self.sessions.get(session_id)
if current is session:
self._unregister_session_locked(session_id)
self._terminate_worker(session)
raise RuntimeError(f"Worker for session '{session_id}' is not reachable")

# Process is alive but unresponsive. Distinguish transiently busy (keep,
# retryable) from permanently wedged (reap after a grace window so it
# does not pin a worker slot forever). A non-positive grace window means
# "never reap a live worker".
now = time.monotonic()
if session.unhealthy_since is None:
session.unhealthy_since = now
unhealthy_for = now - session.unhealthy_since

if WORKER_WEDGED_GRACE_SEC > 0 and unhealthy_for >= WORKER_WEDGED_GRACE_SEC:
with self._lock:
current = self.sessions.get(session_id)
if current is session:
self._unregister_session_locked(session_id)
self._terminate_worker(session)
raise RuntimeError(
f"Worker for session '{session_id}' was unresponsive for "
f"{unhealthy_for:.0f}s (>= {WORKER_WEDGED_GRACE_SEC:.0f}s grace) "
f"and was reaped as wedged. Reopen the database."
)

raise RuntimeError(
f"Worker for session '{session_id}' is alive but not responding "
f"(busy or wedged for {unhealthy_for:.0f}s); session kept. Retry shortly."
)

def peek_session(self, database: str) -> WorkerSession:
if not database:
Expand Down Expand Up @@ -1049,7 +1121,8 @@ def idb_open(
init_hexrays: Annotated[bool, "Initialize Hex-Rays decompiler after open"] = True,
idle_ttl_sec: Annotated[
int,
"Minimum idle TTL in seconds before the headless worker self-exits.",
"Minimum idle TTL in seconds before the headless worker self-exits. "
"Use 0 (or any value <= 0) to keep the worker resident forever.",
] = 600,
preferred_session_id: Annotated[
str, "Preferred session ID (auto-generated if empty). Ignored if the file is already open in a GUI or worker session."
Expand Down
10 changes: 10 additions & 0 deletions src/ida_pro_mcp/worker_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def touch(self) -> None:

def set_idle_ttl(self, user_ttl_sec: float, load_time_sec: float = 0.0) -> None:
with self._lock:
# A non-positive TTL means "never self-exit" — for long-lived workers that
# the user stays attached to across many sessions. Any such value is
# normalized to 0.0 (bypassing the MIN clamp) so check_shutdown_reason()
# can detect the sentinel.
if user_ttl_sec <= 0:
self.idle_ttl_sec = 0.0
return
self.idle_ttl_sec = (
max(self.MIN_IDLE_TTL_SEC, user_ttl_sec) + max(0.0, load_time_sec)
)
Expand All @@ -80,6 +86,10 @@ def check_shutdown_reason(self) -> str | None:
with self._lock:
last_req = self._last_request_at
ttl = self.idle_ttl_sec
# ttl <= 0 is the "never self-exit" sentinel: the worker stays resident
# until explicitly stopped, regardless of idle time.
if ttl <= 0:
return None
now = time.monotonic()
if (now - last_req) > ttl:
return f"no requests for {now - last_req:.1f}s"
Expand Down
160 changes: 148 additions & 12 deletions tests/test_idalib_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,29 +644,165 @@ def test_open_session_ignores_dead_workers_for_max_worker_limit(tmp_path):
restore()


def test_resolve_session_removes_unreachable_worker(tmp_path):
def test_resolve_session_removes_dead_worker(tmp_path):
# A worker whose OS process has actually exited is reaped and reported
# unreachable. (Busy-but-alive workers are handled separately below.)
sample = tmp_path / "sample.bin"
sample.write_bytes(b"x")
sup = _FakeSupervisor()
session = sup.open_session(str(sample), session_id="sample")
unreachable = {session.session_id}
# Simulate the worker process having exited.
session.process = _DeadProcess()

def fake_reachable(candidate):
if candidate.session_id in unreachable:
return False
return candidate.is_alive()
try:
sup.resolve_session("sample")
except RuntimeError as e:
assert "not reachable" in str(e)
else:
raise AssertionError("expected RuntimeError")

assert "sample" not in sup.sessions

sup._session_is_reachable = fake_reachable

def test_resolve_session_keeps_busy_but_alive_worker(tmp_path, monkeypatch):
    # A worker that fails the health probe while its process is still alive is
    # busy (or possibly wedged), NOT dead: while it is inside the wedged-grace
    # window it must stay registered and a retryable error must be surfaced
    # instead of the worker being reaped.
    #
    # The error text for a kept session is "... is alive but not responding
    # (...); session kept. Retry shortly." — it does not contain the phrase
    # "not reachable", which is reserved for workers that were actually reaped.
    monkeypatch.setattr(supmod, "WORKER_HEALTH_RETRY_BACKOFF_SEC", 0.0)
    sample = tmp_path / "sample.bin"
    sample.write_bytes(b"x")
    sup = _FakeSupervisor()
    session = sup.open_session(str(sample), session_id="sample")
    # Process stays alive (_FakeProcess.poll() -> None) but probe says unreachable.
    sup._session_is_reachable = lambda candidate: False

    try:
        sup.resolve_session("sample")
    except RuntimeError as e:
        assert "not responding" in str(e)
        assert "kept" in str(e)
    else:
        raise AssertionError("expected RuntimeError")

    # Session must still be registered and the process must NOT have been killed.
    assert "sample" in sup.sessions
    assert session.process.returncode is None


def test_resolve_session_retries_then_recovers_busy_worker(tmp_path, monkeypatch):
    # A worker whose first health probe fails but whose next probe succeeds
    # must be returned by resolve_session rather than reported as an error:
    # one slow probe must not cost the caller its session.
    monkeypatch.setattr(supmod, "WORKER_HEALTH_RETRY_BACKOFF_SEC", 0.0)
    monkeypatch.setattr(supmod, "WORKER_HEALTH_RETRIES", 3)
    binary = tmp_path / "sample.bin"
    binary.write_bytes(b"x")
    supervisor = _FakeSupervisor()
    supervisor.open_session(str(binary), session_id="sample")

    # Scripted probe results: the first probe fails, every later probe succeeds.
    scripted = iter([False])
    supervisor._session_is_reachable = lambda candidate: next(scripted, True)

    recovered = supervisor.resolve_session("sample")
    assert recovered.session_id == "sample"
    assert "sample" in supervisor.sessions


def test_resolve_session_reaps_wedged_worker_after_grace(tmp_path, monkeypatch):
    # A live worker that stays unresponsive past the wedged-grace window is
    # declared permanently stuck and reaped, so it cannot pin a worker slot
    # forever. Inside the window it is kept and a retryable error is raised.
    monkeypatch.setattr(supmod, "WORKER_HEALTH_RETRY_BACKOFF_SEC", 0.0)
    monkeypatch.setattr(supmod, "WORKER_WEDGED_GRACE_SEC", 300.0)
    sample = tmp_path / "sample.bin"
    sample.write_bytes(b"x")
    sup = _FakeSupervisor()
    session = sup.open_session(str(sample), session_id="sample")
    sup._session_is_reachable = lambda candidate: False  # never responds

    # Drive time.monotonic() by hand so the grace window can be crossed
    # without actually waiting.
    clock = {"t": 1000.0}
    monkeypatch.setattr(supmod.time, "monotonic", lambda: clock["t"])

    # First call: marks unhealthy_since, still within grace -> kept.
    try:
        sup.resolve_session("sample")
    except RuntimeError as e:
        assert "kept" in str(e)
    else:
        raise AssertionError("expected RuntimeError (kept)")
    assert "sample" in sup.sessions
    assert session.unhealthy_since == 1000.0

    # Advance past the grace window: now reaped as wedged.
    clock["t"] = 1000.0 + 301.0
    try:
        sup.resolve_session("sample")
    except RuntimeError as e:
        assert "wedged" in str(e)
    else:
        raise AssertionError("expected RuntimeError (wedged)")
    assert "sample" not in sup.sessions
    assert session.process.returncode == 0  # terminated


def test_resolve_session_grace_zero_never_reaps_live_worker(tmp_path, monkeypatch):
    # With a non-positive wedged-grace window a live worker is never reaped,
    # however long it has been failing its health probe: every call reports
    # the session as kept and the process is left running.
    monkeypatch.setattr(supmod, "WORKER_HEALTH_RETRY_BACKOFF_SEC", 0.0)
    monkeypatch.setattr(supmod, "WORKER_WEDGED_GRACE_SEC", 0.0)
    binary = tmp_path / "sample.bin"
    binary.write_bytes(b"x")
    supervisor = _FakeSupervisor()
    worker = supervisor.open_session(str(binary), session_id="sample")
    supervisor._session_is_reachable = lambda candidate: False

    # Hand-driven replacement for time.monotonic().
    fake_now = [1000.0]
    monkeypatch.setattr(supmod.time, "monotonic", lambda: fake_now[0])
    for elapsed in (0.0, 10_000.0):
        fake_now[0] = 1000.0 + elapsed
        try:
            supervisor.resolve_session("sample")
        except RuntimeError as err:
            assert "kept" in str(err)
        else:
            raise AssertionError("expected RuntimeError (kept)")
        assert "sample" in supervisor.sessions
    assert worker.process.returncode is None  # never terminated


def test_resolve_session_recovery_clears_wedged_tracking(tmp_path, monkeypatch):
    # When a previously unresponsive worker answers a probe again, its
    # unhealthy_since stamp is reset to None, so a later failure starts a new
    # grace window instead of inheriting the old start time.
    monkeypatch.setattr(supmod, "WORKER_HEALTH_RETRY_BACKOFF_SEC", 0.0)
    monkeypatch.setattr(supmod, "WORKER_WEDGED_GRACE_SEC", 300.0)
    binary = tmp_path / "sample.bin"
    binary.write_bytes(b"x")
    supervisor = _FakeSupervisor()
    worker = supervisor.open_session(str(binary), session_id="sample")

    # Hand-driven replacement for time.monotonic().
    fake_now = [1000.0]
    monkeypatch.setattr(supmod.time, "monotonic", lambda: fake_now[0])

    # Switchable probe result, initially failing.
    probe_ok = [False]
    supervisor._session_is_reachable = lambda candidate: probe_ok[0]

    # One unresponsive resolve stamps the session with the current clock.
    try:
        supervisor.resolve_session("sample")
    except RuntimeError:
        pass
    assert worker.unhealthy_since == 1000.0

    # A healthy resolve afterwards clears the stamp.
    probe_ok[0] = True
    fake_now[0] = 1100.0
    recovered = supervisor.resolve_session("sample")
    assert recovered.session_id == "sample"
    assert worker.unhealthy_since is None


def test_open_session_prunes_unreachable_existing_mapping(tmp_path):
Expand All @@ -675,17 +811,17 @@ def test_open_session_prunes_unreachable_existing_mapping(tmp_path):
restore = _patch_discovery(instances=[], probe=False)
try:
sup = _FakeSupervisor()
# A genuinely dead worker process (poll() -> non-None) for the same path.
# Pruning keys off real process death, not just a failed probe.
stale = supmod.WorkerSession(
session_id="stale",
input_path=str(sample.resolve()),
filename="sample.bin",
process=_FakeProcess(),
process=_DeadProcess(),
)
with sup._lock:
sup._register_session_locked(stale, str(sample.resolve()))

sup._session_is_reachable = lambda session: session.session_id != "stale" and session.is_alive()

session = sup.open_session(str(sample), session_id="new")

assert session.session_id == "new"
Expand Down
Loading