feat(desktop): stream subagent activity into watch windows (#47060)

* feat(desktop): stream subagent replies into watch windows

A desktop watch window resumes a child session lazily (no full agent) and
mirrors the parent-relayed `subagent.*` events into native child-session
stream events. The child's streamed reply text was never relayed, so the
window sat blank while the subagent "talked".

- delegate_tool: forward the child's `run_conversation` stream tokens up the
  progress relay as `subagent.text` (inert under CLI/TUI — their progress
  handlers ignore non-tool event types; only a gateway watch window mirrors it).
- server: mirror `subagent.text` -> `message.delta` on the child sid only, and
  skip the parent emit (per-token frames are meaningless on the parent session,
  which shows the child via the spawn tree). Demote `subagent.start` to a
  one-time goal header and drop the noisy `subagent.progress` mirror — tools
  already mirror natively.
- server: guard `_start_agent_build` so a lazy watch session spectating an
  in-flight child stays lazy; incidental RPCs were upgrading it to a full
  agent mid-stream and silently killing the mirror.

* fix(desktop): keep watch-window chat clear of titlebar chrome

Secondary windows (new-session scratch, subagent watch, cmd-click pop-out)
hide the titlebar tool cluster + session header, so the transcript ran to the
window's top edge and streamed text slid up under the OS traffic lights.

- Gate the hidden chrome on `isSecondaryWindow()` everywhere (app-shell,
  chat header, thread list) instead of the narrower new-session flag.
- Add a fixed opaque drag-strip at the top of the secondary-window transcript:
  content padding alone scrolls away with the text, so the strip masks
  anything behind it and keeps the window draggable like the main header.

* fix: WSL subagent watch window stuck on "session not found" (the lazy resume raced the child's first DB row flush)

* fix: subagent window top padding (CSS `calc()` needs spaces around `+` in the inline `--sticky-human-top` value)

---------

Co-authored-by: Austin Pickett <pickett.austin@gmail.com>
Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
brooklyn! 2026-06-16 13:30:11 -05:00 committed by GitHub
commit 44e5848e74
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 261 additions and 26 deletions

View file

@ -42,7 +42,7 @@ import {
$sessions,
sessionPinId
} from '@/store/session'
import { isNewSessionWindow, isSecondaryWindow } from '@/store/windows'
import { isSecondaryWindow } from '@/store/windows'
import type { ModelOptionsResponse } from '@/types/hermes'
import { routeSessionId } from '../routes'
@ -121,10 +121,10 @@ function ChatHeader({
? pinnedSessionIds.includes(selectedSessionId)
: false
// A brand-new session has no session to pin/delete/rename, so the header is
// just a dead "New session" label + chevron. Drop it (and its border)
// entirely until there's a real session to act on.
if (isNewSessionWindow() || (!selectedSessionId && !activeSessionId && !isRoutedSessionView)) {
// Secondary windows (new-session scratch, subagent watch, cmd-click pop-out)
// are compact side panels — they drop the session-actions header + border
// entirely. A brand-new draft has nothing to pin/delete/rename either.
if (isSecondaryWindow() || (!selectedSessionId && !activeSessionId && !isRoutedSessionView)) {
return null
}

View file

@ -16,7 +16,7 @@ import {
} from '@/store/layout'
import { $paneWidthOverride } from '@/store/panes'
import { $connection } from '@/store/session'
import { isNewSessionWindow, isSecondaryWindow } from '@/store/windows'
import { isSecondaryWindow } from '@/store/windows'
import { SIDEBAR_COLLAPSE_MEDIA_QUERY } from '../layout-constants'
@ -80,7 +80,10 @@ export function AppShell({
const connection = useStore($connection)
const viewportFullscreen = useSyncExternalStore(subscribeWindowSize, viewportIsFullscreen, () => false)
const isFullscreen = Boolean(connection?.isFullscreen) || viewportFullscreen
const hideTitlebarControls = isNewSessionWindow()
// Every secondary window (new-session scratch, subagent watch, cmd-click
// pop-out) is a compact side panel — none of them carry the full titlebar
// tool cluster. Gate on isSecondaryWindow, never the narrower new-session flag.
const hideTitlebarControls = isSecondaryWindow()
const titlebarControls = titlebarControlsPosition(connection?.windowButtonPosition, isFullscreen)
// Width Windows/Linux reserve for the OS-painted min/max/close overlay (zero
// on macOS, where window controls sit on the left and are reported via

View file

@ -22,7 +22,7 @@ import {
resetThreadScroll,
setThreadAtBottom
} from '@/store/thread-scroll'
import { isNewSessionWindow, isSecondaryWindow } from '@/store/windows'
import { isSecondaryWindow } from '@/store/windows'
import { MessageRenderBoundary } from './message-render-boundary'
@ -134,13 +134,20 @@ const ThreadMessageListInner: FC<ThreadMessageListProps> = ({
const hiddenCount = firstVisible
const visibleGroups = hiddenCount > 0 ? groups.slice(hiddenCount) : groups
const restoreFromBottomRef = useRef<number | null>(null)
const newSessionWindow = isNewSessionWindow()
const newSessionTitlebarGap = 'calc(var(--titlebar-height)+0.75rem)'
const threadContentTopPad = newSessionWindow
// Secondary windows (new-session scratch, subagent watch, cmd-click pop-out)
// hide the titlebar tool cluster + session header, but the OS traffic lights
// still sit in the top-left, so reserve the titlebar gap above the transcript.
const secondaryWindow = isSecondaryWindow()
// NB: CSS calc() requires whitespace around the +/- operator. This string is
// assigned verbatim to the --sticky-human-top inline style below (it does not
// go through Tailwind, which would auto-space it), so the spaces are load-
// bearing — without them the declaration is invalid, gets dropped, and the
// sticky user bubble falls back to its ~4px default and slides under the OS
// traffic lights.
const secondaryTitlebarGap = 'calc(var(--titlebar-height) + 0.75rem)'
const threadContentTopPad = secondaryWindow
? 'pt-[calc(var(--titlebar-height)+0.75rem)]'
: isSecondaryWindow()
? 'pt-6'
: 'pt-[calc(var(--titlebar-height)-0.5rem)]'
: 'pt-[calc(var(--titlebar-height)-0.5rem)]'
useEffect(() => setThreadAtBottom(isAtBottom), [isAtBottom])
useEffect(() => () => resetThreadScroll(), [])
@ -247,10 +254,21 @@ const ThreadMessageListInner: FC<ThreadMessageListProps> = ({
style={
{
height: clampToComposer ? 'var(--thread-viewport-height)' : '100%',
...(newSessionWindow ? { '--sticky-human-top': newSessionTitlebarGap } : {})
...(secondaryWindow ? { '--sticky-human-top': secondaryTitlebarGap } : {})
} as CSSProperties
}
>
{secondaryWindow && (
// Secondary windows hide the titlebar chrome, so the scroller runs to
// the window's top edge and streamed text slides up under the OS
// traffic lights. Content padding alone scrolls away with the text — a
// fixed opaque strip (the titlebar's drag region) masks anything behind
// it and keeps the window draggable, matching the main window's header.
<div
aria-hidden="true"
className="absolute inset-x-0 top-0 z-10 h-(--titlebar-height) bg-background [-webkit-app-region:drag]"
/>
)}
<div
className="size-full overflow-x-hidden overflow-y-auto overscroll-contain"
data-following={isAtBottom ? 'true' : 'false'}

View file

@ -499,7 +499,7 @@ class TestToolNamePreservation(unittest.TestCase):
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
def capture_and_return(user_message, task_id=None):
def capture_and_return(user_message, task_id=None, stream_callback=None):
captured["saved"] = list(mock_child._delegate_saved_tool_names)
return {"final_response": "ok", "completed": True, "api_calls": 1}
@ -2616,7 +2616,7 @@ class TestOrchestratorEndToEnd(unittest.TestCase):
m.thinking_callback = None
orch_mock["agent"] = m
def _orchestrator_run(user_message=None, task_id=None):
def _orchestrator_run(user_message=None, task_id=None, stream_callback=None):
# Re-entrant: orchestrator spawns two leaves
delegate_task(
tasks=[{"goal": "leaf-A"}, {"goal": "leaf-B"}],

View file

@ -73,7 +73,7 @@ class _StubChild:
"seconds_since_activity": 60,
}
def run_conversation(self, user_message, task_id=None):
def run_conversation(self, user_message, task_id=None, stream_callback=None):
self._hang.wait(self._hang_seconds)
return {"final_response": "", "completed": False, "api_calls": self._api_call_count}

View file

@ -509,6 +509,111 @@ def test_session_resume_lazy_reports_running_for_inflight_child(server, monkeypa
assert resp["result"]["status"] == "streaming"
def test_session_resume_lazy_tolerates_missing_row_for_active_child(server, monkeypatch):
    """A lazy resume of a live child must succeed before its DB row exists.

    Race regression. The child relays ``subagent.start`` (carrying
    ``child_session_id``, which opens the watch window) before
    ``_ensure_db_session`` writes the row during its first
    ``run_conversation()``, so ``db.get_session(target)`` is momentarily
    empty. On slower hosts (WSL2) the window's lazy resume consistently lands
    in this gap. It used to hard-fail with "session not found"; the frontend
    then 404'd on its REST messages fallback and the watch window spun
    forever. Because the child is provably live (``_child_run_active``), the
    lazy resume must register the live session with empty history so the
    mirror can stream the turn.
    """
    child_key = "20260616_131212_racey"

    class _RowlessDB:
        # Stands in for the DB during the race: the row has not been flushed.
        def get_session(self, _sid):
            return None

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            # An unwritten session has no message rows.
            return []

    def _refuse_build(*_args, **_kwargs):
        # A lazy watch resume must never construct a full agent.
        raise AssertionError("no build")

    monkeypatch.setattr(server, "_get_db", lambda: _RowlessDB())
    monkeypatch.setattr(server, "_make_agent", _refuse_build)

    request = {
        "id": "r1",
        "method": "session.resume",
        "params": {"session_id": child_key, "cols": 100, "lazy": True},
    }

    # Mark the child live in the relay registry even though its row is absent,
    # and always unregister it again so the entry cannot outlive this test.
    server._active_child_runs[child_key] = time.time()
    try:
        resp = server.handle_request(request)
    finally:
        server._active_child_runs.pop(child_key, None)

    # No "session not found": the resume registers a live, agent-less watch
    # session that the mirror can find by its stored key.
    assert "error" not in resp
    result = resp["result"]
    assert result["resumed"] == child_key
    assert result["session_key"] == child_key
    assert result["info"]["lazy"] is True
    assert result["messages"] == []

    # Reported as running so the window shows a busy state.
    assert result["running"] is True
    assert result["status"] == "streaming"

    live_sid = result["session_id"]
    assert server._find_live_session_by_key(child_key) == (live_sid, server._sessions[live_sid])
    assert server._sessions[live_sid]["agent"] is None
def test_session_resume_missing_row_non_lazy_still_errors(server, monkeypatch):
    """Missing-row tolerance applies only to lazy resumes of an ACTIVE child.

    A genuinely unknown id must still fail fast with "session not found"
    instead of silently registering an empty session, both for a normal
    (non-lazy) resume and for a lazy resume whose child is not live.
    """
    ghost_id = "20260616_000000_ghost"

    class _EmptyDB:
        # Knows no sessions, by id or by title.
        def get_session(self, _sid):
            return None

        def get_session_by_title(self, _title):
            return None

    monkeypatch.setattr(server, "_get_db", lambda: _EmptyDB())

    def _resume(request_id, **extra_params):
        # Issue a session.resume for the ghost id with optional extra params.
        return server.handle_request(
            {
                "id": request_id,
                "method": "session.resume",
                "params": {"session_id": ghost_id, "cols": 100, **extra_params},
            }
        )

    # Non-lazy resume with no active child: hard error.
    non_lazy = _resume("r1")
    assert "error" in non_lazy
    assert "session not found" in non_lazy["error"]["message"].lower()

    # Lazy resume while the child is NOT live: still an error. There is no
    # live mirror to justify an empty session, only a dead, sessionless window.
    lazy_dead = _resume("r2", lazy=True)
    assert "error" in lazy_dead
    assert "session not found" in lazy_dead["error"]["message"].lower()
def test_session_resume_reuses_existing_live_session(server, monkeypatch):
"""Repeated resume must not allocate duplicate live agents."""

View file

@ -201,9 +201,13 @@ def test_active_child_runs_registry_tracks_liveness(server, emits):
assert "child-1" not in server._active_child_runs
def test_start_and_progress_mirror_as_immediate_text_activity(server, emits):
def test_start_mirrors_as_immediate_header_line(server, emits):
server._sessions["live-1"] = {"session_key": "child-1", "agent": None}
# subagent.start emits a one-time header (the goal) so a freshly opened
# window shows context immediately. subagent.progress (batched tool-name
# rollups) no longer pollutes the message body — tools mirror natively via
# tool.start and the reply streams via subagent.text.
_relay(server, "subagent.start", preview="starting child branch", child_session_id="child-1")
_relay(server, "subagent.progress", preview="step 1/3", child_session_id="child-1")
@ -211,5 +215,57 @@ def test_start_and_progress_mirror_as_immediate_text_activity(server, emits):
assert child == [
("message.start", None),
("message.delta", {"text": "starting child branch\n"}),
("message.delta", {"text": "step 1/3\n"}),
]
def test_text_mirrors_as_message_delta(server, emits):
    """``subagent.text`` is mirrored as a native ``message.delta``.

    The child's streamed reply lands on the live child sid, so the watch
    window streams it as the agent 'talking'. This is the piece that was
    previously missing entirely.
    """
    server._sessions["live-1"] = {"session_key": "child-1", "agent": None}

    # Relay the reply in two chunks, as a token stream would deliver it.
    for chunk in ("Here is ", "the answer."):
        _relay(server, "subagent.text", preview=chunk, child_session_id="child-1")

    mirrored = [(event, payload) for event, sid, payload in emits if sid == "live-1"]
    expected = [
        ("message.start", None),
        ("message.delta", {"text": "Here is "}),
        ("message.delta", {"text": "the answer."}),
    ]
    assert mirrored == expected
def test_text_routes_to_watch_transport_without_contextvar(server, monkeypatch):
    """Relayed text reaches the watch window with no transport bound in context.

    Async/background path: the child runs on a detached daemon thread that
    carries NO contextvar transport binding. Routing must still reach the
    watch window because write_json keys event frames off the session's STORED
    transport, not the current context. Exercises the real _emit/write_json.
    """
    monkeypatch.setattr(server, "_tool_progress_enabled", lambda sid: True)

    frames: list = []

    class RecTransport:
        # Records every frame written to it and reports success.
        def write(self, obj):
            frames.append(obj)
            return True

    # A lazy watch resume stored its transport on the live child session.
    server._sessions["live-1"] = {
        "session_key": "child-1",
        "agent": None,
        "transport": RecTransport(),
    }

    # Nothing is bound on the current context, which mirrors the daemon worker
    # thread that never inherits the parent's contextvar.
    assert server.current_transport() is None
    _relay(server, "subagent.text", preview="streamed reply", child_session_id="child-1")

    # Keep only event frames addressed to the live child session.
    routed = []
    for frame in frames:
        if frame.get("method") != "event":
            continue
        params = frame["params"]
        if params["session_id"] != "live-1":
            continue
        routed.append((params["type"], params["session_id"], params.get("payload")))

    assert ("message.start", "live-1", None) in routed
    assert ("message.delta", "live-1", {"text": "streamed reply"}) in routed

View file

@ -867,6 +867,15 @@ def _build_child_progress_callback(
_relay("subagent.complete", preview=preview, **kwargs)
return
if event_type == "subagent.text":
# Streamed assistant reply text from the child. Relay verbatim so a
# gateway watch window can mirror the child "talking" as it streams.
# No spinner echo — the CLI shows the child via the tree, and the
# CLI/TUI progress handlers ignore non-tool event types, so this is
# inert there; only a gateway watch window consumes it.
_relay("subagent.text", preview=preview)
return
# Normalise legacy strings, new-style "delegate.*" strings, and
# DelegateEvent enum values all to a single DelegateEvent. The
# original implementation only accepted the five legacy strings;
@ -1626,11 +1635,23 @@ def _run_single_child(
# Python stack (see #14726 — 0-API-call hangs are opaque without it).
_worker_thread_holder: Dict[str, Optional[threading.Thread]] = {"t": None}
def _relay_child_text(delta: str) -> None:
    """Forward one chunk of the child's streamed reply up the progress relay.

    The chunk is sent as a ``subagent.text`` event so gateway watch windows
    can mirror it live (subagent.text → message.delta). Empty chunks are
    dropped, and nothing is sent when no progress callback is configured.
    A failing relay is logged at debug level and never propagates.
    """
    if delta and child_progress_cb:
        try:
            child_progress_cb("subagent.text", preview=delta)
        except Exception as e:
            # Best-effort: a broken relay must not interrupt the child's run.
            logger.debug("Child text relay failed: %s", e)
def _run_with_thread_capture():
    """Run the child's conversation and return its result.

    Records the executing thread in ``_worker_thread_holder`` before the run
    starts, and passes ``_relay_child_text`` as the ``stream_callback``.
    """
    # Stash the thread this runs on so it can be reached through the holder.
    _worker_thread_holder["t"] = threading.current_thread()
    return child.run_conversation(
        user_message=goal,
        task_id=child_task_id,
        stream_callback=_relay_child_text,
    )
_child_future = _timeout_executor.submit(_run_with_thread_capture)

View file

@ -904,6 +904,14 @@ def _start_agent_build(sid: str, session: dict) -> None:
ready = session.get("agent_ready")
if ready is None:
return
# A lazy watch session spectating an in-flight child must stay lazy so the
# subagent live-mirror keeps flowing. Incidental RPCs (session.info, model
# metadata, etc.) resolve through _sess(), which would otherwise upgrade it
# to a full agent mid-stream and silently kill the mirror (the mirror bails
# once agent is set). Once the child completes, the guard lifts and the next
# prompt/RPC builds the agent normally so the user can talk to the session.
if session.get("lazy") and _child_run_active(str(session.get("session_key") or "")):
return
lock = session.setdefault("agent_build_lock", threading.Lock())
with lock:
if ready.is_set() or session.get("agent_build_started"):
@ -2867,7 +2875,14 @@ def _on_tool_progress(
if preview and event_type == "subagent.tool":
payload["tool_preview"] = str(preview)
payload["text"] = str(preview)
_emit(event_type, sid, payload)
# subagent.text is the child's per-token reply, relayed solely to feed a
# watch window's live mirror. It is meaningless on the parent session
# (which shows the child via the spawn tree, not its reply body), so
# skip the parent emit — sending hundreds of ignored token frames there
# is wasted traffic and a trap for any future parent-side subagent
# catch-all. The mirror keys off the child sid and is unaffected.
if event_type != "subagent.text":
_emit(event_type, sid, payload)
_mirror_subagent_to_child(event_type, payload)
@ -2927,11 +2942,15 @@ def _mirror_subagent_to_child(event_type: str, payload: dict) -> None:
if event_type == "subagent.thinking":
if text := str(payload.get("text") or ""):
_emit("reasoning.delta", csid, {"text": text})
elif event_type in {"subagent.start", "subagent.progress"}:
# Mirror branch-level progress lines so a just-opened child window
# shows immediate activity instead of waiting for the next tool or
# completion event. This matches the TUI /agents "live branch log"
# feel that users expect.
elif event_type == "subagent.text":
# The child's streamed reply text — the actual "agent talking".
# Relayed token-by-token from the child's run_conversation
# stream_callback, so the watch window streams the reply live.
if text := str(payload.get("text") or ""):
_emit("message.delta", csid, {"text": text})
elif event_type == "subagent.start":
# One-time header line (the child's goal) so a freshly opened window
# shows immediate context before the first reply token streams.
if text := str(payload.get("text") or ""):
_emit("message.delta", csid, {"text": f"{text}\n"})
elif event_type == "subagent.tool":
@ -4226,6 +4245,19 @@ def _(rid, params: dict) -> dict:
found = db.get_session_by_title(target)
if found:
target = found["id"]
elif is_truthy_value(params.get("lazy", False)) and _child_run_active(target):
# Race: a watch window opened on a freshly-spawned subagent. The
# child relays `subagent.start` (which carries child_session_id and
# triggers the window) BEFORE its first run_conversation() flushes
# the DB row via _ensure_db_session, so db.get_session(target) is
# momentarily empty. On slower hosts (notably WSL2, where SQLite +
# process scheduling widen the gap) the window's resume consistently
# lands inside this gap and used to hard-fail "session not found"
# — the frontend then 404'd on the REST messages fallback and the
# window spun forever. The child is provably live (_child_run_active),
# so proceed into the lazy branch with empty history; the live mirror
# streams the whole turn anyway and the row exists by upgrade time.
found = {}
else:
return _err(rid, 4007, "session not found")
profile_resume_cwd = str(found.get("cwd") or "").strip() or _profile_configured_cwd(