Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ protected void doStart() throws Exception {
}
}

public int getLocalPort() {
if (asyncServerTransport != null) {
return asyncServerTransport.getPort();
}
if (syncServerTransport != null) {
return syncServerTransport.getServerSocket().getLocalPort();
}
return -1;
}

@Override
protected void doStop() throws Exception {
if (server != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
import org.apache.camel.component.thrift.generated.Work;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
Expand All @@ -40,7 +39,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,8 +48,7 @@

public class ThriftConsumerAsyncTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerAsyncTest.class);
@RegisterExtension
AvailablePortFinder.Port thriftTestPort = AvailablePortFinder.find();

private static final int THRIFT_TEST_NUM1 = 12;
private static final int THRIFT_TEST_NUM2 = 13;
private Calculator.AsyncClient thriftClient;
Expand All @@ -63,11 +60,16 @@ public class ThriftConsumerAsyncTest extends CamelTestSupport {
private int allTypesResult;
private Work echoResult;

private int getActualPort() {
return ((ThriftConsumer) context.getRoutes().get(0).getConsumer()).getLocalPort();
}

@BeforeEach
public void startThriftClient() throws IOException, TTransportException {
if (transport == null) {
LOG.info("Connecting to the Thrift server on port: {}", thriftTestPort.getPort());
transport = new TNonblockingSocket("localhost", thriftTestPort.getPort());
int thriftTestPort = getActualPort();
LOG.info("Connecting to the Thrift server on port: {}", thriftTestPort);
transport = new TNonblockingSocket("localhost", thriftTestPort);
thriftClient = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory()))
.getAsyncClient(transport);
}
Expand Down Expand Up @@ -243,8 +245,7 @@ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("thrift://localhost:" + thriftTestPort.getPort()
+ "/org.apache.camel.component.thrift.generated.Calculator")
from("thrift://localhost:0/org.apache.camel.component.thrift.generated.Calculator")
.to("mock:thrift-service").choice()
.when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate"))
.setBody(simple(Integer.valueOf(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2).toString()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
import org.apache.camel.component.thrift.generated.Work;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
Expand All @@ -41,7 +40,6 @@
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,16 +49,16 @@
public class ThriftConsumerConcurrentTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerConcurrentTest.class);

@RegisterExtension
AvailablePortFinder.Port thriftSyncPort = AvailablePortFinder.find();
@RegisterExtension
AvailablePortFinder.Port thriftAsyncPort = AvailablePortFinder.find();
private static final int THRIFT_TEST_NUM1 = 12;
private static final int CONCURRENT_THREAD_COUNT = 30;
private static final int ROUNDS_PER_THREAD_COUNT = 10;

private static AtomicInteger idCounter = new AtomicInteger();

private int getPortForRoute(int index) {
return ((ThriftConsumer) context.getRoutes().get(index).getConsumer()).getLocalPort();
}

public static Integer createId() {
return idCounter.getAndIncrement();
}
Expand All @@ -75,7 +73,7 @@ public void testSyncWithConcurrentThreads() {

@Override
public void run() throws TTransportException {
TTransport transport = new TSocket("localhost", thriftSyncPort.getPort());
TTransport transport = new TSocket("localhost", getPortForRoute(0));
transport.open();
TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport));
Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol);
Expand Down Expand Up @@ -108,7 +106,7 @@ public void testAsyncWithConcurrentThreads() {
public void run() throws TTransportException, IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

TNonblockingTransport transport = new TNonblockingSocket("localhost", thriftAsyncPort.getPort());
TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1));
Calculator.AsyncClient client
= (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory()))
.getAsyncClient(transport);
Expand Down Expand Up @@ -165,12 +163,10 @@ protected RouteBuilder createRouteBuilder() {
@Override
public void configure() {

from("thrift://localhost:" + thriftSyncPort.getPort()
+ "/org.apache.camel.component.thrift.generated.Calculator?synchronous=true")
from("thrift://localhost:0/org.apache.camel.component.thrift.generated.Calculator?synchronous=true")
.setBody(simple("${body[1]}")).bean(new CalculatorMessageBuilder(), "multiply");

from("thrift://localhost:" + thriftAsyncPort.getPort()
+ "/org.apache.camel.component.thrift.generated.Calculator")
from("thrift://localhost:0/org.apache.camel.component.thrift.generated.Calculator")
.setBody(simple("${body[1]}")).bean(new CalculatorMessageBuilder(), "multiply");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.camel.support.jsse.KeyManagersParameters;
import org.apache.camel.support.jsse.KeyStoreParameters;
import org.apache.camel.support.jsse.SSLContextParameters;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
Expand All @@ -35,7 +34,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,8 +43,7 @@

public class ThriftConsumerSecurityTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerSecurityTest.class);
@RegisterExtension
AvailablePortFinder.Port thriftTestPort = AvailablePortFinder.find();

private static final int THRIFT_TEST_NUM1 = 12;
private static final int THRIFT_TEST_NUM2 = 13;
private static final String TRUST_STORE_RESOURCE = "file:src/test/resources/certs/truststore.jks";
Expand All @@ -59,15 +56,20 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport {
private TProtocol protocol;
private TTransport transport;

private int getActualPort() {
return ((ThriftConsumer) context.getRoutes().get(0).getConsumer()).getLocalPort();
}

@BeforeEach
public void startThriftSecureClient() throws TTransportException {
if (transport == null) {
LOG.info("Connecting to the secured Thrift server on port: {}", thriftTestPort.getPort());
int thriftTestPort = getActualPort();
LOG.info("Connecting to the secured Thrift server on port: {}", thriftTestPort);

TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters();

sslParams.setTrustStore(TRUST_STORE_RESOURCE, SECURITY_STORE_PASSWORD);
transport = TSSLTransportFactory.getClientSocket("localhost", thriftTestPort.getPort(), THRIFT_CLIENT_TIMEOUT,
transport = TSSLTransportFactory.getClientSocket("localhost", thriftTestPort, THRIFT_CLIENT_TIMEOUT,
sslParams);

protocol = new TBinaryProtocol(transport);
Expand Down Expand Up @@ -140,8 +142,7 @@ protected RouteBuilder createRouteBuilder() {
@Override
public void configure() {

from("thrift://localhost:" + thriftTestPort.getPort()
+ "/org.apache.camel.component.thrift.generated.Calculator?negotiationType=SSL&sslParameters=#sslParams&synchronous=true")
from("thrift://localhost:0/org.apache.camel.component.thrift.generated.Calculator?negotiationType=SSL&sslParameters=#sslParams&synchronous=true")
.to("mock:thrift-secure-service").choice()
.when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate"))
.setBody(simple(Integer.valueOf(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2).toString()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
import org.apache.camel.component.thrift.generated.Work;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
Expand All @@ -32,7 +31,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,20 +40,24 @@

public class ThriftConsumerSyncTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerSyncTest.class);
@RegisterExtension
AvailablePortFinder.Port thriftTestPort = AvailablePortFinder.find();

private static final int THRIFT_TEST_NUM1 = 12;
private static final int THRIFT_TEST_NUM2 = 13;
private Calculator.Client thriftClient;

private TProtocol protocol;
private TTransport transport;

private int getActualPort() {
return ((ThriftConsumer) context.getRoutes().get(0).getConsumer()).getLocalPort();
}

@BeforeEach
public void startThriftClient() throws TTransportException {
if (transport == null) {
LOG.info("Connecting to the Thrift server on port: {}", thriftTestPort.getPort());
transport = new TSocket("localhost", thriftTestPort.getPort());
int thriftTestPort = getActualPort();
LOG.info("Connecting to the Thrift server on port: {}", thriftTestPort);
transport = new TSocket("localhost", thriftTestPort);
transport.open();
protocol = new TBinaryProtocol(new TFramedTransport(transport));
thriftClient = (new Calculator.Client.Factory()).getClient(protocol);
Expand Down Expand Up @@ -106,8 +108,7 @@ protected RouteBuilder createRouteBuilder() {
@Override
public void configure() {

from("thrift://localhost:" + thriftTestPort.getPort()
+ "/org.apache.camel.component.thrift.generated.Calculator?synchronous=true")
from("thrift://localhost:0/org.apache.camel.component.thrift.generated.Calculator?synchronous=true")
.to("mock:thrift-service").choice()
.when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate"))
.setBody(simple(Integer.valueOf(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2).toString()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
import org.apache.camel.component.thrift.generated.Work;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand All @@ -33,7 +32,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,8 +41,7 @@

public class ThriftConsumerZlibCompressionTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerZlibCompressionTest.class);
@RegisterExtension
AvailablePortFinder.Port thriftTestPort = AvailablePortFinder.find();

private static final int THRIFT_TEST_NUM1 = 12;
private static final int THRIFT_TEST_NUM2 = 13;
private static final int THRIFT_CLIENT_TIMEOUT = 2000;
Expand All @@ -54,12 +51,17 @@ public class ThriftConsumerZlibCompressionTest extends CamelTestSupport {
private TProtocol protocol;
private TTransport transport;

private int getActualPort() {
return ((ThriftConsumer) context.getRoutes().get(0).getConsumer()).getLocalPort();
}

@BeforeEach
public void startThriftZlibClient() throws TTransportException {
if (transport == null) {
LOG.info("Connecting to the Thrift server with zlib compression on port: {}", thriftTestPort.getPort());
int thriftTestPort = getActualPort();
LOG.info("Connecting to the Thrift server with zlib compression on port: {}", thriftTestPort);

transport = new TSocket(new TConfiguration(), "localhost", thriftTestPort.getPort(), THRIFT_CLIENT_TIMEOUT);
transport = new TSocket(new TConfiguration(), "localhost", thriftTestPort, THRIFT_CLIENT_TIMEOUT);
protocol = new TBinaryProtocol(new TZlibTransport(transport));
thriftClient = new Calculator.Client(protocol);
transport.open();
Expand Down Expand Up @@ -115,8 +117,7 @@ protected RouteBuilder createRouteBuilder() {
@Override
public void configure() {

from("thrift://localhost:" + thriftTestPort.getPort()
+ "/org.apache.camel.component.thrift.generated.Calculator?compressionType=ZLIB&synchronous=true")
from("thrift://localhost:0/org.apache.camel.component.thrift.generated.Calculator?compressionType=ZLIB&synchronous=true")
.to("mock:thrift-secure-service").choice()
.when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate"))
.setBody(simple(Integer.valueOf(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2).toString()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@

import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.impl.CalculatorSyncServerImpl;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.THsHaServer.Args;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ThriftProducerBaseTest extends CamelTestSupport {
@RegisterExtension
AvailablePortFinder.Port thriftTestPort = AvailablePortFinder.find();
protected int thriftTestPort;
protected static final int THRIFT_TEST_NUM1 = 12;
protected static final int THRIFT_TEST_NUM2 = 13;
@SuppressWarnings({ "rawtypes" })
Expand All @@ -42,13 +39,14 @@ public abstract class ThriftProducerBaseTest extends CamelTestSupport {

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
protected void setupResources() throws Exception {
public void doPreSetup() throws Exception {
processor = new Calculator.Processor(new CalculatorSyncServerImpl());
serverTransport = new TNonblockingServerSocket(thriftTestPort.getPort());
serverTransport = new TNonblockingServerSocket(0);
thriftTestPort = serverTransport.getPort();
server = new THsHaServer(new Args(serverTransport).processor(processor));
Runnable simple = new Runnable() {
public void run() {
LOG.info("Thrift server started on port: {}", thriftTestPort.getPort());
LOG.info("Thrift server started on port: {}", thriftTestPort);
server.serve();
}
};
Expand Down
Loading
Loading