diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java index c7f2699da222b..2fe8ba323eeba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java @@ -68,6 +68,7 @@ public class UDFInfo implements SnapshotProcessor { private final UDFTable udfTable; private final Map existedJarToMD5; + private final Map existedJarToReferenceCount; private final UDFExecutableManager udfExecutableManager; @@ -78,6 +79,7 @@ public class UDFInfo implements SnapshotProcessor { public UDFInfo() throws IOException { udfTable = new UDFTable(); existedJarToMD5 = new HashMap<>(); + existedJarToReferenceCount = new HashMap<>(); udfExecutableManager = UDFExecutableManager.setupAndGetInstance( CONFIG_NODE_CONF.getUdfTemporaryLibDir(), CONFIG_NODE_CONF.getUdfDir()); @@ -135,7 +137,7 @@ public TSStatus addUDFInTable(CreateFunctionPlan physicalPlan) { final UDFInformation udfInformation = physicalPlan.getUdfInformation(); udfTable.addUDFInformation(udfInformation.getFunctionName(), udfInformation); if (udfInformation.isUsingURI()) { - existedJarToMD5.put(udfInformation.getJarName(), udfInformation.getJarMD5()); + addJarReference(udfInformation.getJarName(), udfInformation.getJarMD5()); if (physicalPlan.getJarFile() != null) { udfExecutableManager.saveToInstallDir( ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), udfInformation.getJarName()); @@ -185,7 +187,10 @@ public JarResp getUDFJar(GetUDFJarPlan physicalPlan) { public TSStatus dropFunction(Model model, String functionName) { if (udfTable.containsUDF(model, functionName)) { - existedJarToMD5.remove(udfTable.getUDFInformation(model, functionName).getJarName()); + final UDFInformation udfInformation = udfTable.getUDFInformation(model, functionName); + if (udfInformation.isUsingURI()) { + removeJarReference(udfInformation.getJarName()); + } udfTable.removeUDFInformation(model, functionName); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); @@ -248,6 +253,7 @@ public void processLoadSnapshot(File snapshotDir) throws IOException { deserializeExistedJarToMD5(fileInputStream); udfTable.deserializeUDFTable(fileInputStream); + rebuildJarMetadataFromUDFTable(); } finally { releaseUDFTableLock(); } @@ -272,6 +278,32 @@ public void deserializeExistedJarToMD5(InputStream inputStream) throws IOExcepti public void clear() { existedJarToMD5.clear(); + existedJarToReferenceCount.clear(); udfTable.clear(); } + + private void addJarReference(String jarName, String jarMD5) { + existedJarToMD5.putIfAbsent(jarName, jarMD5); + existedJarToReferenceCount.merge(jarName, 1, Integer::sum); + } + + private void removeJarReference(String jarName) { + final Integer referenceCount = existedJarToReferenceCount.get(jarName); + if (referenceCount == null || referenceCount <= 1) { + existedJarToReferenceCount.remove(jarName); + existedJarToMD5.remove(jarName); + return; + } + existedJarToReferenceCount.put(jarName, referenceCount - 1); + } + + private void rebuildJarMetadataFromUDFTable() { + existedJarToMD5.clear(); + existedJarToReferenceCount.clear(); + for (UDFInformation udfInformation : udfTable.getAllInformationList()) { + if (udfInformation.isUsingURI()) { + addJarReference(udfInformation.getJarName(), udfInformation.getJarMD5()); + } + } + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java index 0ee857bc5a218..464c24c567e00 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; @@ -41,6 +42,10 @@ public class UDFInfoTest { + private static final String SHARED_JAR_NAME = "shared.jar"; + private static final String SHARED_JAR_MD5 = "12345"; + private static final String DIFFERENT_JAR_MD5 = "54321"; + private static UDFInfo udfInfo; private static UDFInfo udfInfoSaveBefore; private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot"); @@ -63,31 +68,44 @@ public static void cleanup() throws IOException { } @Test - public void testSnapshot() throws TException, IOException, IllegalPathException { - UDFInformation udfInformation = - new UDFInformation( - "test1", - "test1", - UDFType.of(Model.TREE, FunctionType.NONE, true), - true, - "test1.jar", - "12345"); - CreateFunctionPlan createFunctionPlan = - new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); - udfInfo.addUDFInTable(createFunctionPlan); - udfInfoSaveBefore.addUDFInTable(createFunctionPlan); - - udfInformation = - new UDFInformation( - "test2", - "test2", - UDFType.of(Model.TREE, FunctionType.NONE, true), - true, - "test2.jar", - "123456"); - createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); - udfInfo.addUDFInTable(createFunctionPlan); - udfInfoSaveBefore.addUDFInTable(createFunctionPlan); + public void testDropOneSharedJarReferenceKeepsJarMetadata() + throws TException, IOException, IllegalPathException { + clearUdfInfos(); + + udfInfo.addUDFInTable(createFunctionPlan("test1", SHARED_JAR_NAME, SHARED_JAR_MD5, true)); + udfInfo.addUDFInTable(createFunctionPlan("test2", SHARED_JAR_NAME, SHARED_JAR_MD5, false)); + + udfInfo.dropFunction(Model.TREE, "test1"); + + Assert.assertFalse(udfInfo.needToSaveJar(SHARED_JAR_NAME)); + Assert.assertEquals(1, udfInfo.getRawExistedJarToMD5().size()); + Assert.assertEquals(SHARED_JAR_MD5, udfInfo.getRawExistedJarToMD5().get(SHARED_JAR_NAME)); + + udfInfo.validate(Model.TREE, "test3", SHARED_JAR_NAME, SHARED_JAR_MD5); + try { + udfInfo.validate(Model.TREE, "test3", SHARED_JAR_NAME, DIFFERENT_JAR_MD5); + Assert.fail("Expected shared jar conflict after dropping only one referenced UDF."); + } catch (IoTDBRuntimeException e) { + Assert.assertEquals( + org.apache.iotdb.rpc.TSStatusCode.UDF_ALREADY_EXISTS.getStatusCode(), e.getErrorCode()); + } + } + + @Test + public void testSnapshotRebuildsSharedJarReferences() + throws TException, IOException, IllegalPathException { + clearUdfInfos(); + FileUtils.cleanDirectory(snapshotDir); + + CreateFunctionPlan createFunctionPlan1 = + createFunctionPlan("test1", SHARED_JAR_NAME, SHARED_JAR_MD5, true); + CreateFunctionPlan createFunctionPlan2 = + createFunctionPlan("test2", SHARED_JAR_NAME, SHARED_JAR_MD5, false); + + udfInfo.addUDFInTable(createFunctionPlan1); + udfInfo.addUDFInTable(createFunctionPlan2); + udfInfoSaveBefore.addUDFInTable(createFunctionPlan1); + udfInfoSaveBefore.addUDFInTable(createFunctionPlan2); udfInfo.processTakeSnapshot(snapshotDir); udfInfo.clear(); @@ -95,5 +113,29 @@ public void testSnapshot() throws TException, IOException, IllegalPathException Assert.assertEquals(udfInfoSaveBefore.getRawExistedJarToMD5(), udfInfo.getRawExistedJarToMD5()); Assert.assertEquals(udfInfoSaveBefore.getRawUDFTable(), udfInfo.getRawUDFTable()); + + udfInfo.dropFunction(Model.TREE, "test1"); + Assert.assertFalse(udfInfo.needToSaveJar(SHARED_JAR_NAME)); + Assert.assertEquals(SHARED_JAR_MD5, udfInfo.getRawExistedJarToMD5().get(SHARED_JAR_NAME)); + } + + private static void clearUdfInfos() { + udfInfo.clear(); + udfInfoSaveBefore.clear(); + } + + private static CreateFunctionPlan createFunctionPlan( + String functionName, String jarName, String jarMD5, boolean includeJarFile) + throws IllegalPathException { + UDFInformation udfInformation = + new UDFInformation( + functionName, + functionName, + UDFType.of(Model.TREE, FunctionType.NONE, true), + true, + jarName, + jarMD5); + return new CreateFunctionPlan( + udfInformation, includeJarFile ? new Binary(new byte[] {1, 2, 3}) : null); } }