Skip to content
This repository was archived by the owner on May 8, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void sessionCloseBeforeInit() throws Exception {
void sessionGoAwayTest() throws Exception {
SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew());

Duration goAwayDelay = Duration.ofMillis(100);
Duration goAwayDelay = Duration.ofMillis(500);
FakeSessionListener sessionListener = new FakeSessionListener();
session.start(
OpenSessionRequest.newBuilder()
Expand Down Expand Up @@ -215,9 +215,14 @@ void sessionGoAwayTest() throws Exception {
try {
f.get();
numOk++;
} catch (VRpcException e) {
if (e.getResult().getState() == State.UNCOMMITED) {
numUncommittedErrors++;
} catch (ExecutionException e) {
if (e.getCause() instanceof VRpcException) {
VRpcException vrpcException = (VRpcException) e.getCause();
if (vrpcException.getResult().getState() == State.UNCOMMITED) {
numUncommittedErrors++;
}
} else {
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -448,23 +448,44 @@ public void refreshConfigTest() throws Exception {

sessionPool.start(request, new Metadata());

Thread.sleep(500);
long deadline = System.currentTimeMillis() + 10_000;
boolean conditionMet = false;
List<SessionRequest> requests = null;
boolean containsHeader = false;

while (System.currentTimeMillis() < deadline) {
requests = fakeService.getSessionRequests();
List<Metadata> headers = headerInterceptor.getHeadersList();

boolean matchesRefreshRequest = false;
for (SessionRequest r : requests) {
if (OPEN_SESSION_REQUEST_CORRESPONDENCE.compare(r, refreshRequest)) {
matchesRefreshRequest = true;
break;
}
}

containsHeader = false;
for (Metadata header : headers) {
if (header.containsKey(metadataKey) && "refresh_value".equals(header.get(metadataKey))) {
containsHeader = true;
break;
}
}

List<SessionRequest> requests = fakeService.getSessionRequests();
if (requests.size() > 1 && matchesRefreshRequest && containsHeader) {
conditionMet = true;
break;
}

Thread.sleep(50);
}

assertThat(conditionMet).isTrue();
assertThat(requests.size()).isGreaterThan(1);
assertThat(requests)
.comparingElementsUsing(OPEN_SESSION_REQUEST_CORRESPONDENCE)
.contains(refreshRequest);

// Verify headers
List<Metadata> headers = headerInterceptor.getHeadersList();
boolean containsHeader = false;
for (Metadata header : headers) {
if (header.containsKey(metadataKey)) {
containsHeader = true;
assertThat(header.get(metadataKey)).isEqualTo("refresh_value");
}
}
assertThat(containsHeader).isTrue();
}

Expand Down Expand Up @@ -519,7 +540,7 @@ public Deadline getObservedDeadline() {
}

private static class HeaderInterceptor implements ServerInterceptor {
private final List<Metadata> headersList = new ArrayList<>();
private final List<Metadata> headersList = new CopyOnWriteArrayList<>();

@Override
public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.google.bigtable.v2.SessionRequest;
import com.google.bigtable.v2.SessionResponse;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -30,7 +30,7 @@ public class FakeSessionService extends FakeSessionGrpc.FakeSessionImplBase {
private final ScheduledExecutorService executor;
private final AtomicInteger openRequestCount = new AtomicInteger(0);

private final List<SessionRequest> sessionRequests = new ArrayList<>();
private final List<SessionRequest> sessionRequests = new CopyOnWriteArrayList<>();

public FakeSessionService(ScheduledExecutorService executor) {
this.executor = executor;
Expand Down
Loading