diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java index f1a67df33053..dd957f9da2f1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Client.java @@ -1303,15 +1303,6 @@ public Client(Class valueClass, Configuration conf, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); } - /** - * Construct an IPC client with the default SocketFactory. - * @param valueClass input valueClass. - * @param conf input Configuration. - */ - public Client(Class valueClass, Configuration conf) { - this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); - } - @Override public String toString() { return getClass().getSimpleName() + "-" diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ObserverRetryOnActiveException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ObserverRetryOnActiveException.java deleted file mode 100644 index b32791bb1494..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ObserverRetryOnActiveException.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc_; - - -/** - * Thrown by a remote ObserverNode indicating the operation has failed and the - * client should retry active namenode directly (instead of retry other - * ObserverNodes). - */ -public class ObserverRetryOnActiveException extends StandbyException { - static final long serialVersionUID = 1L; - public ObserverRetryOnActiveException(String msg) { - super(msg); - } -} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java index 2fe400b72174..ce4fc4d8e4b6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtoUtil.java @@ -18,9 +18,6 @@ package org.apache.hadoop.ipc_; -import java.io.DataInput; -import java.io.IOException; - import org.apache.hadoop.ipc_.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc_.protobuf.IpcConnectionContextProtos.UserInformationProto; import org.apache.hadoop.ipc_.protobuf.RpcHeaderProtos.*; @@ -31,47 +28,7 @@ public abstract class ProtoUtil { - /** - * Read a variable length integer in the same format that ProtoBufs encodes. - * @param in the input stream to read from - * @return the integer - * @throws IOException if it is malformed or EOF. - */ - public static int readRawVarint32(DataInput in) throws IOException { - byte tmp = in.readByte(); - if (tmp >= 0) { - return tmp; - } - int result = tmp & 0x7f; - if ((tmp = in.readByte()) >= 0) { - result |= tmp << 7; - } else { - result |= (tmp & 0x7f) << 7; - if ((tmp = in.readByte()) >= 0) { - result |= tmp << 14; - } else { - result |= (tmp & 0x7f) << 14; - if ((tmp = in.readByte()) >= 0) { - result |= tmp << 21; - } else { - result |= (tmp & 0x7f) << 21; - result |= (tmp = in.readByte()) << 28; - if (tmp < 0) { - // Discard upper 32 bits. - for (int i = 0; i < 5; i++) { - if (in.readByte() >= 0) { - return result; - } - } - throw new IOException("Malformed varint"); - } - } - } - } - return result; - } - /** * This method creates the connection context using exactly the same logic * as the old connection context as was done for writable where diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java index c2b7c4a01fcf..990ee83d0afd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProtobufRpcEngine.java @@ -65,32 +65,6 @@ public static AsyncGet getAsyncReturnMessage() { return ASYNC_RETURN_MESSAGE.get(); } - @Override - @SuppressWarnings("unchecked") - public ProtocolProxy getProxy(Class protocol, long clientVersion, - ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - final Invoker invoker = new Invoker(protocol, connId, conf, factory); - return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[] {protocol}, invoker)); - } - - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, null); - } - - @Override - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy - ) throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null, null); - } - @Override @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, @@ -309,22 +283,6 @@ public ConnectionId getConnectionId() { return remoteId; } - protected long getClientProtocolVersion() { - return clientProtocolVersion; - } - - protected String getProtocolName() { - return protocolName; - } - } - - static Client getClient(Configuration conf) { - return CLIENTS.getClient(conf, SocketFactory.getDefault(), - RpcWritable.Buffer.class); - } - - public static void clearClientCache() { - CLIENTS.clearCache(); } @Override @@ -556,6 +514,7 @@ static class RpcProtobufRequest extends RpcWritable.Buffer { private volatile RequestHeaderProto requestHeader; private Message payload; + @SuppressWarnings("unused") // required for Server#procesRpcRequest public RpcProtobufRequest() { } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProxyCombiner.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProxyCombiner.java deleted file mode 100644 index 7a2410dc00c6..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/ProxyCombiner.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc_; - -import com.google.common.base.Joiner; -import java.io.Closeable; -import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; - -import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.ipc_.Client.ConnectionId; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A utility class used to combine two protocol proxies. - * See {@link #combine(Class, Object...)}. - */ -public final class ProxyCombiner { - - private static final Logger LOG = - LoggerFactory.getLogger(ProxyCombiner.class); - - private ProxyCombiner() { } - - /** - * Combine two or more proxies which together comprise a single proxy - * interface. This can be used for a protocol interface which {@code extends} - * multiple other protocol interfaces. The returned proxy will implement - * all of the methods of the combined proxy interface, delegating calls - * to which proxy implements that method. If multiple proxies implement the - * same method, the first in the list will be used for delegation. - * - *

This will check that every method on the combined interface is - * implemented by at least one of the supplied proxy objects. - * - * @param combinedProxyInterface The interface of the combined proxy. - * @param proxies The proxies which should be used as delegates. - * @param The type of the proxy that will be returned. - * @return The combined proxy. - */ - @SuppressWarnings("unchecked") - public static T combine(Class combinedProxyInterface, - Object... proxies) { - methodLoop: - for (Method m : combinedProxyInterface.getMethods()) { - for (Object proxy : proxies) { - try { - proxy.getClass().getMethod(m.getName(), m.getParameterTypes()); - continue methodLoop; // go to the next method - } catch (NoSuchMethodException nsme) { - // Continue to try the next proxy - } - } - throw new IllegalStateException("The proxies specified for " - + combinedProxyInterface + " do not cover method " + m); - } - - InvocationHandler handler = - new CombinedProxyInvocationHandler(combinedProxyInterface, proxies); - return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(), - new Class[] {combinedProxyInterface}, handler); - } - - private static final class CombinedProxyInvocationHandler - implements RpcInvocationHandler { - - private final Class proxyInterface; - private final Object[] proxies; - - private CombinedProxyInvocationHandler(Class proxyInterface, - Object[] proxies) { - this.proxyInterface = proxyInterface; - this.proxies = proxies; - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - Exception lastException = null; - for (Object underlyingProxy : proxies) { - try { - return method.invoke(underlyingProxy, args); - } catch (IllegalAccessException|IllegalArgumentException e) { - lastException = e; - } catch (InvocationTargetException ite) { - throw ite.getCause(); - } - } - // This shouldn't happen since the method coverage was verified in build() - LOG.error("BUG: Method {} was unable to be found on any of the " - + "underlying proxies for {}", method, proxy.getClass()); - throw new IllegalArgumentException("Method " + method + " not supported", - lastException); - } - - /** - * Since this is incapable of returning multiple connection IDs, simply - * return the first one. In most cases, the connection ID should be the same - * for all proxies. - */ - @Override - public ConnectionId getConnectionId() { - return RPC.getConnectionIdForProxy(proxies[0]); - } - - @Override - public String toString() { - return "CombinedProxy[" + proxyInterface.getSimpleName() + "][" - + Joiner.on(",").join(proxies) + "]"; - } - - @Override - public void close() throws IOException { - MultipleIOException.Builder exceptionBuilder = - new MultipleIOException.Builder(); - for (Object proxy : proxies) { - if (proxy instanceof Closeable) { - try { - ((Closeable) proxy).close(); - } catch (IOException ioe) { - exceptionBuilder.add(ioe); - } - } - } - if (!exceptionBuilder.isEmpty()) { - throw exceptionBuilder.build(); - } - } - } -} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java index 610bdbac1a39..56d647f5eb7a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java @@ -19,15 +19,11 @@ package org.apache.hadoop.ipc_; import java.io.IOException; -import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; -import java.net.ConnectException; import java.net.InetSocketAddress; -import java.net.NoRouteToHostException; -import java.net.SocketTimeoutException; import java.io.Closeable; import java.util.ArrayList; import java.util.Map; @@ -45,14 +41,12 @@ import org.apache.hadoop.ipc_.Client.ConnectionId; import org.apache.hadoop.ipc_.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc_.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -250,233 +244,6 @@ public RpcErrorCodeProto getRpcErrorCodeProto() { } } - /** - * Get a proxy connection to a remote server. - * - * @param Generics Type T. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @return the proxy - * @throws IOException if the far end through a RemoteException - */ - public static T waitForProxy( - Class protocol, - long clientVersion, - InetSocketAddress addr, - Configuration conf - ) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); - } - - /** - * Get a protocol proxy that contains a proxy connection to a remote server - * and a set of methods that are supported by the server. - * - * @param Generics Type T. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @return the protocol proxy - * @throws IOException if the far end through a RemoteException - */ - public static ProtocolProxy waitForProtocolProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, - Configuration conf) throws IOException { - return waitForProtocolProxy( - protocol, clientVersion, addr, conf, Long.MAX_VALUE); - } - - /** - * Get a proxy connection to a remote server. - * - * @param Generics Type T. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @param connTimeout time in milliseconds before giving up - * @return the proxy - * @throws IOException if the far end through a RemoteException - */ - public static T waitForProxy(Class protocol, long clientVersion, - InetSocketAddress addr, Configuration conf, - long connTimeout) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, - conf, connTimeout).getProxy(); - } - - /** - * Get a protocol proxy that contains a proxy connection to a remote server - * and a set of methods that are supported by the server - * - * @param Generics Type T. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @param connTimeout time in milliseconds before giving up - * @return the protocol proxy - * @throws IOException if the far end through a RemoteException - */ - public static ProtocolProxy waitForProtocolProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf, - long connTimeout) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, conf, - getRpcTimeout(conf), null, connTimeout); - } - - /** - * Get a proxy connection to a remote server. - * - * @param Generics Type T. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @param rpcTimeout timeout for each RPC - * @param timeout time in milliseconds before giving up - * @return the proxy - * @throws IOException if the far end through a RemoteException - */ - public static T waitForProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf, - int rpcTimeout, - long timeout) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, - conf, rpcTimeout, null, timeout).getProxy(); - } - - /** - * Get a protocol proxy that contains a proxy connection to a remote server - * and a set of methods that are supported by the server. - * - * @param Generics Type. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @param rpcTimeout timeout for each RPC - * @param connectionRetryPolicy input connectionRetryPolicy. - * @param timeout time in milliseconds before giving up - * @return the proxy - * @throws IOException if the far end through a RemoteException. - */ - public static ProtocolProxy waitForProtocolProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf, - int rpcTimeout, - RetryPolicy connectionRetryPolicy, - long timeout) throws IOException { - long startTime = Time.now(); - IOException ioe; - while (true) { - try { - return getProtocolProxy(protocol, clientVersion, addr, - UserGroupInformation.getCurrentUser(), conf, NetUtils - .getDefaultSocketFactory(conf), rpcTimeout, connectionRetryPolicy); - } catch(ConnectException se) { // namenode has not been started - LOG.info("Server at " + addr + " not available yet, Zzzzz..."); - ioe = se; - } catch(SocketTimeoutException te) { // namenode is busy - LOG.info("Problem connecting to server: " + addr); - ioe = te; - } catch(NoRouteToHostException nrthe) { // perhaps a VIP is failing over - LOG.info("No route to host for server: " + addr); - ioe = nrthe; - } - // check if timed out - if (Time.now()-timeout >= startTime) { - throw ioe; - } - - if (Thread.currentThread().isInterrupted()) { - // interrupted during some IO; this may not have been caught - throw new InterruptedIOException("Interrupted waiting for the proxy"); - } - - // wait for retry - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException( - "Interrupted waiting for the proxy").initCause(ioe); - } - } - } - - /** - * Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * @param Generics Type T. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param conf input Configuration. - * @param factory input factory. - * @throws IOException raised on errors performing I/O. - * @return proxy. - */ - public static T getProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf, - SocketFactory factory) throws IOException { - return getProtocolProxy( - protocol, clientVersion, addr, conf, factory).getProxy(); - } - - /** - * Get a protocol proxy that contains a proxy connection to a remote server - * and a set of methods that are supported by the server. - * - * @param Generics Type T. - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @param factory socket factory - * @return the protocol proxy - * @throws IOException if the far end through a RemoteException - */ - public static ProtocolProxy getProtocolProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf, - SocketFactory factory) throws IOException { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory); - } - - /** - * Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * - * @param Generics Type T. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param ticket input tocket. - * @param conf input conf. - * @param factory input factory. - * @return the protocol proxy. - * @throws IOException raised on errors performing I/O. - * - */ - public static T getProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, - UserGroupInformation ticket, - Configuration conf, - SocketFactory factory) throws IOException { - return getProtocolProxy( - protocol, clientVersion, addr, ticket, conf, factory).getProxy(); - } - /** * Get a protocol proxy that contains a proxy connection to a remote server * and a set of methods that are supported by the server @@ -501,32 +268,6 @@ public static ProtocolProxy getProtocolProxy(Class protocol, factory, getRpcTimeout(conf), null); } - /** - * Construct a client-side proxy that implements the named protocol, - * talking to a server at the named address. - * - * @param Generics Type T. - * @param protocol protocol - * @param clientVersion client's version - * @param addr server address - * @param ticket security ticket - * @param conf configuration - * @param factory socket factory - * @param rpcTimeout max time for each rpc; 0 means no timeout - * @return the proxy - * @throws IOException if any error occurs - */ - public static T getProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, - UserGroupInformation ticket, - Configuration conf, - SocketFactory factory, - int rpcTimeout) throws IOException { - return getProtocolProxy(protocol, clientVersion, addr, ticket, - conf, factory, rpcTimeout, null).getProxy(); - } - /** * Get a protocol proxy that contains a proxy connection to a remote server * and a set of methods that are supported by the server. @@ -591,63 +332,6 @@ public static ProtocolProxy getProtocolProxy(Class protocol, fallbackToSimpleAuth, null); } - /** - * Get a protocol proxy that contains a proxy connection to a remote server - * and a set of methods that are supported by the server. - * - * @param protocol protocol - * @param clientVersion client's version - * @param addr server address - * @param ticket security ticket - * @param conf configuration - * @param factory socket factory - * @param rpcTimeout max time for each rpc; 0 means no timeout - * @param connectionRetryPolicy retry policy - * @param fallbackToSimpleAuth set to true or false during calls to indicate - * if a secure client falls back to simple auth - * @param alignmentContext state alignment context - * @param Generics Type T. - * @return the proxy - * @throws IOException if any error occurs - */ - public static ProtocolProxy getProtocolProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, - UserGroupInformation ticket, - Configuration conf, - SocketFactory factory, - int rpcTimeout, - RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth, - AlignmentContext alignmentContext) - throws IOException { - if (UserGroupInformation.isSecurityEnabled()) { - SaslRpcServer.init(conf); - } - return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, - addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, - fallbackToSimpleAuth, alignmentContext); - } - - /** - * Construct a client-side proxy object with the default SocketFactory. - * - * @param Generics Type T. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param conf input Configuration. - * @return a proxy instance - * @throws IOException if the thread is interrupted. - */ - public static T getProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf) - throws IOException { - - return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); - } - /** * @return Returns the server address for a given proxy. * @param proxy input proxy. @@ -673,27 +357,6 @@ public static ConnectionId getConnectionIdForProxy(Object proxy) { return inv.getConnectionId(); } - /** - * Get a protocol proxy that contains a proxy connection to a remote server - * and a set of methods that are supported by the server - * - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param conf input configuration. - * @param Generics Type T. - * @return a protocol proxy - * @throws IOException if the thread is interrupted. - */ - public static ProtocolProxy getProtocolProxy(Class protocol, - long clientVersion, - InetSocketAddress addr, Configuration conf) - throws IOException { - - return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils - .getDefaultSocketFactory(conf)); - } - /** * Stop the proxy. Proxy must either implement {@link Closeable} or must have * associated {@link RpcInvocationHandler}. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RetryCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RetryCache.java deleted file mode 100644 index 47edb5c26fb9..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RetryCache.java +++ /dev/null @@ -1,391 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc_; - - -import java.util.Arrays; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.ipc_.metrics.RetryCacheMetrics; -import org.apache.hadoop.util.LightWeightCache; -import org.apache.hadoop.util.LightWeightGSet; -import org.apache.hadoop.util.LightWeightGSet.LinkedElement; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Maintains a cache of non-idempotent requests that have been successfully - * processed by the RPC server implementation, to handle the retries. A request - * is uniquely identified by the unique client ID + call ID of the RPC request. - * On receiving retried request, an entry will be found in the - * {@link RetryCache} and the previous response is sent back to the request. - *

- * To look an implementation using this cache, see HDFS FSNamesystem class. - */ -public class RetryCache { - public static final Logger LOG = LoggerFactory.getLogger(RetryCache.class); - private final RetryCacheMetrics retryCacheMetrics; - private static final int MAX_CAPACITY = 16; - - /** - * CacheEntry is tracked using unique client ID and callId of the RPC request. - */ - public static class CacheEntry implements LightWeightCache.Entry { - /** - * Processing state of the requests. - */ - private static byte INPROGRESS = 0; - private static byte SUCCESS = 1; - private static byte FAILED = 2; - - private byte state = INPROGRESS; - - // Store uuid as two long for better memory utilization - private final long clientIdMsb; // Most signficant bytes - private final long clientIdLsb; // Least significant bytes - - private final int callId; - private final long expirationTime; - private LightWeightGSet.LinkedElement next; - - CacheEntry(byte[] clientId, int callId, long expirationTime) { - // ClientId must be a UUID - that is 16 octets. - Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH, - "Invalid clientId - length is " + clientId.length - + " expected length " + ClientId.BYTE_LENGTH); - // Convert UUID bytes to two longs - clientIdMsb = ClientId.getMsb(clientId); - clientIdLsb = ClientId.getLsb(clientId); - this.callId = callId; - this.expirationTime = expirationTime; - } - - CacheEntry(byte[] clientId, int callId, long expirationTime, - boolean success) { - this(clientId, callId, expirationTime); - this.state = success ? SUCCESS : FAILED; - } - - private static int hashCode(long value) { - return (int)(value ^ (value >>> 32)); - } - - @Override - public int hashCode() { - return (hashCode(clientIdMsb) * 31 + hashCode(clientIdLsb)) * 31 + callId; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof CacheEntry)) { - return false; - } - CacheEntry other = (CacheEntry) obj; - return callId == other.callId && clientIdMsb == other.clientIdMsb - && clientIdLsb == other.clientIdLsb; - } - - @Override - public void setNext(LinkedElement next) { - this.next = next; - } - - @Override - public LinkedElement getNext() { - return next; - } - - synchronized void completed(boolean success) { - state = success ? SUCCESS : FAILED; - this.notifyAll(); - } - - public synchronized boolean isSuccess() { - return state == SUCCESS; - } - - @Override - public void setExpirationTime(long timeNano) { - // expiration time does not change - } - - @Override - public long getExpirationTime() { - return expirationTime; - } - - @Override - public String toString() { - return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":" - + this.callId + ":" + this.state; - } - } - - /** - * CacheEntry with payload that tracks the previous response or parts of - * previous response to be used for generating response for retried requests. - */ - public static class CacheEntryWithPayload extends CacheEntry { - private Object payload; - - CacheEntryWithPayload(byte[] clientId, int callId, Object payload, - long expirationTime) { - super(clientId, callId, expirationTime); - this.payload = payload; - } - - CacheEntryWithPayload(byte[] clientId, int callId, Object payload, - long expirationTime, boolean success) { - super(clientId, callId, expirationTime, success); - this.payload = payload; - } - - /** Override equals to avoid findbugs warnings */ - @Override - public boolean equals(Object obj) { - return super.equals(obj); - } - - /** Override hashcode to avoid findbugs warnings */ - @Override - public int hashCode() { - return super.hashCode(); - } - - public Object getPayload() { - return payload; - } - } - - private final LightWeightGSet set; - private final long expirationTime; - private String cacheName; - - private final ReentrantLock lock = new ReentrantLock(); - - /** - * Constructor - * @param cacheName name to identify the cache by - * @param percentage percentage of total java heap space used by this cache - * @param expirationTime time for an entry to expire in nanoseconds - */ - public RetryCache(String cacheName, double percentage, long expirationTime) { - int capacity = LightWeightGSet.computeCapacity(percentage, cacheName); - capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY; - this.set = new LightWeightCache(capacity, capacity, - expirationTime, 0); - this.expirationTime = expirationTime; - this.cacheName = cacheName; - this.retryCacheMetrics = RetryCacheMetrics.create(this); - } - - private static boolean skipRetryCache() { - // Do not track non RPC invocation or RPC requests with - // invalid callId or clientId in retry cache - return !Server.isRpcInvocation() || Server.getCallId() < 0 - || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID); - } - - public void lock() { - this.lock.lock(); - } - - public void unlock() { - this.lock.unlock(); - } - - private void incrCacheClearedCounter() { - retryCacheMetrics.incrCacheCleared(); - } - - public LightWeightGSet getCacheSet() { - return set; - } - - public RetryCacheMetrics getMetricsForTests() { - return retryCacheMetrics; - } - - /** - * @return This method returns cache name for metrics. - */ - public String getCacheName() { - return cacheName; - } - - /** - * This method handles the following conditions: - *

    - *
  • If retry is not to be processed, return null
  • - *
  • If there is no cache entry, add a new entry {@code newEntry} and return - * it.
  • - *
  • If there is an existing entry, wait for its completion. If the - * completion state is {@link CacheEntry#FAILED}, the expectation is that the - * thread that waited for completion, retries the request. the - * {@link CacheEntry} state is set to {@link CacheEntry#INPROGRESS} again. - *
  • If the completion state is {@link CacheEntry#SUCCESS}, the entry is - * returned so that the thread that waits for it can can return previous - * response.
  • - *
      - * - * @return {@link CacheEntry}. - */ - private CacheEntry waitForCompletion(CacheEntry newEntry) { - CacheEntry mapEntry = null; - lock.lock(); - try { - mapEntry = set.get(newEntry); - // If an entry in the cache does not exist, add a new one - if (mapEntry == null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Adding Rpc request clientId " - + newEntry.clientIdMsb + newEntry.clientIdLsb + " callId " - + newEntry.callId + " to retryCache"); - } - set.put(newEntry); - retryCacheMetrics.incrCacheUpdated(); - return newEntry; - } else { - retryCacheMetrics.incrCacheHit(); - } - } finally { - lock.unlock(); - } - // Entry already exists in cache. Wait for completion and return its state - Objects.requireNonNull(mapEntry, "Entry from the cache should not be null"); - // Wait for in progress request to complete - synchronized (mapEntry) { - while (mapEntry.state == CacheEntry.INPROGRESS) { - try { - mapEntry.wait(); - } catch (InterruptedException ie) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - } - } - // Previous request has failed, the expectation is is that it will be - // retried again. - if (mapEntry.state != CacheEntry.SUCCESS) { - mapEntry.state = CacheEntry.INPROGRESS; - } - } - return mapEntry; - } - - /** - * Add a new cache entry into the retry cache. The cache entry consists of - * clientId and callId extracted from editlog. - * - * @param clientId input clientId. - * @param callId input callId. - */ - public void addCacheEntry(byte[] clientId, int callId) { - CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime() - + expirationTime, true); - lock.lock(); - try { - set.put(newEntry); - } finally { - lock.unlock(); - } - retryCacheMetrics.incrCacheUpdated(); - } - - public void addCacheEntryWithPayload(byte[] clientId, int callId, - Object payload) { - // since the entry is loaded from editlog, we can assume it succeeded. - CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload, - System.nanoTime() + expirationTime, true); - lock.lock(); - try { - set.put(newEntry); - } finally { - lock.unlock(); - } - retryCacheMetrics.incrCacheUpdated(); - } - - private static CacheEntry newEntry(long expirationTime) { - return new CacheEntry(Server.getClientId(), Server.getCallId(), - System.nanoTime() + expirationTime); - } - - private static CacheEntryWithPayload newEntry(Object payload, - long expirationTime) { - return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(), - payload, System.nanoTime() + expirationTime); - } - - /** - * Static method that provides null check for retryCache. - * @param cache input Cache. - * @return CacheEntry. - */ - public static CacheEntry waitForCompletion(RetryCache cache) { - if (skipRetryCache()) { - return null; - } - return cache != null ? cache - .waitForCompletion(newEntry(cache.expirationTime)) : null; - } - - /** - * Static method that provides null check for retryCache. - * @param cache input cache. - * @param payload input payload. - * @return CacheEntryWithPayload. - */ - public static CacheEntryWithPayload waitForCompletion(RetryCache cache, - Object payload) { - if (skipRetryCache()) { - return null; - } - return (CacheEntryWithPayload) (cache != null ? cache - .waitForCompletion(newEntry(payload, cache.expirationTime)) : null); - } - - public static void setState(CacheEntry e, boolean success) { - if (e == null) { - return; - } - e.completed(success); - } - - public static void setState(CacheEntryWithPayload e, boolean success, - Object payload) { - if (e == null) { - return; - } - e.payload = payload; - e.completed(success); - } - - public static void clear(RetryCache cache) { - if (cache != null) { - cache.set.clear(); - cache.incrCacheClearedCounter(); - } - } -} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java index c01f94872a5b..741ea1c016b3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RpcEngine.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc_.Client.ConnectionId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -34,43 +33,6 @@ /** An RPC implementation. */ public interface RpcEngine { - /** - * Construct a client-side proxy object. - * - * @param Generics Type T. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param ticket input ticket. - * @param conf input Configuration. - * @param factory input factory. - * @param rpcTimeout input rpcTimeout. - * @param connectionRetryPolicy input connectionRetryPolicy. - * @throws IOException raised on errors performing I/O. - * @return ProtocolProxy. - */ - ProtocolProxy getProxy(Class protocol, - long clientVersion, InetSocketAddress addr, - UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout, - RetryPolicy connectionRetryPolicy) throws IOException; - - /** - * Construct a client-side proxy object with a ConnectionId. - * - * @param Generics Type T. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param connId input ConnectionId. - * @param conf input Configuration. - * @param factory input factory. - * @throws IOException raised on errors performing I/O. - * @return ProtocolProxy. - */ - ProtocolProxy getProxy(Class protocol, long clientVersion, - Client.ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException; - /** * Construct a client-side proxy object. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java index aabe01b17628..cf323bb4b854 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java @@ -36,7 +36,6 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.Channels; @@ -53,7 +52,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -99,7 +97,6 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security_.SaslMechanismFactory; import org.apache.hadoop.security.SaslPropertiesResolver; -import org.apache.hadoop.security_.SaslRpcClient; import org.apache.hadoop.security_.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; @@ -318,14 +315,6 @@ static Class getProtocolClass(String protocolName, Configuration conf) return protocol; } - /** @return Returns the server instance called under or null. May be called under - * {@link #call(Writable, long)} implementations, and under {@link Writable} - * methods of paramters and return values. Permits applications to access - * the server context.*/ - public static Server get() { - return SERVER.get(); - } - /** This is set to Call object before Handler invokes an RPC and reset * after the call returns. */ @@ -348,15 +337,6 @@ public static int getCallId() { return call != null ? call.callId : RpcConstants.INVALID_CALL_ID; } - /** - * @return The current active RPC call's retry count. -1 indicates the retry - * cache is not supported in the client side. - */ - public static int getCallRetryCount() { - Call call = CurCall.get(); - return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT; - } - /** * @return Returns the remote side ip address when invoked inside an RPC * Returns null incase of an error. @@ -366,32 +346,6 @@ public static InetAddress getRemoteIp() { return (call != null ) ? call.getHostInetAddress() : null; } - /** - * Returns the SASL qop for the current call, if the current call is - * set, and the SASL negotiation is done. Otherwise return null - * Note this only returns established QOP for auxiliary port, and - * returns null for primary (non-auxiliary) port. - * - * Also note that CurCall is thread local object. So in fact, different - * handler threads will process different CurCall object. - * - * Also, only return for RPC calls, not supported for other protocols. - * @return the QOP of the current connection. - */ - public static String getAuxiliaryPortEstablishedQOP() { - Call call = CurCall.get(); - if (!(call instanceof RpcCall)) { - return null; - } - RpcCall rpcCall = (RpcCall)call; - if (rpcCall.connection.isOnAuxiliaryPort()) { - return rpcCall.connection.getEstablishedQOP(); - } else { - // Not sending back QOP for primary port - return null; - } - } - /** * @return Returns the clientId from the current RPC request. */ @@ -417,26 +371,6 @@ public static UserGroupInformation getRemoteUser() { return (call != null) ? call.getRemoteUser() : null; } - public static String getProtocol() { - Call call = CurCall.get(); - return (call != null) ? call.getProtocol() : null; - } - - /** @return Return true if the invocation was through an RPC. - */ - public static boolean isRpcInvocation() { - return CurCall.get() != null; - } - - /** - * @return Return the priority level assigned by call queue to an RPC - * Returns 0 in case no priority is assigned. - */ - public static int getPriorityLevel() { - Call call = CurCall.get(); - return call != null? call.getPriorityLevel() : 0; - } - private String bindAddress; private int port; // port we listen on private int handlerCount; // number of handler threads @@ -473,10 +407,6 @@ protected ResponseBuffer initialValue() { // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; private Listener listener = null; - // Auxiliary listeners maintained as in a map, to allow - // arbitrary number of of auxiliary listeners. A map from - // the port to the listener binding to it. - private Map auxiliaryListenerMap; private Responder responder = null; private Handler[] handlers = null; @@ -508,10 +438,6 @@ private void setPurgeIntervalNanos(int purgeInterval) { tmpPurgeInterval, TimeUnit.MINUTES); } - public long getPurgeIntervalNanos() { - return this.purgeIntervalNanos; - } - /** * Logs a Slow RPC Request. * @@ -593,21 +519,6 @@ void updateDeferredMetrics(String name, long processingTime) { rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime); } - /** - * A convenience method to bind to a given address and report - * better exceptions if the address is not a valid host. - * @param socket the socket to bind - * @param address the address to bind to - * @param backlog the number of connections allowed in the queue - * @throws BindException if the address can't be bound - * @throws UnknownHostException if the address isn't a valid host name - * @throws IOException other random errors from bind - */ - public static void bind(ServerSocket socket, InetSocketAddress address, - int backlog) throws IOException { - bind(socket, address, backlog, null, null); - } - public static void bind(ServerSocket socket, InetSocketAddress address, int backlog, Configuration conf, String rangeConf) throws IOException { try { @@ -640,38 +551,10 @@ public static void bind(ServerSocket socket, InetSocketAddress address, } } - int getPriorityLevel(Schedulable e) { - return callQueue.getPriorityLevel(e); - } - - int getPriorityLevel(UserGroupInformation ugi) { - return callQueue.getPriorityLevel(ugi); - } - void setPriorityLevel(UserGroupInformation ugi, int priority) { callQueue.setPriorityLevel(ugi, priority); } - /** - * Returns a handle to the rpcMetrics (required in tests) - * @return rpc metrics - */ - public RpcMetrics getRpcMetrics() { - return rpcMetrics; - } - - public RpcDetailedMetrics getRpcDetailedMetrics() { - return rpcDetailedMetrics; - } - - Iterable getHandlers() { - return Arrays.asList(handlers); - } - - Connection[] getConnections() { - return connectionManager.toArray(); - } - /** * Refresh the service authorization ACL for the service handled by this server. * @@ -682,25 +565,6 @@ public void refreshServiceAcl(Configuration conf, PolicyProvider provider) { serviceAuthorizationManager.refresh(conf, provider); } - /** - * Refresh the service authorization ACL for the service handled by this server - * using the specified Configuration. - * - * @param conf input Configuration. - * @param provider input provider. - */ - public void refreshServiceAclWithLoadedConfiguration(Configuration conf, - PolicyProvider provider) { - serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider); - } - /** - * Returns a handle to the serviceAuthorizationManager (required in tests) - * @return instance of ServiceAuthorizationManager for this server - */ - public ServiceAuthorizationManager getServiceAuthorizationManager() { - return serviceAuthorizationManager; - } - private String getQueueClassPrefix() { return CommonConfigurationKeys.IPC_NAMESPACE + "." + port; } @@ -844,22 +708,6 @@ public String getHostAddress() { return (addr != null) ? addr.getHostAddress() : null; } - public String getProtocol() { - return null; - } - - /** - * Allow a IPC response to be postponed instead of sent immediately - * after the handler returns from the proxy method. The intended use - * case is freeing up the handler thread when the response is known, - * but an expensive pre-condition must be satisfied before it's sent - * to the client. - */ - public final void postponeResponse() { - int count = responseWaitCount.incrementAndGet(); - assert count > 0 : "response has already been sent"; - } - public final void sendResponse() throws IOException { int count = responseWaitCount.decrementAndGet(); assert count >= 0 : "response has already been sent"; @@ -978,11 +826,6 @@ void setResponseFields(Writable returnValue, this.responseParams = responseParams; } - @Override - public String getProtocol() { - return "rpc"; - } - @Override public UserGroupInformation getRemoteUser() { return connection.user; @@ -1182,7 +1025,6 @@ private class Listener extends Thread { private int backlogLength = conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); - private boolean isOnAuxiliaryPort; Listener(int port) throws IOException { address = new InetSocketAddress(bindAddress, port); @@ -1209,13 +1051,8 @@ private class Listener extends Thread { acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); - this.isOnAuxiliaryPort = false; } - void setIsAuxiliary() { - this.isOnAuxiliaryPort = true; - } - private class Reader extends Thread { final private BlockingQueue pendingConnections; private final Selector readSelector; @@ -1383,7 +1220,7 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf Reader reader = getReader(); Connection c = connectionManager.register(channel, - this.listenPort, this.isOnAuxiliaryPort); + this.listenPort); // If the connectionManager can't take it, close the connection. if (c == null) { if (channel.isOpen()) { @@ -1804,17 +1641,14 @@ public class Connection { IpcConnectionContextProto connectionContext; String protocolName; SaslServer saslServer; - private String establishedQOP; private AuthMethod authMethod; private AuthProtocol authProtocol; private boolean saslContextEstablished; private ByteBuffer connectionHeaderBuf = null; private ByteBuffer unwrappedData; private ByteBuffer unwrappedDataLengthBuffer; - private int serviceClass; private boolean shouldClose = false; private int ingressPort; - private boolean isOnAuxiliaryPort; UserGroupInformation user = null; public UserGroupInformation attemptingUser = null; // user name before auth @@ -1827,7 +1661,7 @@ public class Connection { private boolean useWrap = false; public Connection(SocketChannel channel, long lastContact, - int ingressPort, boolean isOnAuxiliaryPort) { + int ingressPort) { this.channel = channel; this.lastContact = lastContact; this.data = null; @@ -1840,7 +1674,6 @@ public Connection(SocketChannel channel, long lastContact, this.socket = channel.socket(); this.addr = socket.getInetAddress(); this.ingressPort = ingressPort; - this.isOnAuxiliaryPort = isOnAuxiliaryPort; if (addr == null) { this.hostAddress = "*Unknown*"; } else { @@ -1876,22 +1709,10 @@ public String getHostAddress() { return hostAddress; } - public int getIngressPort() { - return ingressPort; - } - public InetAddress getHostInetAddress() { return addr; } - public String getEstablishedQOP() { - return establishedQOP; - } - - public boolean isOnAuxiliaryPort() { - return isOnAuxiliaryPort; - } - public void setLastContact(long lastContact) { this.lastContact = lastContact; } @@ -1961,7 +1782,7 @@ private void saslReadAndProcess(RpcWritable.Buffer buffer) throws } /** - * Some exceptions ({@link RetriableException} and {@link StandbyException}) + * Some exceptions (e.g. {@link RetriableException}) * that are wrapped as a cause of parameter e are unwrapped so that they can * be sent as the true cause to the client side. In case of * {@link InvalidToken} we go one level deeper to get the true cause. @@ -1974,8 +1795,6 @@ private Throwable getTrueCause(IOException e) { while (cause != null) { if (cause instanceof RetriableException) { return cause; - } else if (cause instanceof StandbyException) { - return cause; } else if (cause instanceof InvalidToken) { // FIXME: hadoop method signatures are restricting the SASL // callbacks to only returning InvalidToken, but some services @@ -1999,7 +1818,7 @@ private Throwable getTrueCause(IOException e) { * failure, premature or invalid connection context, or other state * errors. This exception needs to be sent to the client. This * exception will wrap {@link RetriableException}, - * {@link InvalidToken}, {@link StandbyException} or + * {@link InvalidToken}, or * {@link SaslException}. * @throws IOException if sending reply fails * @throws InterruptedException @@ -2070,7 +1889,6 @@ private void saslProcess(RpcSaslProto saslMessage) // do NOT enable wrapping until the last auth response is sent if (saslContextEstablished) { String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); - establishedQOP = qop; // SASL wrapping is only used if the connection has a QOP, and // the value is not auth. ex. auth-int & auth-priv useWrap = (qop != null && !"auth".equalsIgnoreCase(qop)); @@ -2267,8 +2085,6 @@ public int readAndProcess() throws IOException, InterruptedException { return count; } int version = connectionHeaderBuf.get(0); - // TODO we should add handler for service class later - this.setServiceClass(connectionHeaderBuf.get(1)); dataLengthBuffer.flip(); // Check if it looks like the user is hitting an IPC port @@ -2838,22 +2654,6 @@ private void sendResponse(RpcCall call) throws IOException { responder.doRespond(call); } - /** - * Get service class for connection - * @return the serviceClass - */ - public int getServiceClass() { - return serviceClass; - } - - /** - * Set service class for connection - * @param serviceClass the serviceClass to set - */ - public void setServiceClass(int serviceClass) { - this.serviceClass = serviceClass; - } - private synchronized void close() { disposeSasl(); data = null; @@ -2869,15 +2669,6 @@ private synchronized void close() { } } - public void queueCall(Call call) throws IOException, InterruptedException { - // external non-rpc calls don't need server exception wrapper. - try { - internalQueueCall(call); - } catch (RpcServerException rse) { - throw (IOException)rse.getCause(); - } - } - private void internalQueueCall(Call call) throws IOException, InterruptedException { internalQueueCall(call, true); @@ -3014,25 +2805,7 @@ void logException(Logger logger, Throwable e, Call call) { } } - protected Server(String bindAddress, int port, - Class paramClass, int handlerCount, - Configuration conf) - throws IOException - { - this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer - .toString(port), null, null); - } - - protected Server(String bindAddress, int port, - Class rpcRequestClass, int handlerCount, - int numReaders, int queueSizePerHandler, Configuration conf, - String serverName, SecretManager secretManager) - throws IOException { - this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, - queueSizePerHandler, conf, serverName, secretManager, null); - } - - /** + /** * Constructs a server listening on the named port and address. Parameters passed must * be of the named class. The handlerCount determines * the number of handler threads that will be used to process calls. @@ -3072,7 +2845,6 @@ protected Server(String bindAddress, int port, this.handlerCount = handlerCount; this.socketSendBufferSize = 0; this.serverName = serverName; - this.auxiliaryListenerMap = null; this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { @@ -3137,8 +2909,6 @@ protected Server(String bindAddress, int port, SaslRpcServer.init(conf); saslPropsResolver = SaslPropertiesResolver.getInstance(conf); } - - this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class); } private synchronized void doKerberosRelogin() throws IOException { @@ -3161,24 +2931,6 @@ private synchronized void doKerberosRelogin() throws IOException { } } - public synchronized void addAuxiliaryListener(int auxiliaryPort) - throws IOException { - if (auxiliaryListenerMap == null) { - auxiliaryListenerMap = new HashMap<>(); - } - if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) { - throw new IOException( - "There is already a listener binding to: " + auxiliaryPort); - } - Listener newListener = new Listener(auxiliaryPort); - newListener.setIsAuxiliary(); - - // in the case of port = 0, the listener would be on a != 0 port. - LOG.info("Adding a server listener on port " + - newListener.getAddress().getPort()); - auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener); - } - private RpcSaslProto buildNegotiateResponse(List authMethods) throws IOException { RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder(); @@ -3403,22 +3155,11 @@ private void wrapWithSasl(RpcCall call) throws IOException { Configuration getConf() { return conf; } - - /** - * Sets the socket buffer size used for responding to RPCs. - * @param size input size. - */ - public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } /** Starts the service. Must be called before any calls will be handled. */ public synchronized void start() { responder.start(); listener.start(); - if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { - for (Listener newListener : auxiliaryListenerMap.values()) { - newListener.start(); - } - } handlers = new Handler[handlerCount]; @@ -3441,12 +3182,6 @@ public synchronized void stop() { } listener.interrupt(); listener.doStop(); - if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { - for (Listener newListener : auxiliaryListenerMap.values()) { - newListener.interrupt(); - newListener.doStop(); - } - } responder.interrupt(); notifyAll(); this.rpcMetrics.shutdown(); @@ -3474,23 +3209,6 @@ public synchronized InetSocketAddress getListenerAddress() { } /** - * Return the set of all the configured auxiliary socket addresses NameNode - * RPC is listening on. If there are none, or it is not configured at all, an - * empty set is returned. - * @return the set of all the auxiliary addresses on which the - * RPC server is listening on. - */ - public synchronized Set getAuxiliaryListenerAddresses() { - Set allAddrs = new HashSet<>(); - if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { - for (Listener auxListener : auxiliaryListenerMap.values()) { - allAddrs.add(auxListener.getAddress()); - } - } - return allAddrs; - } - - /** * Called for each call. * @deprecated Use {@link #call(RPC.RpcKind, String, * Writable, long)} instead @@ -3590,30 +3308,6 @@ public int getCallQueueLen() { return callQueue.size(); } - public boolean isClientBackoffEnabled() { - return callQueue.isClientBackoffEnabled(); - } - - public void setClientBackoffEnabled(boolean value) { - callQueue.setClientBackoffEnabled(value); - } - - /** - * The maximum size of the rpc call queue of this server. - * @return The maximum size of the rpc call queue. - */ - public int getMaxQueueSize() { - return maxQueueSize; - } - - /** - * The number of reader threads for this server. - * @return The number of reader threads. - */ - public int getNumReaders() { - return readThreads; - } - /** * When the read or write buffer size is larger than this limit, i/o will be * done in chunks of this size. Most RPC requests and responses would be @@ -3805,13 +3499,12 @@ Connection[] toArray() { return connections.toArray(new Connection[0]); } - Connection register(SocketChannel channel, int ingressPort, - boolean isOnAuxiliaryPort) { + Connection register(SocketChannel channel, int ingressPort) { if (isFull()) { return null; } Connection connection = new Connection(channel, Time.now(), - ingressPort, isOnAuxiliaryPort); + ingressPort); add(connection); if (LOG.isDebugEnabled()) { LOG.debug("Server connection from " + connection + diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/StandbyException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/StandbyException.java deleted file mode 100644 index 351e90ca18ab..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/StandbyException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc_; - -import java.io.IOException; - - -/** - * Thrown by a remote server when it is up, but is not the active server in a - * set of servers in which only a subset may be active. - */ -public class StandbyException extends IOException { - static final long serialVersionUID = 0x12308AD010L; - public StandbyException(String msg) { - super(msg); - } -} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/UnexpectedServerException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/UnexpectedServerException.java deleted file mode 100644 index 5ac3a9809108..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/UnexpectedServerException.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc_; - -/** - * Indicates that the RPC server encountered an undeclared exception from the - * service - */ -public class UnexpectedServerException extends RpcException { - private static final long serialVersionUID = 1L; - - /** - * Constructs exception with the specified detail message. - * - * @param messages detailed message. - */ - UnexpectedServerException(final String message) { - super(message); - } - - /** - * Constructs exception with the specified detail message and cause. - * - * @param message message. - * @param cause that cause this exception - * @param cause the cause (can be retried by the {@link #getCause()} method). - * (A null value is permitted, and indicates that the cause - * is nonexistent or unknown.) - */ - UnexpectedServerException(final String message, final Throwable cause) { - super(message, cause); - } -} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RetryCacheMetrics.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RetryCacheMetrics.java deleted file mode 100644 index 321a41cbe2c9..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/metrics/RetryCacheMetrics.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc_.metrics; - -import org.apache.hadoop.ipc_.RetryCache; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is for maintaining the various RetryCache-related statistics - * and publishing them through the metrics interfaces. - */ -@Metrics(about="Aggregate RetryCache metrics", context="rpc") -public class RetryCacheMetrics { - - static final Logger LOG = LoggerFactory.getLogger(RetryCacheMetrics.class); - final MetricsRegistry registry; - final String name; - - RetryCacheMetrics(RetryCache retryCache) { - name = "RetryCache."+ retryCache.getCacheName(); - registry = new MetricsRegistry(name); - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized "+ registry); - } - } - - public String getName() { return name; } - - public static RetryCacheMetrics create(RetryCache cache) { - RetryCacheMetrics m = new RetryCacheMetrics(cache); - return DefaultMetricsSystem.instance().register(m.name, null, m); - } - - @Metric("Number of RetryCache hit") MutableCounterLong cacheHit; - @Metric("Number of RetryCache cleared") MutableCounterLong cacheCleared; - @Metric("Number of RetryCache updated") MutableCounterLong cacheUpdated; - - /** - * One cache hit event - */ - public void incrCacheHit() { - cacheHit.incr(); - } - - /** - * One cache cleared - */ - public void incrCacheCleared() { - cacheCleared.incr(); - } - - /** - * One cache updated - */ - public void incrCacheUpdated() { - cacheUpdated.incr(); - } - - public long getCacheHit() { - return cacheHit.value(); - } - - public long getCacheCleared() { - return cacheCleared.value(); - } - - public long getCacheUpdated() { - return cacheUpdated.value(); - } - -}