diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java new file mode 100644 index 0000000000000..9636d2ecf5ff4 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.client.sync; + +public enum CnToAnSyncRequestType { + // Node Maintenance + STOP_AI_NODE, +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java new file mode 100644 index 0000000000000..5ef9df1c99735 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.client.sync; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.sync.SyncAINodeClient; +import org.apache.iotdb.commons.exception.UncheckedStartupException; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.collect.ImmutableMap; +import org.apache.ratis.util.function.CheckedBiFunction; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class SyncAINodeClientPool { + + private static final Logger LOGGER = LoggerFactory.getLogger(SyncAINodeClientPool.class); + + private static final int DEFAULT_RETRY_NUM = 10; + + private final IClientManager clientManager; + + protected ImmutableMap< + CnToAnSyncRequestType, CheckedBiFunction> + actionMap; + + private SyncAINodeClientPool() { + clientManager = + new IClientManager.Factory() + .createClientManager(new ClientPoolFactory.SyncAINodeClientPoolFactory()); + buildActionMap(); + checkActionMapCompleteness(); + } + + private void buildActionMap() { + ImmutableMap.Builder< + CnToAnSyncRequestType, CheckedBiFunction> + actionMapBuilder = ImmutableMap.builder(); + actionMapBuilder.put(CnToAnSyncRequestType.STOP_AI_NODE, (req, client) -> client.stopAINode()); + actionMap = actionMapBuilder.build(); + } + + private void checkActionMapCompleteness() { + List lackList = + Arrays.stream(CnToAnSyncRequestType.values()) + .filter(type -> !actionMap.containsKey(type)) + .collect(Collectors.toList()); + if (!lackList.isEmpty()) { + throw new UncheckedStartupException( + String.format("These request types should be added to actionMap: %s", lackList)); + } + } + + public Object sendSyncRequestToAINodeWithRetry( + TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType) { + Throwable lastException = new TException(); + for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) { + try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) { + return executeSyncRequest(requestType, client, req); + } catch (Exception e) { + lastException = e; + if (retry != DEFAULT_RETRY_NUM - 1) { + LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1); + doRetryWait(retry); + } + } + } + LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException); + return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) + .setMessage("All retry failed due to: " + lastException.getMessage()); + } + + public Object sendSyncRequestToAINodeWithGivenRetry( + TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType, int retryNum) { + Throwable lastException = new TException(); + for (int retry = 0; retry < retryNum; retry++) { + try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) { + return executeSyncRequest(requestType, client, req); + } catch (Exception e) { + lastException = e; + if (retry != retryNum - 1) { + LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1); + doRetryWait(retry); + } + } + } + LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException); + return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) + .setMessage("All retry failed due to: " + lastException.getMessage()); + } + + private Object executeSyncRequest( + CnToAnSyncRequestType requestType, SyncAINodeClient client, Object req) throws Exception { + return Objects.requireNonNull(actionMap.get(requestType)).apply(req, client); + } + + private void doRetryWait(int retryNum) { + try { + if (retryNum < 3) { + TimeUnit.MILLISECONDS.sleep(800L); + } else if (retryNum < 5) { + TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum)); + } else { + TimeUnit.MILLISECONDS.sleep(3200L); + } + } catch (InterruptedException e) { + LOGGER.warn("Retry wait failed.", e); + Thread.currentThread().interrupt(); + } + } + + private static class ClientPoolHolder { + + private static final SyncAINodeClientPool INSTANCE = new SyncAINodeClientPool(); + + private ClientPoolHolder() { + // Empty constructor + } + } + + public static SyncAINodeClientPool getInstance() { + return SyncAINodeClientPool.ClientPoolHolder.INSTANCE; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java index 2a1c6881b1413..facc2ce4ab728 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java @@ -22,13 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.client.sync.CnToAnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncAINodeClientPool; import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.db.protocol.client.an.AINodeClient; -import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -65,16 +65,13 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveAINodeState st try { switch (state) { case NODE_STOP: - TSStatus resp = null; - try (AINodeClient client = - AINodeClientManager.getInstance() - .borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) { - resp = client.stopAINode(); - } catch (Exception e) { - LOGGER.warn( - "Failed to stop AINode {}, but the remove process will continue.", - removedAINode.getInternalEndPoint()); - } + TSStatus resp = + (TSStatus) + SyncAINodeClientPool.getInstance() + .sendSyncRequestToAINodeWithRetry( + removedAINode.getInternalEndPoint(), + null, + CnToAnSyncRequestType.STOP_AI_NODE); if (resp != null && resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info("Successfully stopped AINode {}", removedAINode.getInternalEndPoint()); } else { @@ -92,7 +89,6 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveAINodeState st env.getConfigManager() .getConsensusManager() .write(new RemoveAINodePlan(removedAINode)); - if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new ProcedureException( String.format( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index d09b9245e8402..7ed34359b21c6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProperty; +import org.apache.iotdb.commons.client.sync.SyncAINodeClient; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; @@ -393,6 +394,27 @@ public GenericKeyedObjectPool create } } + public static class SyncAINodeClientPoolFactory + implements IClientPoolFactory { + + @Override + public GenericKeyedObjectPool createClientPool( + ClientManager manager) { + GenericKeyedObjectPool clientPool = + new GenericKeyedObjectPool<>( + new SyncAINodeClient.Factory( + manager, + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .build()), + new ClientPoolProperty.Builder().build().getConfig()); + ClientManagerMetrics.getInstance() + .registerClientManager(this.getClass().getSimpleName(), clientPool); + return clientPool; + } + } + public static class AsyncAINodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java new file mode 100644 index 0000000000000..054b644609958 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.sync; + +import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ThriftClient; +import org.apache.iotdb.commons.client.factory.ThriftClientFactory; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; +import org.apache.iotdb.rpc.TConfigurationConst; +import org.apache.iotdb.rpc.TimeoutChangeableTransport; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.SocketException; + +public class SyncAINodeClient extends IAINodeRPCService.Client + implements ThriftClient, AutoCloseable { + + private final boolean printLogWhenEncounterException; + private final TEndPoint endpoint; + private final ClientManager clientManager; + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + + public SyncAINodeClient( + ThriftClientProperty property, + TEndPoint endpoint, + ClientManager clientManager) + throws TTransportException { + super( + property + .getProtocolFactory() + .getProtocol( + COMMON_CONFIG.isEnableInternalSSL() + ? DeepCopyRpcTransportFactory.INSTANCE.getTransport( + endpoint.getIp(), + endpoint.getPort(), + property.getConnectionTimeoutMs(), + COMMON_CONFIG.getTrustStorePath(), + COMMON_CONFIG.getTrustStorePwd(), + COMMON_CONFIG.getKeyStorePath(), + COMMON_CONFIG.getKeyStorePwd()) + : DeepCopyRpcTransportFactory.INSTANCE.getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endpoint.getIp(), + endpoint.getPort(), + property.getConnectionTimeoutMs())))); + this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); + this.endpoint = endpoint; + this.clientManager = clientManager; + if (!getInputProtocol().getTransport().isOpen()) { + getInputProtocol().getTransport().open(); + } + } + + public int getTimeout() throws SocketException { + return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut(); + } + + public void setTimeout(int timeout) { + // the same transport is used in both input and output + ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); + } + + public TEndPoint getEndpoint() { + return endpoint; + } + + public ClientManager getClientManager() { + return clientManager; + } + + @Override + public void close() throws Exception { + clientManager.returnClient(endpoint, this); + } + + @Override + public void invalidate() { + getInputProtocol().getTransport().close(); + } + + @Override + public void invalidateAll() { + clientManager.clear(endpoint); + } + + @Override + public boolean printLogWhenEncounterException() { + return printLogWhenEncounterException; + } + + @Override + public String toString() { + return String.format("SyncAINodeClient{%s}", endpoint); + } + + public static class Factory extends ThriftClientFactory { + + public Factory( + ClientManager clientManager, + ThriftClientProperty thriftClientProperty) { + super(clientManager, thriftClientProperty); + } + + @Override + public void destroyObject(TEndPoint endpoint, PooledObject pooledObject) { + pooledObject.getObject().invalidate(); + } + + @Override + public PooledObject makeObject(TEndPoint endpoint) throws Exception { + return new DefaultPooledObject<>( + SyncThriftClientWithErrorHandler.newErrorHandler( + SyncAINodeClient.class, + SyncAINodeClient.class.getConstructor( + thriftClientProperty.getClass(), endpoint.getClass(), clientManager.getClass()), + thriftClientProperty, + endpoint, + clientManager)); + } + + @Override + public boolean validateObject(TEndPoint endpoint, PooledObject pooledObject) { + return pooledObject.getObject().getInputProtocol().getTransport().isOpen(); + } + } +}