Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Complete CallCredentials support in gRPC webclient
The CallCredentials API on GrpcServiceDescriptor and
GrpcClientMethodDescriptor was introduced in helidon-io#8423 with builder support
and Javadoc describing override semantics, but was never wired through
to outgoing RPCs. GrpcServiceClientImpl.ensureMethod() always passed
CallOptions.DEFAULT, and GrpcBaseClientCall.start() never read
callOptions.getCredentials(). Credentials configured via either the
native Helidon API or stub.withCallCredentials() were silently
discarded.

GrpcServiceClientImpl.ensureMethod() now resolves credentials from the
descriptor chain (method-level overrides service-level) and injects them
into CallOptions. GrpcBaseClientCall.start() applies
callOptions.getCredentials() before writeHeaders, using a
CompletableFuture to bridge the async MetadataApplier callback to the
blocking call path. Timeout is derived from the call deadline when set,
falling back to pollWaitTime. A startFailed flag guards subclass stream
I/O methods against use after start() exits early.

Fixes helidon-io#11741
Relates to helidon-io#10527
  • Loading branch information
MariusVolkhart committed Apr 22, 2026
commit 31a666b1a48320a85c1510957e808e69f3f09798
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -66,6 +67,7 @@
import io.helidon.webclient.http2.Http2StreamConfig;
import io.helidon.webclient.http2.StreamTimeoutException;

import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Context;
Expand Down Expand Up @@ -122,6 +124,7 @@ record MethodMetrics(Counter callStarted,
private volatile HelidonSocket socket;
private volatile MethodMetrics methodMetrics;
private volatile long startMillis;
private volatile boolean startFailed;

/**
* The lesser of the {@link CallOptions} deadline and the ambient {@link Context} deadline,
Expand Down Expand Up @@ -176,16 +179,72 @@ public void start(Listener<ResT> responseListener, Metadata metadata) {
methodMetrics.callStarted.increment();
}

// Fail fast if deadline already expired — avoids wasting a TCP connection
// Resolve URI and assemble headers before opening a connection so that
// credential failure can abort without leaking network resources.
ClientUri clientUri = nextClientUri();
WritableHeaders<?> headers = setupHeaders(metadata, clientUri.authority(), methodDescriptor.getFullMethodName());

// Apply per-call credentials before opening the connection. Credential
// metadata is included in the initial HTTP/2 HEADERS frame.
CallCredentials credentials = callOptions.getCredentials();
if (credentials != null) {
long timeoutMs = credentialTimeoutMs();
Metadata[] credentialResult = new Metadata[1];
Status[] credentialFailure = new Status[1];
CountDownLatch latch = new CountDownLatch(1);
credentials.applyRequestMetadata(
new HelidonRequestInfo(methodDescriptor, callOptions,
clientUri.authority(), grpcConfig.tls().enabled()),
Thread::startVirtualThread,
new CallCredentials.MetadataApplier() {
@Override
public void apply(Metadata credentialHeaders) {
credentialResult[0] = credentialHeaders;
latch.countDown();
}

@Override
public void fail(Status status) {
credentialFailure[0] = status;
latch.countDown();
}
});
try {
if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
startFailed = true;
responseListener.onClose(
Status.DEADLINE_EXCEEDED.withDescription("Timed out waiting for call credentials"),
new Metadata());
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
startFailed = true;
responseListener.onClose(
Status.CANCELLED.withDescription("Interrupted waiting for call credentials"),
new Metadata());
return;
}
if (credentialFailure[0] != null) {
startFailed = true;
responseListener.onClose(credentialFailure[0], new Metadata());
return;
}
GrpcHeadersUtil.updateHeaders(headers, credentialResult[0]);
}

// Fail fast if deadline already expired — avoids wasting a TCP connection.
// Checked after credentials so that locally-resolved credentials are not
// discarded unnecessarily.
if (effectiveDeadline != null && effectiveDeadline.isExpired()) {
startFailed = true;
responseListener.onClose(Status.DEADLINE_EXCEEDED
.withDescription("deadline expired before call started"), EMPTY_METADATA);
unblockUnaryExecutor();
return;
}

// obtain HTTP2 connection
ClientUri clientUri = nextClientUri();
ClientConnection clientConnection = clientConnection(clientUri);
socket = clientConnection.helidonSocket();
connection = Http2ClientConnection.create((Http2ClientImpl) grpcClient.http2Client(),
Expand Down Expand Up @@ -225,9 +284,7 @@ public Duration readTimeout() {
// start streaming threads
startStreamingThreads();

// send HEADERS frame
WritableHeaders<?> headers = setupHeaders(metadata, clientUri.authority(), methodDescriptor.getFullMethodName());
// Write grpc-timeout header after metadata conversion so it cannot be overwritten
// Write grpc-timeout header after credential metadata so it cannot be overwritten
if (effectiveDeadline != null) {
long timeoutNanos = effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS);
String encoded = GrpcHeadersUtil.encodeTimeout(timeoutNanos);
Expand Down Expand Up @@ -411,6 +468,14 @@ GrpcClientStream clientStream() {
return clientStream;
}

/**
* Returns {@code true} when {@link #start} closed the call early (for example, because
* credentials timed out). Subclasses must skip stream I/O when this flag is set.
*/
boolean isStartFailed() {
return startFailed;
}

Listener<ResT> responseListener() {
return responseListener;
}
Expand Down Expand Up @@ -470,6 +535,22 @@ private Http2Settings http2Settings(Http2ClientProtocolConfig config) {
.build();
}

/**
* Compute the timeout in milliseconds to wait for {@link io.grpc.CallCredentials}
* to apply their metadata.
*
* <p>Uses remaining time from the call deadline if one is set; otherwise falls back
* to {@code pollWaitTime} from the protocol config.
*
* @return timeout in milliseconds; zero or negative means the deadline has already passed
*/
private long credentialTimeoutMs() {
if (effectiveDeadline != null) {
return effectiveDeadline.timeRemaining(TimeUnit.MILLISECONDS);
}
return pollWaitTime.toMillis();
}

void initMetrics() {
String baseUri = grpcChannel.baseUri().toString();
String methodName = methodDescriptor.getFullMethodName();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
* Copyright (c) 2024, 2026 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,13 +73,19 @@ class GrpcClientCall<ReqT, ResT> extends GrpcBaseClientCall<ReqT, ResT> {

@Override
public void request(int numMessages) {
if (isStartFailed()) {
return;
}
socket().log(LOGGER, DEBUG, "request called %d", numMessages);
messageRequest.release(numMessages);
startReadBarrier.countDown();
}

@Override
public void cancel(String message, Throwable cause) {
if (isStartFailed()) {
return;
}
socket().log(LOGGER, DEBUG, "cancel called %s", message);
if (closeCalled.compareAndSet(false, true)) {
responseListener().onClose(Status.CANCELLED, EMPTY_METADATA);
Expand All @@ -93,13 +99,19 @@ public void cancel(String message, Throwable cause) {

@Override
public void halfClose() {
if (isStartFailed()) {
return;
}
socket().log(LOGGER, DEBUG, "halfClose called");
sendingQueue.add(EMPTY_BUFFER_DATA); // end marker
startWriteBarrier.countDown();
}

@Override
public void sendMessage(ReqT message) {
if (isStartFailed()) {
return;
}
// serialize and queue message for writing
byte[] serialized = serializeMessage(message);
BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
* Copyright (c) 2024, 2026 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@

import io.helidon.grpc.core.WeightedBag;

import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand Down Expand Up @@ -181,9 +182,20 @@ private <ReqT, ResT> ClientCall<ReqT, ResT> ensureMethod(String methodName, Meth
+ ", yet " + methodType + " was requested.");
}

// Resolve credentials: method-level overrides service-level.
// GrpcClientMethodDescriptor.callCredentials() returns nullable CallCredentials.
// GrpcServiceDescriptorBlueprint.callCredentials() returns Optional<CallCredentials>.
CallCredentials credentials = methodDescriptor.callCredentials();
if (credentials == null) {
credentials = serviceDescriptor.callCredentials().orElse(null);
}
CallOptions callOptions = credentials != null
? CallOptions.DEFAULT.withCallCredentials(credentials)
: CallOptions.DEFAULT;

// use channel that contains all service and method interceptors
if (methodDescriptor.interceptors().isEmpty()) {
return serviceChannel.newCall(methodDescriptor.descriptor(), CallOptions.DEFAULT);
return serviceChannel.newCall(methodDescriptor.descriptor(), callOptions);
} else {
Channel methodChannel = methodCache.computeIfAbsent(methodName, k -> {
WeightedBag<ClientInterceptor> interceptors = WeightedBag.create();
Expand All @@ -194,7 +206,7 @@ private <ReqT, ResT> ClientCall<ReqT, ResT> ensureMethod(String methodName, Meth
List<ClientInterceptor> orderedInterceptors = interceptors.stream().toList().reversed();
return ClientInterceptors.intercept(grpcClient.channel(), orderedInterceptors);
});
return methodChannel.newCall(methodDescriptor.descriptor(), CallOptions.DEFAULT);
return methodChannel.newCall(methodDescriptor.descriptor(), callOptions);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
* Copyright (c) 2024, 2026 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,9 @@ class GrpcUnaryClientCall<ReqT, ResT> extends GrpcBaseClientCall<ReqT, ResT> {

@Override
public void request(int numMessages) {
if (isStartFailed()) {
return;
}
socket().log(LOGGER, DEBUG, "request called %d", numMessages);
if (numMessages < 1) {
close(Status.INVALID_ARGUMENT);
Expand All @@ -69,12 +72,18 @@ public void request(int numMessages) {

@Override
public void cancel(String message, Throwable cause) {
if (isStartFailed()) {
return;
}
socket().log(LOGGER, DEBUG, "cancel called %s", message);
close(Status.CANCELLED);
}

@Override
public void halfClose() {
if (isStartFailed()) {
return;
}
socket().log(LOGGER, DEBUG, "halfClose called");
if (responseReceived) {
if (responseHeaders != null) {
Expand All @@ -93,6 +102,10 @@ public void halfClose() {

@Override
public void sendMessage(ReqT message) {
// start() closed the call early (e.g. credential timeout); nothing to send
if (isStartFailed()) {
return;
}
// should only be called once
if (requestSent) {
close(Status.FAILED_PRECONDITION);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2026 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.webclient.grpc;

import io.grpc.Attributes;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;

/**
* Per-RPC context supplied to {@link io.grpc.CallCredentials#applyRequestMetadata} before each call.
*
* <p>The security level reflects the channel's TLS configuration:
* {@link SecurityLevel#PRIVACY_AND_INTEGRITY} when TLS is enabled,
* {@link SecurityLevel#NONE} on plaintext connections. Credentials that refuse to transmit
* tokens on insecure channels should check {@link #getSecurityLevel()}.
*
* <p>{@link #getTransportAttrs()} always returns {@link Attributes#EMPTY}; Helidon does not
* expose a gRPC transport attribute bag.
*/
class HelidonRequestInfo extends CallCredentials.RequestInfo {

private final MethodDescriptor<?, ?> methodDescriptor;
private final CallOptions callOptions;
private final String authority;
private final boolean tlsEnabled;

HelidonRequestInfo(MethodDescriptor<?, ?> methodDescriptor,
CallOptions callOptions,
String authority,
boolean tlsEnabled) {
this.methodDescriptor = methodDescriptor;
this.callOptions = callOptions;
this.authority = authority;
this.tlsEnabled = tlsEnabled;
}

@Override
public MethodDescriptor<?, ?> getMethodDescriptor() {
return methodDescriptor;
}

@Override
public SecurityLevel getSecurityLevel() {
// Report the channel-level security so credentials can decide whether
// it is safe to send bearer tokens (e.g., refuse on plaintext channels).
return tlsEnabled ? SecurityLevel.PRIVACY_AND_INTEGRITY : SecurityLevel.NONE;
}

@Override
public String getAuthority() {
return authority;
}

@Override
public Attributes getTransportAttrs() {
// Helidon does not expose a gRPC transport attribute bag; return empty.
return Attributes.EMPTY;
}

@Override
public CallOptions getCallOptions() {
return callOptions;
}
}
Loading
Loading