Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ static Signature newSignature(final String sql, Schema resultSetSchema, Schema p
public void closeStatement(final StatementHandle statementHandle) {
PreparedStatement preparedStatement =
statementHandlePreparedStatementMap.remove(new StatementHandleKey(statementHandle));
// Testing if the prepared statement was created because the statement can be not created until
// Testing if the prepared statement was created because the statement can be
// not created until
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this change is unecessary (I guess it's due to reformating)

// this moment
if (preparedStatement != null) {
preparedStatement.close();
Expand Down Expand Up @@ -224,7 +225,8 @@ public ExecuteResult prepareAndExecute(
MetaResultSet.create(handle.connectionId, handle.id, false, handle.signature, null);
return new ExecuteResult(Collections.singletonList(metaResultSet));
} catch (SQLTimeoutException e) {
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and Runtime
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same here

// Runtime
// Exceptions.
throw new RuntimeException(e);
} catch (SQLException e) {
Expand Down Expand Up @@ -253,6 +255,20 @@ public boolean syncResults(
return false;
}

@Override
public ConnectionProperties connectionSync(ConnectionHandle ch, ConnectionProperties connProps) {
final ConnectionProperties result = super.connectionSync(ch, connProps);
final String newCatalog = this.connProps.getCatalog();
if (newCatalog != null) {
try {
((ArrowFlightConnection) connection).getClientHandler().setCatalog(newCatalog);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
return result;
}

void setDefaultConnectionProperties() {
// TODO Double-check this.
connProps
Expand All @@ -268,7 +284,8 @@ PreparedStatement getPreparedStatement(StatementHandle statementHandle) {
return statementHandlePreparedStatementMap.get(new StatementHandleKey(statementHandle));
}

// Helper used to look up prepared statement instances later. Avatica doesn't give us the
// Helper used to look up prepared statement instances later. Avatica doesn't
// give us the
// signature in
// an UPDATE code path so we can't directly use StatementHandle as a map key.
private static final class StatementHandleKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.LocationSchemes;
import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.flight.SessionOptionValueFactory;
import org.apache.arrow.flight.SetSessionOptionsRequest;
import org.apache.arrow.flight.SetSessionOptionsResult;
Expand Down Expand Up @@ -147,20 +146,26 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
try {
for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
if (endpoint.getLocations().isEmpty()) {
// Create a stream using the current client only and do not close the client at the end.
// Create a stream using the current client only and do not close the client at
// the end.
endpoints.add(
new CloseableEndpointStreamPair(
sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
} else {
// Clone the builder and then set the new endpoint on it.

// GH-38574: Currently a new FlightClient will be made for each partition that returns a
// non-empty Location then disposed of. It may be better to cache clients because a server
// may report the same Locations. It would also be good to identify when the reported
// GH-38574: Currently a new FlightClient will be made for each partition that
// returns a
// non-empty Location then disposed of. It may be better to cache clients
// because a server
// may report the same Locations. It would also be good to identify when the
// reported
// location
// is the same as the original connection's Location and skip creating a FlightClient in
// is the same as the original connection's Location and skip creating a
// FlightClient in
// that scenario.
// Also copy the cache to the client so we can share a cache. Cache needs to cache
// Also copy the cache to the client so we can share a cache. Cache needs to
// cache
// negative attempts too.
List<Exception> exceptions = new ArrayList<>();
CloseableEndpointStreamPair stream = null;
Expand Down Expand Up @@ -337,7 +342,8 @@ private boolean isBenignCloseException(FlightRuntimeException fre) {
*/
private void logSuppressedCloseException(
FlightRuntimeException fre, String operationDescription) {
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during shutdown
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during
// shutdown
LOGGER.debug("Suppressed error {}", operationDescription, fre);
}

Expand Down Expand Up @@ -388,25 +394,40 @@ public interface PreparedStatement extends AutoCloseable {
/** A connection is created with catalog set as a session option. */
private void setSetCatalogInSessionIfPresent() {
if (catalog.isPresent()) {
final SetSessionOptionsRequest setSessionOptionRequest =
new SetSessionOptionsRequest(
ImmutableMap.<String, SessionOptionValue>builder()
.put(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
.build());
final SetSessionOptionsResult result =
sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());
try {
setCatalog(catalog.get());
} catch (SQLException e) {
throw CallStatus.INVALID_ARGUMENT
.withDescription(e.getMessage())
.withCause(e)
.toRuntimeException();
}
}
}

/**
* Sets the catalog for the current session.
*
* @param catalog the catalog to set.
* @throws SQLException if an error occurs while setting the catalog.
*/
public void setCatalog(final String catalog) throws SQLException {
final SetSessionOptionsRequest request =
new SetSessionOptionsRequest(
ImmutableMap.of(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog)));
try {
final SetSessionOptionsResult result = sqlClient.setSessionOptions(request, getOptions());
if (result.hasErrors()) {
Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
for (Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
final Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
for (final Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
LOGGER.warn(error.toString());
}
throw CallStatus.INVALID_ARGUMENT
.withDescription(
String.format(
"Cannot set session option for catalog = %s. Check log for details.", catalog))
.toRuntimeException();
throw new SQLException(
String.format(
"Cannot set session option for catalog = %s. Check log for details.", catalog));
}
} catch (final FlightRuntimeException e) {
throw new SQLException(e);
}
}

Expand Down Expand Up @@ -654,7 +675,8 @@ public static final class Builder {

@VisibleForTesting @Nullable Duration connectTimeout;

// These two middleware are for internal use within build() and should not be exposed by builder
// These two middleware are for internal use within build() and should not be
// exposed by builder
// APIs.
// Note that these middleware may not necessarily be registered.
@VisibleForTesting
Expand Down Expand Up @@ -980,15 +1002,17 @@ public Location getLocation() {
* @throws SQLException on error.
*/
public ArrowFlightSqlClientHandler build() throws SQLException {
// Copy middleware so that the build method doesn't change the state of the builder fields
// Copy middleware so that the build method doesn't change the state of the
// builder fields
// itself.
Set<FlightClientMiddleware.Factory> buildTimeMiddlewareFactories =
new HashSet<>(this.middlewareFactories);
FlightClient client = null;
boolean isUsingUserPasswordAuth = username != null && token == null;

try {
// Token should take priority since some apps pass in a username/password even when a token
// Token should take priority since some apps pass in a username/password even
// when a token
// is provided
if (isUsingUserPasswordAuth) {
buildTimeMiddlewareFactories.add(authFactory);
Expand Down Expand Up @@ -1047,8 +1071,10 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
allocator, channelBuilder.build(), clientBuilder.middleware());
final ArrayList<CallOption> credentialOptions = new ArrayList<>();
if (isUsingUserPasswordAuth) {
// If the authFactory has already been used for a handshake, use the existing token.
// This can occur if the authFactory is being re-used for a new connection spawned for
// If the authFactory has already been used for a handshake, use the existing
// token.
// This can occur if the authFactory is being re-used for a new connection
// spawned for
// getStream().
if (authFactory.getCredentialCallOption() != null) {
credentialOptions.add(authFactory.getCredentialCallOption());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -26,12 +27,15 @@
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import org.apache.arrow.driver.jdbc.authentication.UserPasswordAuthentication;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
import org.apache.arrow.flight.FlightMethod;
import org.apache.arrow.flight.NoOpSessionOptionValueVisitor;
import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
Expand Down Expand Up @@ -614,12 +618,46 @@ public void testJdbcDriverVersionIntegration() throws Exception {

var expectedUserAgent =
"JDBC Flight SQL Driver " + driverVersion.getDriverVersion().versionString;
// Driver appends version to grpc user-agent header. Assert the header starts with the
// Driver appends version to grpc user-agent header. Assert the header starts
// with the
// expected
// value and ignored grpc version.
assertTrue(
actualUserAgent.startsWith(expectedUserAgent),
"Expected: " + expectedUserAgent + " but found: " + actualUserAgent);
}
}

@Test
public void testSetCatalogShouldUpdateSessionOptions() throws Exception {
final Properties properties = new Properties();
properties.put(ArrowFlightConnectionProperty.USER.camelName(), userTest);
properties.put(ArrowFlightConnectionProperty.PASSWORD.camelName(), passTest);
properties.put("useEncryption", false);

try (Connection connection =
DriverManager.getConnection(
"jdbc:arrow-flight-sql://"
+ FLIGHT_SERVER_TEST_EXTENSION.getHost()
+ ":"
+ FLIGHT_SERVER_TEST_EXTENSION.getPort(),
properties)) {
final String catalog = "new_catalog";
connection.setCatalog(catalog);

final Map<String, SessionOptionValue> options = PRODUCER.getSessionOptions();
assertTrue(options.containsKey("catalog"));
String actualCatalog =
options
.get("catalog")
.acceptVisitor(
new NoOpSessionOptionValueVisitor<String>() {
@Override
public String visit(String value) {
return value;
}
});
assertEquals(catalog, actualCatalog);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.flight.SetSessionOptionsRequest;
import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.SqlInfoBuilder;
Expand Down Expand Up @@ -664,6 +667,22 @@ public SqlInfoBuilder getSqlInfoBuilder() {
return sqlInfoBuilder;
}

private final Map<String, SessionOptionValue> sessionOptions = new HashMap<>();

@Override
public void setSessionOptions(
final SetSessionOptionsRequest request,
final CallContext context,
final StreamListener<SetSessionOptionsResult> listener) {
sessionOptions.putAll(request.getSessionOptions());
listener.onNext(new SetSessionOptionsResult(Collections.emptyMap()));
listener.onCompleted();
}

public Map<String, SessionOptionValue> getSessionOptions() {
return sessionOptions;
}

private static final class TicketConversionUtils {
private TicketConversionUtils() {
// Prevent instantiation.
Expand Down
Loading