diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala index 00f91d101be..2a5efbf0d09 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala @@ -17,7 +17,7 @@ package org.apache.linkis.common.conf -import org.apache.linkis.common.utils.{ParameterUtils, Logging} +import org.apache.linkis.common.utils.{Logging, ParameterUtils} import org.apache.commons.lang3.StringUtils diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala index 09c7669fe44..3f529e0454f 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala @@ -21,11 +21,18 @@ import org.apache.linkis.common.conf.Configuration import org.apache.linkis.common.exception.LinkisCommonErrorException import org.apache.linkis.common.variable import org.apache.linkis.common.variable._ -import org.apache.linkis.common.variable.DateTypeUtils.{getCurHour, getMonthDay, getToday, getYesterday} -import org.apache.commons.lang3.{StringUtils, Strings} +import org.apache.linkis.common.variable.DateTypeUtils.{ + getCurHour, + getMonthDay, + getToday, + getYesterday +} + +import org.apache.commons.lang3.{Strings, StringUtils} import java.time.ZonedDateTime import java.util + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.Exception.allCatch @@ -115,7 +122,9 @@ object VariableUtils extends Logging { case _ => if (!nameAndType.contains(key) && StringUtils.isNotEmpty(value)) { // if ((allCatch opt value.toDouble).isDefined) { - if ((allCatch opt BigDecimal(value)).isDefined && !Strings.CS.startsWith(value, "0")) { + if ( + (allCatch opt BigDecimal(value)).isDefined && !Strings.CS.startsWith(value, "0") + ) { nameAndType(key) = variable.BigDecimalValue(BigDecimal(value)) } else { nameAndType(key) = variable.StringType(value) diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala index 75e22cb51ba..031d6e79cf4 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala @@ -111,6 +111,7 @@ case class YearType(value: CustomYearType) extends VariableType { } case class BigDecimalValue(value: BigDecimal) extends VariableType { + override def getValue: String = { val result = bigDecimalOrLong(value) result match { @@ -126,7 +127,10 @@ case class BigDecimalValue(value: BigDecimal) extends VariableType { case "*" => val res = value * BigDecimal(bValue); formatResult(res) case "/" => val res = value / BigDecimal(bValue); formatResult(res) case _ => - throw new LinkisCommonErrorException(20050, s"BigDecimal class is not supported to use:$signal") + throw new LinkisCommonErrorException( + 20050, + s"BigDecimal class is not supported to use:$signal" + ) } } @@ -146,6 +150,7 @@ case class BigDecimalValue(value: BigDecimal) extends VariableType { bd } } + } case class LongType(value: Long) extends VariableType { diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala index 17345c050af..3785a460454 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala @@ -50,8 +50,8 @@ object StorageConfiguration { val STORAGE_BUILD_FS_CLASSES = CommonVars( "wds.linkis.storage.build.fs.classes", "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + - "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," + - "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem" + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," + + "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem" ) val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true) diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala index a38b0edc4c5..f2948496cae 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala @@ -204,7 +204,9 @@ object StorageUtils extends Logging { * @return */ def getFsPath(path: String): FsPath = { - if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || path.startsWith(BLOB_SCHEMA)) new FsPath(path) + if ( + path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || path.startsWith(BLOB_SCHEMA) + ) new FsPath(path) else { new FsPath(FILE_SCHEMA + path) } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala index adb34b98aa5..77b0d0f44aa 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala @@ -17,7 +17,7 @@ package org.apache.linkis.governance.common.utils -import org.apache.linkis.common.utils.{ParameterUtils, Logging} +import org.apache.linkis.common.utils.{Logging, ParameterUtils} import org.apache.commons.lang3.StringUtils diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala index 7689ae94af0..5a85334f19a 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala @@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration} -import org.apache.commons.lang3.{JavaVersion, SystemUtils} - object EnvConfiguration { val HIVE_CONF_DIR = CommonVars[String]( diff --git a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java index 49e65e964a4..94f2a9b4d49 100644 --- a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java +++ b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java @@ -18,6 +18,7 @@ package org.apache.linkis.entrance.interceptor.impl; import org.apache.linkis.governance.common.entity.job.JobRequest; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -55,24 +56,21 @@ void isSelectOverLimit() { } /** - * 未修复前代码进行拼接sql时,输出的sql为 - * select - * id, - * name, - * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000; - * ') as infos - * from ods.dim_ep22 + * 未修复前代码进行拼接sql时,输出的sql为 select id, name, + * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000; ') as infos from + * ods.dim_ep22 */ @Test void splicingLimitSql() { - String code = "select\n" + - "id,\n" + - "name,\n" + - "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n" + - "from ods.dim_ep22"; + String code = + "select\n" + + "id,\n" + + "name,\n" + + "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n" + + "from ods.dim_ep22"; StringBuilder logAppender = new StringBuilder(); JobRequest jobRequest = new JobRequest(); SQLExplain.dealSQLLimit(code, jobRequest, logAppender); - Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode()); + Assertions.assertEquals(code + " limit 5000", jobRequest.getExecutionCode()); } } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 47496cf17ac..9fe37eb3d98 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -135,7 +135,7 @@ class HiveEngineConcurrentConnExecutor( code: String ): ExecuteResponse = { LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code") - val taskId: String = engineExecutorContext.getJobId.get + val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init") CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf) val realCode = code.trim() @@ -166,12 +166,7 @@ class HiveEngineConcurrentConnExecutor( val driver = new HiveDriverProxy(any) driverCache.put(taskId, driver) - executeHQL( - engineExecutorContext.getJobId.get, - engineExecutorContext, - realCode, - driver - ) + executeHQL(taskId, engineExecutorContext, realCode, driver) case _ => val resp = proc.run(realCode.substring(tokens(0).length).trim) val result = new String(baos.toByteArray) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala index 9019db96214..aaa96274d3e 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala @@ -24,7 +24,10 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.common.hook.EngineConnHook import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.engineconn.core.executor.ExecutorManager -import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor +import org.apache.linkis.engineplugin.hive.executor.{ + HiveEngineConcurrentConnExecutor, + HiveEngineConnExecutor +} import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType} @@ -78,7 +81,12 @@ class HiveAddJarsEngineHook extends EngineConnHook with Logging { ExecutorManager.getInstance.getExecutorByLabels(labels) match { case executor: HiveEngineConnExecutor => executor.executeLine(new EngineExecutionContext(executor), sql) + logger.info("use hive none concurrent mode.") + case executor: HiveEngineConcurrentConnExecutor => + executor.executeLine(new EngineExecutionContext(executor), sql) + logger.info("use hive concurrent mode.") case _ => + logger.warn(s"Executor is not a ComputationExecutor, skip adding jar: $jar") } } catch { case t: Throwable => logger.error(s"run hive sql ${addSql + jar} failed", t)