Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: address PR #49 review feedback — sink swap, null-safety, iterati…
…ve readLine

- TaskHandle: AtomicReference<OutputSink> enables runtime sink swap for /bg and /fg
- TaskStreamReader: read sink from handle.outputSink() instead of holding direct ref
- TerminalRepl /bg: atomically swap sink to BackgroundOutputBuffer
- TerminalRepl /fg: swap to ForegroundOutputSink + replay buffered events
- ForegroundOutputSink: fix NPE with error.path() instead of error.get()
- ForegroundOutputSink: synchronize stopSpinner() for signal handler safety
- ForegroundOutputSink: fix backtick fence tracking across chunk boundaries
- DaemonConnection: convert recursive readLine() to iterative loop
- BackgroundOutputBuffer: null-guard in replay()
- TaskHandle/TaskManager/DaemonClient: null-guards per coding conventions
- PermissionBridge: log warning when no matching future found
- TaskStreamReader: validate non-empty requestId on permission requests
- notifyCompletedBackgroundTasks: actually print notifications instead of dead code
  • Loading branch information
Xinhua Gu committed Feb 21, 2026
commit 4b48d8ca8b4b1daa562b9304cc1a2c5b16b773a6
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
import java.util.Objects;

/**
* Buffers streaming events for background tasks.
Expand Down Expand Up @@ -58,6 +59,7 @@ public void onConnectionClosed() {
* @param sink the foreground sink to replay events to
*/
public void replay(ForegroundOutputSink sink) {
Objects.requireNonNull(sink, "sink");
List<OutputEvent> snapshot;
synchronized (events) {
snapshot = new ArrayList<>(events);
Expand Down
3 changes: 2 additions & 1 deletion aceclaw-cli/src/main/java/dev/aceclaw/cli/DaemonClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -46,7 +47,7 @@ public DaemonClient() {
* @param socketPath path to the daemon Unix domain socket
*/
public DaemonClient(Path socketPath) {
this.socketPath = socketPath;
this.socketPath = Objects.requireNonNull(socketPath, "socketPath");
this.objectMapper = new ObjectMapper();
}

Expand Down
63 changes: 33 additions & 30 deletions aceclaw-cli/src/main/java/dev/aceclaw/cli/DaemonConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,19 @@ public long nextRequestId() {
* @throws IOException if I/O fails
*/
public String readLine() throws IOException {
int newlineIdx = lineBuffer.indexOf("\n");
if (newlineIdx != -1) {
var line = lineBuffer.substring(0, newlineIdx).trim();
lineBuffer.delete(0, newlineIdx + 1);
return line.isEmpty() ? readLine() : line;
}

var buffer = ByteBuffer.allocate(BUFFER_SIZE);
while (true) {
// Check if we already have a complete line in the buffer
int newlineIdx = lineBuffer.indexOf("\n");
if (newlineIdx != -1) {
var line = lineBuffer.substring(0, newlineIdx).trim();
lineBuffer.delete(0, newlineIdx + 1);
if (!line.isEmpty()) {
return line;
}
continue; // skip empty lines without recursion
}

buffer.clear();
int bytesRead = channel.read(buffer);
if (bytesRead == -1) {
Expand All @@ -181,15 +185,6 @@ public String readLine() throws IOException {
buffer.flip();
String chunk = StandardCharsets.UTF_8.decode(buffer).toString();
lineBuffer.append(chunk);

newlineIdx = lineBuffer.indexOf("\n");
if (newlineIdx != -1) {
var line = lineBuffer.substring(0, newlineIdx).trim();
lineBuffer.delete(0, newlineIdx + 1);
if (!line.isEmpty()) {
return line;
}
}
}
}

Expand All @@ -203,13 +198,18 @@ public String readLine() throws IOException {
public String readLine(long timeoutMs) throws IOException {
long deadline = System.currentTimeMillis() + timeoutMs;

int newlineIdx = lineBuffer.indexOf("\n");
if (newlineIdx != -1) {
var line = lineBuffer.substring(0, newlineIdx).trim();
lineBuffer.delete(0, newlineIdx + 1);
return line.isEmpty()
? readLine(Math.max(0, deadline - System.currentTimeMillis()))
: line;
// Check buffer first — may already contain a complete line
while (true) {
int newlineIdx = lineBuffer.indexOf("\n");
if (newlineIdx != -1) {
var line = lineBuffer.substring(0, newlineIdx).trim();
lineBuffer.delete(0, newlineIdx + 1);
if (!line.isEmpty()) {
return line;
}
continue; // skip empty lines without recursion
}
break;
}

channel.configureBlocking(false);
Expand All @@ -230,13 +230,16 @@ public String readLine(long timeoutMs) throws IOException {
}
buffer.flip();
lineBuffer.append(StandardCharsets.UTF_8.decode(buffer));
newlineIdx = lineBuffer.indexOf("\n");
if (newlineIdx != -1) {
var line = lineBuffer.substring(0, newlineIdx).trim();
lineBuffer.delete(0, newlineIdx + 1);
return line.isEmpty()
? readLine(Math.max(0, deadline - System.currentTimeMillis()))
: line;

// Drain any complete lines (skip empty ones)
while (true) {
int idx = lineBuffer.indexOf("\n");
if (idx == -1) break;
var line = lineBuffer.substring(0, idx).trim();
lineBuffer.delete(0, idx + 1);
if (!line.isEmpty()) {
return line;
}
}
}
return null;
Expand Down
32 changes: 20 additions & 12 deletions aceclaw-cli/src/main/java/dev/aceclaw/cli/ForegroundOutputSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public final class ForegroundOutputSink implements OutputSink {
private boolean receivedTextOutput = false;
private boolean wasThinking = false;
private boolean inCodeFence = false;
private int backtickRun = 0; // tracks consecutive backticks across chunk boundaries
private volatile TerminalSpinner spinner;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

public ForegroundOutputSink(PrintWriter out, TerminalMarkdownRenderer markdownRenderer) {
Expand Down Expand Up @@ -69,13 +70,16 @@ public void onTextDelta(String delta) {
textBuffer.append(delta);
receivedTextOutput = true;

// Track code fence state
// Track code fence state across chunk boundaries
for (int i = 0; i < delta.length(); i++) {
if (i + 2 < delta.length()
&& delta.charAt(i) == '`'
&& delta.charAt(i + 1) == '`'
&& delta.charAt(i + 2) == '`') {
inCodeFence = !inCodeFence;
if (delta.charAt(i) == '`') {
backtickRun++;
if (backtickRun == 3) {
inCodeFence = !inCodeFence;
backtickRun = 0;
}
} else {
backtickRun = 0;
}
}

Expand Down Expand Up @@ -144,8 +148,8 @@ public void onTurnComplete(JsonNode message, boolean hasError) {
if (hasError) {
JsonNode error = message.get("error");
if (error != null) {
int code = error.get("code").asInt();
String errorMessage = error.get("message").asText();
int code = error.path("code").asInt(-1);
String errorMessage = error.path("message").asText("Unknown error");
if (code == -32601) {
out.println(ERROR + "[Agent not available. Is the daemon configured correctly?]" + RESET);
} else {
Expand All @@ -170,6 +174,7 @@ public void onTurnComplete(JsonNode message, boolean hasError) {
receivedTextOutput = false;
wasThinking = false;
inCodeFence = false;
backtickRun = 0;
}
}

Expand All @@ -193,12 +198,15 @@ public void onCompaction(JsonNode params) {

/**
* Stops the active spinner if one is running.
* Synchronized because this can be called from the signal handler thread (Ctrl+C).
*/
public void stopSpinner() {
var s = spinner;
if (s != null && s.isSpinning()) {
s.clear();
spinner = null;
synchronized (lock) {
var s = spinner;
if (s != null && s.isSpinning()) {
s.clear();
spinner = null;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package dev.aceclaw.cli;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand All @@ -19,6 +22,8 @@
*/
public final class PermissionBridge {

private static final Logger log = LoggerFactory.getLogger(PermissionBridge.class);

private final BlockingQueue<PermissionRequest> pending = new LinkedBlockingQueue<>();
private final ConcurrentHashMap<String, CompletableFuture<PermissionAnswer>> futures =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -85,6 +90,8 @@ public void submitAnswer(String requestId, PermissionAnswer answer) {
var future = futures.get(requestId);
if (future != null) {
future.complete(answer);
} else {
log.warn("No pending permission future for requestId={}, answer dropped", requestId);
}
}

Expand Down
32 changes: 29 additions & 3 deletions aceclaw-cli/src/main/java/dev/aceclaw/cli/TaskHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import com.fasterxml.jackson.databind.JsonNode;

import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Comment thread
coderabbitai[bot] marked this conversation as resolved.
/**
* Represents a running or completed agent task.
*
* <p>Each task maps to one {@code agent.prompt} request sent on its own
* {@link DaemonConnection}. The streaming loop runs on a virtual thread,
* and the task state is updated atomically as the task progresses.
*
* <p>The output sink is held in an {@link AtomicReference} so that
* it can be swapped at runtime (e.g. foreground → background buffer).
*/
public final class TaskHandle {

Expand All @@ -23,6 +28,9 @@ public final class TaskHandle {
/** Dedicated connection for this task's streaming I/O. */
private final DaemonConnection connection;

/** Swappable output sink — enables /bg and /fg transitions. */
private final AtomicReference<OutputSink> sinkRef;

/** Virtual thread running the stream reader loop. */
private volatile Thread streamThread;

Expand All @@ -38,12 +46,14 @@ public final class TaskHandle {
/** Final JSON-RPC result (null while running). */
private volatile JsonNode result;

public TaskHandle(String taskId, String promptSummary, DaemonConnection connection) {
this.taskId = taskId;
public TaskHandle(String taskId, String promptSummary, DaemonConnection connection,
OutputSink initialSink) {
this.taskId = Objects.requireNonNull(taskId, "taskId");
this.promptSummary = promptSummary != null && promptSummary.length() > 60
? promptSummary.substring(0, 60) + "..."
: promptSummary;
this.connection = connection;
this.connection = Objects.requireNonNull(connection, "connection");
this.sinkRef = new AtomicReference<>(Objects.requireNonNull(initialSink, "initialSink"));
this.startedAt = Instant.now();
this.state = TaskState.RUNNING;
}
Expand All @@ -56,6 +66,22 @@ public TaskHandle(String taskId, String promptSummary, DaemonConnection connecti
public JsonNode result() { return result; }
public AtomicBoolean cancelled() { return cancelled; }

/**
* Returns the current output sink.
*/
public OutputSink outputSink() { return sinkRef.get(); }

/**
* Atomically swaps the output sink and returns the previous one.
* Used for /bg (swap to BackgroundOutputBuffer) and /fg (swap to ForegroundOutputSink).
*
* @param newSink the new sink to install
* @return the previous sink
*/
public OutputSink swapOutputSink(OutputSink newSink) {
return sinkRef.getAndSet(Objects.requireNonNull(newSink, "newSink"));
}

public Thread streamThread() { return streamThread; }
public void setStreamThread(Thread thread) { this.streamThread = thread; }

Expand Down
15 changes: 11 additions & 4 deletions aceclaw-cli/src/main/java/dev/aceclaw/cli/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.io.IOException;
import java.util.*;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -46,19 +47,25 @@ public void setOnTaskComplete(TaskCompleteCallback callback) {
* @param prompt the user prompt
* @param connection dedicated connection for this task
* @param sessionId daemon session ID
* @param outputSink where streaming output goes
* @param outputSink where streaming output goes initially
* @param permissionBridge bridge for permission requests
* @return the created TaskHandle
*/
public TaskHandle submit(String prompt, DaemonConnection connection, String sessionId,
OutputSink outputSink, PermissionBridge permissionBridge) {
Objects.requireNonNull(prompt, "prompt");
Objects.requireNonNull(connection, "connection");
Objects.requireNonNull(sessionId, "sessionId");
Objects.requireNonNull(outputSink, "outputSink");
Objects.requireNonNull(permissionBridge, "permissionBridge");

String taskId = String.valueOf(taskSeq.getAndIncrement());
var handle = new TaskHandle(taskId, prompt, connection);
var handle = new TaskHandle(taskId, prompt, connection, outputSink);
tasks.put(taskId, handle);

// Send agent.prompt request on the task connection
// TaskStreamReader reads sink from handle.outputSink() — supports /bg and /fg swaps
var reader = new TaskStreamReader(handle, connection, sessionId,
prompt, outputSink, permissionBridge, this::handleTaskComplete);
prompt, permissionBridge, this::handleTaskComplete);

Thread thread = Thread.ofVirtual()
.name("aceclaw-task-" + taskId)
Expand Down
Loading