LINKIS_DATASOURCE_AES_SWITCH =
+ CommonVars.apply("linkis.datasource.aes.switch", false);
+
+ /**
+ * AES-encrypt a string and return the result as Base64 text.
+ *
+ * @param content plain text to encrypt
+ * @param password password used to derive the AES key
+ * @return Base64-encoded cipher text
+ */
+ public static String encrypt(String content, String password) {
+ try {
+ // create the cipher
+ Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);
+
+ byte[] byteContent = content.getBytes(ENCODING_TYPE);
+ // initialize the cipher in encrypt mode
+ cipher.init(Cipher.ENCRYPT_MODE, getSecretKey(password));
+ // encrypt
+ byte[] result = cipher.doFinal(byteContent);
+ // return the Base64-encoded result
+ return Base64.encodeBase64String(result).trim();
+ } catch (Exception e) {
+ throw new ErrorException(21304, "AES encryption failed");
+ }
+ }
+
+ public static String encrypt(byte[] content, String password) {
+ try {
+ // create the cipher
+ Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);
+ // initialize the cipher in encrypt mode
+ cipher.init(Cipher.ENCRYPT_MODE, getSecretKey(password));
+ // encrypt
+ byte[] result = cipher.doFinal(content);
+ // return the Base64-encoded result
+ return Base64.encodeBase64String(result).trim();
+ } catch (Exception e) {
+ throw new ErrorException(21304, "AES encryption failed");
+ }
+ }
+
+ /**
+ * AES-decrypt a Base64-encoded string.
+ *
+ * @param content Base64-encoded cipher text
+ * @param password password used to derive the AES key
+ * @return decrypted plain text
+ */
+ public static String decrypt(String content, String password) {
+ try {
+ // create the cipher
+ Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);
+ // initialize the cipher with the key in decrypt mode
+ cipher.init(Cipher.DECRYPT_MODE, getSecretKey(password));
+ // decrypt
+ byte[] result = cipher.doFinal(Base64.decodeBase64(content));
+ return new String(result, ENCODING_TYPE);
+ } catch (Exception e) {
+ throw new ErrorException(21304, "AES decryption failed");
+ }
+ }
+
+ /**
+ * AES-decrypt a Base64-encoded byte array.
+ *
+ * @param content Base64-encoded cipher bytes
+ * @param password password used to derive the AES key
+ * @return decrypted bytes
+ */
+ public static byte[] decrypt(byte[] content, String password) {
+ try {
+ // create the cipher
+ Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);
+ // initialize the cipher with the key in decrypt mode
+ cipher.init(Cipher.DECRYPT_MODE, getSecretKey(password));
+ // decrypt
+ return cipher.doFinal(Base64.decodeBase64(content));
+ } catch (Exception e) {
+ throw new ErrorException(21304, "AES decryption failed");
+ }
+ }
+
+ /**
+ * Generate the AES secret key from the given password.
+ *
+ * @param password password used to seed the key generator
+ * @return the AES SecretKeySpec
+ */
+ private static SecretKeySpec getSecretKey(String password) {
+ // obtain a KeyGenerator for the configured key algorithm
+ KeyGenerator kg;
+ try {
+ kg = KeyGenerator.getInstance(KEY_ALGORITHM);
+ SecureRandom secureRandom = SecureRandom.getInstance(SECRET_RANDOM);
+ secureRandom.setSeed(password.getBytes());
+ // AES requires a 128-bit key
+ kg.init(128, secureRandom);
+ // generate the key
+ SecretKey secretKey = kg.generateKey();
+ // convert it into an AES-specific key spec
+ return new SecretKeySpec(secretKey.getEncoded(), KEY_ALGORITHM);
+ } catch (NoSuchAlgorithmException e) {
+ throw new ErrorException(21304, "Failed to generate AES secret key");
+ }
+ }
+
+ public static String isDecryptByConf(String password) {
+ if (AESUtils.LINKIS_DATASOURCE_AES_SWITCH.getValue()) {
+ // decrypt
+ password = AESUtils.decrypt(password, AESUtils.LINKIS_DATASOURCE_AES_KEY.getValue());
+ }
+ return password;
+ }
+}
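
A minimal usage sketch for the AESUtils helpers added above (illustrative only: the password and plain text are made-up values, and AESUtils is assumed to live in org.apache.linkis.common.utils like the other helpers in this patch):

    import org.apache.linkis.common.utils.AESUtils;

    public class AESUtilsExample {
      public static void main(String[] args) throws Exception {
        String password = "my-demo-password"; // made-up value
        // encrypt returns Base64 text; decrypt reverses it with the same password
        String cipherText = AESUtils.encrypt("hello linkis", password);
        String plainText = AESUtils.decrypt(cipherText, password);
        System.out.println(plainText); // hello linkis
      }
    }
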
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java
index d23f4a0867d..e81da47e693 100644
--- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java
@@ -213,7 +213,6 @@ private static long parseByteString(String str, ByteUnit unit) {
} else {
throw new NumberFormatException("Failed to parse byte string: " + str);
}
- suffix = suffix.toLowerCase();
// Check for invalid suffixes
if (suffix != null && !byteSuffixes.containsKey(suffix)) {
throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
@@ -297,6 +296,18 @@ public static long byteStringAsGb(String str) {
return parseByteString(str, ByteUnit.GiB);
}
+ /**
+ * Convert a passed byte string (e.g. -50b, -100k, or -250m) to gibibytes for internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in gibibytes.
+ */
+ public static long negativeByteStringAsGb(String str) {
+ if (str.startsWith("-")) {
+ return Math.negateExact(parseByteString(str.substring(1), ByteUnit.GiB));
+ }
+ return parseByteString(str, ByteUnit.GiB);
+ }
+
/**
* Returns a byte array with the buffer's contents, trying to avoid copying the data if possible.
*/
@@ -354,7 +365,7 @@ public double toBytes(long d) {
if (d < 0) {
throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
}
- return d * multiplier;
+ return (double) d * multiplier;
}
public long toKiB(long d) {
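
A small sketch of the new negativeByteStringAsGb helper (illustrative only; the byte strings are made up):

    import org.apache.linkis.common.utils.ByteTimeUtils;

    public class ByteStringExample {
      public static void main(String[] args) {
        // a leading '-' is stripped, the remainder is parsed as usual, and the result is negated
        System.out.println(ByteTimeUtils.negativeByteStringAsGb("-10g")); // -10
        // without a sign it behaves like byteStringAsGb
        System.out.println(ByteTimeUtils.negativeByteStringAsGb("10g")); // 10
      }
    }
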
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java
new file mode 100644
index 00000000000..353f80f1da8
--- /dev/null
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils;
+
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.common.exception.FatalException;
+import org.apache.linkis.common.exception.WarnException;
+
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LinkisUtils {
+ private static final Logger logger = LoggerFactory.getLogger(LinkisUtils.class);
+
+ public static <T> T tryCatch(Callable<T> tryOp, Function<Throwable, T> catchOp) {
+ T result = null;
+ try {
+ result = tryOp.call();
+ } catch (Throwable t) {
+ if (t instanceof FatalException) {
+ logger.error("Fatal error, system exit...", t);
+ System.exit(((FatalException) t).getErrCode());
+ } else if (t instanceof VirtualMachineError) {
+ logger.error("Fatal error, system exit...", t);
+ System.exit(-1);
+ } else if (null != t.getCause()
+ && (t.getCause() instanceof FatalException
+ || t.getCause() instanceof VirtualMachineError)) {
+ logger.error("Caused by fatal error, system exit...", t);
+ System.exit(-1);
+ } else if (t instanceof Error) {
+ logger.error("Throw error", t);
+ throw (Error) t;
+ } else {
+ result = catchOp.apply(t);
+ }
+ }
+ return result;
+ }
+
+ public static void tryFinally(Runnable tryOp, Runnable finallyOp) {
+ try {
+ tryOp.run();
+ } finally {
+ finallyOp.run();
+ }
+ }
+
+ public static <T> T tryAndWarn(Callable<T> tryOp, Logger log) {
+ return tryCatch(
+ tryOp,
+ t -> {
+ if (t instanceof ErrorException) {
+ ErrorException error = (ErrorException) t;
+ log.error(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.",
+ error.getErrCode(),
+ error.getDesc(),
+ error);
+
+ } else if (t instanceof WarnException) {
+ WarnException warn = (WarnException) t;
+ log.warn(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.",
+ warn.getErrCode(),
+ warn.getDesc(),
+ warn);
+
+ } else {
+ log.warn("", t);
+ }
+ return null;
+ });
+ }
+
+ public static void tryAndErrorMsg(Runnable tryOp, String message, Logger log) {
+ try {
+ tryOp.run();
+ } catch (WarnException t) {
+ WarnException warn = (WarnException) t;
+ log.warn(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.", warn.getErrCode(), warn.getDesc());
+ log.warn(message, warn);
+ } catch (Exception t) {
+ log.warn(message, t);
+ }
+ }
+
+ public static void tryAndWarn(Runnable tryOp, Logger log) {
+ try {
+ tryOp.run();
+ } catch (Throwable error) {
+ if (error instanceof WarnException) {
+ WarnException warn = (WarnException) error;
+ log.warn(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.",
+ warn.getErrCode(),
+ warn.getDesc(),
+ error);
+ } else {
+ log.warn("", error);
+ }
+ }
+ }
+
+ public static void tryAndWarnMsg(Runnable tryOp, String message, Logger log) {
+ try {
+ tryOp.run();
+ } catch (WarnException t) {
+ WarnException warn = (WarnException) t;
+ log.warn(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.", warn.getErrCode(), warn.getDesc());
+ log.warn(message, warn);
+ } catch (Exception t) {
+ log.warn(message, t);
+ }
+ }
+
+ public static <T> T tryAndWarnMsg(Callable<T> tryOp, String message, Logger log) {
+ return tryCatch(
+ tryOp,
+ t -> {
+ if (t instanceof ErrorException) {
+ ErrorException error = (ErrorException) t;
+ log.warn(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.",
+ error.getErrCode(),
+ error.getDesc());
+ log.warn(message, error);
+ } else if (t instanceof WarnException) {
+ WarnException warn = (WarnException) t;
+ log.warn(
+ "Warning code(警告码): {}, Warning message(警告信息): {}.",
+ warn.getErrCode(),
+ warn.getDesc());
+ log.warn(message, warn);
+ } else {
+ log.warn(message, t);
+ }
+ return null;
+ });
+ }
+
+ public static String getJvmUser() {
+ return System.getProperty("user.name");
+ }
+}
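
A minimal sketch of the new LinkisUtils helpers (illustrative only; the parse failures are intentional to show the fallback behaviour):

    import org.apache.linkis.common.utils.LinkisUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LinkisUtilsExample {
      private static final Logger LOG = LoggerFactory.getLogger(LinkisUtilsExample.class);

      public static void main(String[] args) {
        // tryCatch returns the value produced by the catch function on failure
        Integer value = LinkisUtils.tryCatch(() -> Integer.parseInt("not-a-number"), t -> -1);
        System.out.println(value); // -1

        // tryAndWarnMsg logs the failure with the given message and returns null instead of throwing
        String result =
            LinkisUtils.tryAndWarnMsg(() -> String.valueOf(Integer.parseInt("oops")), "parse failed", LOG);
        System.out.println(result); // null
      }
    }
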
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java
new file mode 100644
index 00000000000..1291b8bb68b
--- /dev/null
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class MD5Utils {
+
+ /**
+ * @param plaintext the string to hash
+ * @return the hex-encoded MD5 digest
+ * @throws NoSuchAlgorithmException
+ */
+ public static String encrypt(String plaintext) throws NoSuchAlgorithmException {
+ // create a MessageDigest instance for the MD5 algorithm
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ // feed the input bytes into the digest
+ md.update(plaintext.getBytes());
+ // compute the hash of the accumulated data
+ byte[] digest = md.digest();
+ // convert the byte array to a hexadecimal string
+ StringBuilder sb = new StringBuilder();
+ for (byte b : digest) {
+ sb.append(String.format("%02x", b & 0xff));
+ }
+ // return the hexadecimal string
+ return sb.toString();
+ }
+}
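
A one-line usage sketch for MD5Utils (illustrative only):

    import org.apache.linkis.common.utils.MD5Utils;

    public class MD5Example {
      public static void main(String[] args) throws Exception {
        // prints the 32-character lowercase hex MD5 digest of the input
        System.out.println(MD5Utils.encrypt("linkis"));
      }
    }
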
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ResultSetUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ResultSetUtils.java
new file mode 100644
index 00000000000..a367b38b80b
--- /dev/null
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ResultSetUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils;
+
+import org.apache.linkis.common.io.FsPath;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ResultSetUtils {
+
+ // Sort in ascending order by the numeric index in result set file names like _<num>.dolphin
+ public static Comparator<FsPath> getResultSetFileComparatorOrderByNameNum() {
+
+ Comparator<FsPath> comparator =
+ (o1, o2) -> {
+ // get the num of file name
+ String regx = "\\d+";
+
+ String[] res1 = o1.getPath().split(File.separator);
+ String fileName1 = res1[res1.length - 1];
+ Matcher matcher1 = Pattern.compile(regx).matcher(fileName1);
+ int num1 = matcher1.find() ? Integer.parseInt(matcher1.group()) : Integer.MAX_VALUE;
+
+ String[] res2 = o2.getPath().split(File.separator);
+ String fileName2 = res2[res2.length - 1];
+ Matcher matcher2 = Pattern.compile(regx).matcher(fileName2);
+ int num2 = matcher2.find() ? Integer.parseInt(matcher2.group()) : Integer.MAX_VALUE;
+
+ return num1 - num2;
+ };
+ return comparator;
+ }
+
+ public static void sortByNameNum(List<FsPath> fsPathList) {
+ Collections.sort(fsPathList, getResultSetFileComparatorOrderByNameNum());
+ }
+}
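
A sketch of the numeric-suffix ordering (illustrative only; the paths are made up and FsPath is assumed to accept a plain path string):

    import org.apache.linkis.common.io.FsPath;
    import org.apache.linkis.common.utils.ResultSetUtils;

    import java.util.ArrayList;
    import java.util.List;

    public class ResultSetSortExample {
      public static void main(String[] args) {
        List<FsPath> files = new ArrayList<>();
        files.add(new FsPath("/tmp/result/_10.dolphin"));
        files.add(new FsPath("/tmp/result/_2.dolphin"));
        files.add(new FsPath("/tmp/result/_1.dolphin"));
        // sorts by the first number found in each file name: _1, _2, _10
        ResultSetUtils.sortByNameNum(files);
        files.forEach(f -> System.out.println(f.getPath()));
      }
    }
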
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SHAUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SHAUtils.java
new file mode 100644
index 00000000000..b3fe61a1c93
--- /dev/null
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SHAUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SHAUtils {
+
+ /**
+ * Hash a string; defaults to SHA-256 when no algorithm is given.
+ *
+ * @param strSrc the string to hash
+ * @param encName the digest algorithm name
+ * @return the hex-encoded digest, or null if the algorithm is not available
+ * @throws UnsupportedEncodingException
+ */
+ public static String Encrypt(String strSrc, String encName) throws UnsupportedEncodingException {
+ MessageDigest md = null;
+ String strDes = null;
+ byte[] bt = strSrc.getBytes("utf-8");
+ try {
+ if (encName == null || encName.equals("")) {
+ encName = "SHA-256";
+ }
+ md = MessageDigest.getInstance(encName);
+ md.update(bt);
+ strDes = bytes2Hex(md.digest()); // to HexString
+ } catch (NoSuchAlgorithmException e) {
+ return null;
+ }
+ return strDes;
+ }
+
+ public static String bytes2Hex(byte[] bts) {
+ String des = "";
+ String tmp = null;
+ for (int i = 0; i < bts.length; i++) {
+ tmp = (Integer.toHexString(bts[i] & 0xFF));
+ if (tmp.length() == 1) {
+ des += "0";
+ }
+ des += tmp;
+ }
+ return des;
+ }
+
+ public static void main(String[] args) throws IOException {
+ String applicationId = args[0];
+ String app_id = args[1];
+ String token = args[2];
+ String nonce = args[3];
+ if (StringUtils.isBlank(applicationId)) {
+ throw new LinkageError("Invalid applicationId cannot be empty");
+ }
+ if (StringUtils.isBlank(app_id)) {
+ throw new LinkageError("Invalid app_id cannot be empty");
+ }
+ if (StringUtils.isBlank(token)) {
+ throw new LinkageError("Invalid token cannot be empty");
+ }
+ if (StringUtils.isBlank(nonce)) {
+ throw new LinkageError("Invalid nonce cannot be empty");
+ }
+ Map<String, String> parms = new HashMap<>();
+ String timestampStr = String.valueOf(System.currentTimeMillis());
+ parms.put("applicationId", applicationId);
+ parms.put("app_id", app_id);
+ parms.put("timestamp", timestampStr);
+ parms.put("nonce", nonce);
+ if (StringUtils.isNotBlank(token)) {
+ String signature =
+ Encrypt(Encrypt(parms.get("app_id") + nonce + timestampStr, null) + token, null);
+ parms.put("signature", signature);
+ }
+ System.out.println(parms);
+ }
+}
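
A sketch of the two-level signature scheme exercised by the main method above (illustrative only; all values are made up):

    import org.apache.linkis.common.utils.SHAUtils;

    public class SignatureExample {
      public static void main(String[] args) throws Exception {
        String appId = "demo_app";   // made-up value
        String nonce = "12345";      // made-up value
        String token = "demo-token"; // made-up value
        String timestamp = String.valueOf(System.currentTimeMillis());
        // SHA-256 of (appId + nonce + timestamp), hashed again together with the token
        String signature =
            SHAUtils.Encrypt(SHAUtils.Encrypt(appId + nonce + timestamp, null) + token, null);
        System.out.println(signature);
      }
    }
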
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
index f7158b4899b..af163a64948 100644
--- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
@@ -25,10 +25,9 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -46,8 +45,12 @@ public abstract class SecurityUtils {
private static final String QUESTION_MARK = "?";
+ private static final String REGEX_QUESTION_MARK = "\\?";
+
+ private static final int JDBC_URL_ITEM_COUNT = 2;
+
/** allowLoadLocalInfile,allowLoadLocalInfiled,# */
- public static final CommonVars<String> MYSQL_SENSITIVE_PARAMS =
+ private static final CommonVars<String> MYSQL_SENSITIVE_PARAMS =
CommonVars$.MODULE$.apply(
"linkis.mysql.sensitive.params",
"allowLoadLocalInfile,autoDeserialize,allowLocalInfile,allowUrlInLocalInfile,#");
@@ -55,16 +58,129 @@ public abstract class SecurityUtils {
/**
* "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
*/
- public static final CommonVars<String> MYSQL_FORCE_PARAMS =
+ private static final CommonVars<String> MYSQL_FORCE_PARAMS =
CommonVars$.MODULE$.apply(
"linkis.mysql.force.params",
"allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false");
- public static final CommonVars<String> MYSQL_STRONG_SECURITY_ENABLE =
+ private static final CommonVars<String> MYSQL_STRONG_SECURITY_ENABLE =
CommonVars$.MODULE$.apply("linkis.mysql.strong.security.enable", "false");
+ private static final CommonVars<String> MYSQL_SECURITY_CHECK_ENABLE =
+ CommonVars$.MODULE$.apply("linkis.mysql.security.check.enable", "true");
+
+ private static final CommonVars<String> MYSQL_CONNECT_URL =
+ CommonVars.apply("linkis.security.mysql.url.template", "jdbc:mysql://%s:%s/%s");
+
+ private static final CommonVars<String> JDBC_MATCH_REGEX =
+ CommonVars$.MODULE$.apply(
+ "linkis.mysql.jdbc.match.regex",
+ "(?i)jdbc:(?i)(mysql)://([^:]+)(:[0-9]+)?(/[a-zA-Z0-9_-]*[\\.\\-]?)?");
+
+ private static final String JDBC_MYSQL_PROTOCOL = "jdbc:mysql";
+
/**
- * mysql url append force params
+ * check mysql connection params
+ *
+ * @param host
+ * @param port
+ * @param username
+ * @param password
+ * @param database
+ * @param extraParams
+ */
+ public static void checkJdbcConnParams(
+ String host,
+ Integer port,
+ String username,
+ String password,
+ String database,
+ Map<String, Object> extraParams) {
+
+ // check switch
+ if (!Boolean.valueOf(MYSQL_SECURITY_CHECK_ENABLE.getValue())) {
+ return;
+ }
+
+ // 1. Check blank params
+ if (StringUtils.isAnyBlank(host, username)) {
+ logger.error(
+ "Invalid mysql connection params: host: {}, username: {}, database: {}",
+ host,
+ username,
+ database);
+ throw new LinkisSecurityException(35000, "Invalid mysql connection params.");
+ }
+
+ // 2. Check url format
+ String url = String.format(MYSQL_CONNECT_URL.getValue(), host.trim(), port, database.trim());
+ checkUrl(url);
+
+ // 3. Check params. Mainly vulnerability parameters. Note the url encoding
+ checkParams(extraParams);
+ }
+
+ /** @param url */
+ public static void checkJdbcConnUrl(String url) {
+
+ // check switch
+ if (!Boolean.valueOf(MYSQL_SECURITY_CHECK_ENABLE.getValue())) {
+ return;
+ }
+
+ logger.info("jdbc url: {}", url);
+ if (StringUtils.isBlank(url)) {
+ throw new LinkisSecurityException(35000, "Invalid jdbc connection url.");
+ }
+
+ // temporarily only check mysql jdbc url.
+ if (!url.toLowerCase().startsWith(JDBC_MYSQL_PROTOCOL)) {
+ return;
+ }
+
+ String[] urlItems = url.split(REGEX_QUESTION_MARK);
+ if (urlItems.length > JDBC_URL_ITEM_COUNT) {
+ throw new LinkisSecurityException(35000, "Invalid jdbc connection url.");
+ }
+
+ // check url
+ checkUrl(urlItems[0]);
+
+ // check params
+ if (urlItems.length == JDBC_URL_ITEM_COUNT) {
+ Map<String, Object> params = parseMysqlUrlParamsToMap(urlItems[1]);
+ checkParams(params);
+ }
+ }
+
+ /**
+ * call after checkJdbcConnUrl
+ *
+ * @param url
+ * @return
+ */
+ public static String getJdbcUrl(String url) {
+ // preventing NPE
+ if (StringUtils.isBlank(url)) {
+ return url;
+ }
+ // temporarily deal with only mysql jdbc url.
+ if (!url.toLowerCase().startsWith(JDBC_MYSQL_PROTOCOL)) {
+ return url;
+ }
+ String[] items = url.split(REGEX_QUESTION_MARK);
+ String result = items[0];
+ if (items.length == JDBC_URL_ITEM_COUNT) {
+ Map<String, Object> params = parseMysqlUrlParamsToMap(items[1]);
+ appendMysqlForceParams(params);
+ String paramUrl = parseParamsMapToMysqlParamUrl(params);
+ result += QUESTION_MARK + paramUrl;
+ }
+ return result;
+ }
+
+ /**
+ * Append force params. Should be called after the checkJdbcConnParams method.
*
* @param url
* @return
@@ -73,6 +189,9 @@ public static String appendMysqlForceParams(String url) {
if (StringUtils.isBlank(url)) {
return "";
}
+ if (!Boolean.valueOf(MYSQL_STRONG_SECURITY_ENABLE.getValue())) {
+ return url;
+ }
String extraParamString = MYSQL_FORCE_PARAMS.getValue();
@@ -86,36 +205,41 @@ public static String appendMysqlForceParams(String url) {
return url;
}
+ /**
+ * Append force params. Should be called after the checkJdbcConnParams method.
+ *
+ * @param extraParams
+ */
public static void appendMysqlForceParams(Map<String, Object> extraParams) {
- extraParams.putAll(parseMysqlUrlParamsToMap(MYSQL_FORCE_PARAMS.getValue()));
+ if (Boolean.valueOf(MYSQL_STRONG_SECURITY_ENABLE.getValue())) {
+ extraParams.putAll(parseMysqlUrlParamsToMap(MYSQL_FORCE_PARAMS.getValue()));
+ }
}
- public static String checkJdbcSecurity(String url) {
- logger.info("checkJdbcSecurity origin url: {}", url);
- if (StringUtils.isBlank(url)) {
- throw new LinkisSecurityException(35000, "Invalid mysql connection cul, url is empty");
- }
- // deal with url encode
- try {
- url = URLDecoder.decode(url, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new LinkisSecurityException(35000, "mysql connection cul decode error: " + e);
+ public static String parseParamsMapToMysqlParamUrl(Map<String, Object> params) {
+ if (params == null || params.isEmpty()) {
+ return "";
}
- if (url.endsWith(QUESTION_MARK) || !url.contains(QUESTION_MARK)) {
- logger.info("checkJdbcSecurity target url: {}", url);
- return url;
+ return params.entrySet().stream()
+ .map(e -> String.join(EQUAL_SIGN, e.getKey(), String.valueOf(e.getValue())))
+ .collect(Collectors.joining(AND_SYMBOL));
+ }
+
+ /**
+ * check url, format: jdbc:mysql://host:port/dbname
+ *
+ * @param url
+ */
+ public static void checkUrl(String url) {
+ if (url != null && !url.toLowerCase().startsWith(JDBC_MYSQL_PROTOCOL)) {
+ return;
}
- String[] items = url.split("\\?");
- if (items.length != 2) {
- logger.warn("Invalid url: {}", url);
- throw new LinkisSecurityException(35000, "Invalid mysql connection cul: " + url);
+ Pattern regex = Pattern.compile(JDBC_MATCH_REGEX.getValue());
+ Matcher matcher = regex.matcher(url);
+ if (!matcher.matches()) {
+ logger.info("Invalid mysql connection url: {}", url);
+ throw new LinkisSecurityException(35000, "Invalid mysql connection url.");
}
- Map params = parseMysqlUrlParamsToMap(items[1]);
- Map securityMap = checkJdbcSecurity(params);
- String paramUrl = parseParamsMapToMysqlParamUrl(securityMap);
- url = items[0] + QUESTION_MARK + paramUrl;
- logger.info("checkJdbcSecurity target url: {}", url);
- return url;
}
/**
@@ -123,15 +247,9 @@ public static String checkJdbcSecurity(String url) {
*
* @param paramsMap
*/
- public static Map checkJdbcSecurity(Map paramsMap) {
- if (paramsMap == null) {
- return new HashMap<>();
- }
-
- // mysql url strong security
- if (Boolean.valueOf(MYSQL_STRONG_SECURITY_ENABLE.getValue())) {
- paramsMap.clear();
- return paramsMap;
+ private static void checkParams(Map<String, Object> paramsMap) {
+ if (paramsMap == null || paramsMap.isEmpty()) {
+ return;
}
// deal with url encode
@@ -163,19 +281,12 @@ public static Map checkJdbcSecurity(Map paramsMa
"Invalid mysql connection parameters: " + parseParamsMapToMysqlParamUrl(paramsMap));
}
}
- return paramsMap;
- }
-
- public static String parseParamsMapToMysqlParamUrl(Map forceParams) {
- if (forceParams == null) {
- return "";
- }
- return forceParams.entrySet().stream()
- .map(e -> String.join(EQUAL_SIGN, e.getKey(), String.valueOf(e.getValue())))
- .collect(Collectors.joining(AND_SYMBOL));
}
private static Map<String, Object> parseMysqlUrlParamsToMap(String paramsUrl) {
+ if (StringUtils.isBlank(paramsUrl)) {
+ return new LinkedHashMap<>();
+ }
String[] params = paramsUrl.split(AND_SYMBOL);
Map<String, Object> map = new LinkedHashMap<>(params.length);
for (String param : params) {
@@ -209,4 +320,40 @@ private static boolean isNotSecurity(String key, String value, String param) {
return key.toLowerCase().contains(param.toLowerCase())
|| value.toLowerCase().contains(param.toLowerCase());
}
+
+ /**
+ * allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false
+ *
+ * @return
+ */
+ public static Properties getMysqlSecurityParams() {
+ Properties properties = new Properties();
+ properties.setProperty("allowLoadLocalInfile", "false");
+ properties.setProperty("autoDeserialize", "false");
+ properties.setProperty("allowLocalInfile", "false");
+ properties.setProperty("allowUrlInLocalInfile", "false");
+ return properties;
+ }
+
+ /**
+ * Check if the path has a relative path
+ *
+ * @param path
+ * @return
+ */
+ public static boolean containsRelativePath(String path) {
+ if (path.startsWith("./")
+ || path.contains("/./")
+ || path.startsWith("../")
+ || path.contains("/../")) {
+ return true;
+ }
+ if (path.startsWith(".\\")
+ || path.contains("\\.\\")
+ || path.startsWith("..\\")
+ || path.contains("\\..\\")) {
+ return true;
+ }
+ return false;
+ }
}
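
A sketch of the reworked SecurityUtils checks (illustrative only; host, database, and paths are made up, and a failing check raises a LinkisSecurityException):

    import org.apache.linkis.common.utils.SecurityUtils;

    public class JdbcSecurityExample {
      public static void main(String[] args) {
        // a well-formed MySQL URL without vulnerable parameters passes silently
        SecurityUtils.checkJdbcConnUrl("jdbc:mysql://127.0.0.1:3306/demo_db");

        // relative-path detection added for path validation
        System.out.println(SecurityUtils.containsRelativePath("/data/../etc/passwd")); // true
        System.out.println(SecurityUtils.containsRelativePath("/data/result/file.txt")); // false

        // fixed set of hardened MySQL driver properties
        System.out.println(SecurityUtils.getMysqlSecurityParams());
      }
    }
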
diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java
index d71f8b40e63..d1cb59c397b 100644
--- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java
@@ -17,18 +17,19 @@
package org.apache.linkis.common.utils;
+import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.common.exception.VariableOperationFailedException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
+import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -49,6 +50,9 @@ public class VariableOperationUtils {
private static final String[] CYCLES =
new String[] {CYCLE_YEAR, CYCLE_MONTH, CYCLE_DAY, CYCLE_HOUR, CYCLE_MINUTE, CYCLE_SECOND};
+ private static final ObjectMapper mapper =
+ JsonMapper.builder().enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS).build();
+
/**
* yyyy-MM-dd HH:mm:ss
*
@@ -56,9 +60,16 @@ public class VariableOperationUtils {
* @return
*/
public static ZonedDateTime toZonedDateTime(Date date, ZoneId zoneId) {
- Instant instant = date.toInstant();
- LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime();
- return ZonedDateTime.of(localDateTime, zoneId);
+ if (Configuration.VARIABLE_OPERATION_USE_NOW()) {
+ LocalTime currentTime = LocalTime.now();
+ LocalDate localDate = date.toInstant().atZone(zoneId).toLocalDate();
+ LocalDateTime localDateTime = LocalDateTime.of(localDate, currentTime);
+ return ZonedDateTime.of(localDateTime, zoneId);
+ } else {
+ Instant instant = date.toInstant();
+ LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime();
+ return ZonedDateTime.of(localDateTime, zoneId);
+ }
}
/**
@@ -78,30 +89,44 @@ public static ZonedDateTime toZonedDateTime(Date date) {
* @param str
* @return
*/
+ @Deprecated
public static String replaces(ZonedDateTime dateTime, String str)
throws VariableOperationFailedException {
- return replaces(dateTime, str, true);
+ try {
+ JsonNode rootNode = mapper.readTree(str);
+ if (rootNode.isArray() || rootNode.isObject()) {
+ replaceJson(dateTime, rootNode);
+ return rootNode.toString();
+ }
+ } catch (Exception e) {
+ return replace(dateTime, str);
+ }
+ return replace(dateTime, str);
}
/**
* json support variable operation
*
+ * @param codeType
* @param dateTime
* @param str
- * @param format
* @return
*/
- public static String replaces(ZonedDateTime dateTime, String str, boolean format)
+ public static String replaces(String codeType, ZonedDateTime dateTime, String str)
throws VariableOperationFailedException {
- try {
- JsonNode rootNode = JsonUtils.jackson().readTree(str);
- if (rootNode.isArray() || rootNode.isObject()) {
- replaceJson(dateTime, rootNode);
- return rootNode.toString();
+ String languageType = CodeAndRunTypeUtils.getLanguageTypeByCodeType(codeType, "");
+ if (languageType.equals(CodeAndRunTypeUtils.LANGUAGE_TYPE_JSON())) {
+ try {
+ JsonNode rootNode = mapper.readTree(str);
+ if (rootNode.isArray() || rootNode.isObject()) {
+ replaceJson(dateTime, rootNode);
+ return rootNode.toString();
+ }
+ } catch (Exception e) {
+ return replace(dateTime, str);
}
- } catch (Exception e) {
- return replace(dateTime, str);
}
+
return replace(dateTime, str);
}
@@ -197,7 +222,7 @@ private static void replaceJson(ZonedDateTime dateTime, JsonNode object)
} else if (temp.isObject()) {
replaceJson(dateTime, temp);
} else {
- arrayNode.insert(i, replace(dateTime, temp.toString()));
+ arrayNode.set(i, replace(dateTime, temp.toString()));
}
}
} else if (object.isObject()) {
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
index 55535e5336c..9bfa053b77b 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/BDPConfiguration.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
-import java.io.{File, FileInputStream, InputStream, IOException}
+import java.io._
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -140,15 +140,20 @@ private[conf] object BDPConfiguration extends Logging {
private def initConfig(config: Properties, filePath: String) {
var inputStream: InputStream = null
-
+ var reader: InputStreamReader = null
+ var buff: BufferedReader = null
Utils.tryFinally {
Utils.tryCatch {
inputStream = new FileInputStream(filePath)
- config.load(inputStream)
+ reader = new InputStreamReader(inputStream, "UTF-8")
+ buff = new BufferedReader(reader)
+ config.load(buff)
} { case e: IOException =>
logger.error("Can't load " + filePath, e)
}
} {
+ IOUtils.closeQuietly(buff)
+ IOUtils.closeQuietly(reader)
IOUtils.closeQuietly(inputStream)
}
}
@@ -227,19 +232,20 @@ private[conf] object BDPConfiguration extends Logging {
private[common] def formatValue[T](defaultValue: T, value: Option[String]): Option[T] = {
if (value.isEmpty || value.exists(StringUtils.isEmpty)) return Option(defaultValue)
+ val trimValue = value.map(_.trim)
val formattedValue = defaultValue match {
- case _: String => value
- case _: Byte => value.map(_.toByte)
- case _: Short => value.map(_.toShort)
- case _: Char => value.map(_.toCharArray.apply(0))
- case _: Int => value.map(_.toInt)
- case _: Long => value.map(_.toLong)
- case _: Float => value.map(_.toFloat)
- case _: Double => value.map(_.toDouble)
- case _: Boolean => value.map(_.toBoolean)
- case _: TimeType => value.map(new TimeType(_))
- case _: ByteType => value.map(new ByteType(_))
- case null => value
+ case _: String => trimValue
+ case _: Byte => trimValue.map(_.toByte)
+ case _: Short => trimValue.map(_.toShort)
+ case _: Char => trimValue.map(_.toCharArray.apply(0))
+ case _: Int => trimValue.map(_.toInt)
+ case _: Long => trimValue.map(_.toLong)
+ case _: Float => trimValue.map(_.toFloat)
+ case _: Double => trimValue.map(_.toDouble)
+ case _: Boolean => trimValue.map(_.toBoolean)
+ case _: TimeType => trimValue.map(new TimeType(_))
+ case _: ByteType => trimValue.map(new ByteType(_))
+ case null => trimValue
}
formattedValue.asInstanceOf[Option[T]]
}
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
index 8ac94739c46..dd4570d95b1 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.common.conf
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.utils.{Logging, RSAUtils}
import org.apache.commons.lang3.StringUtils
@@ -31,8 +31,12 @@ object Configuration extends Logging {
val IS_TEST_MODE = CommonVars("wds.linkis.test.mode", false)
+ val LINKIS_SYS_NAME = CommonVars("linkis.system.name", "")
+
val IS_PROMETHEUS_ENABLE = CommonVars("wds.linkis.prometheus.enable", false)
+ val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false).getValue
+
val PROMETHEUS_ENDPOINT = CommonVars("wds.linkis.prometheus.endpoint", "/actuator/prometheus")
val LINKIS_HOME = CommonVars("wds.linkis.home", CommonVars("LINKIS_HOME", "/tmp").getValue)
@@ -53,6 +57,9 @@ object Configuration extends Logging {
val CLOUD_CONSOLE_VARIABLE_SPRING_APPLICATION_NAME =
CommonVars("wds.linkis.console.variable.application.name", "linkis-ps-publicservice")
+ val JOBHISTORY_SPRING_APPLICATION_NAME =
+ CommonVars("wds.linkis.jobhistory.application.name", "linkis-ps-jobhistory")
+
// read from env
val PREFER_IP_ADDRESS: Boolean = CommonVars(
"linkis.discovery.prefer-ip-address",
@@ -63,10 +70,24 @@ object Configuration extends Logging {
val JOB_HISTORY_ADMIN = CommonVars("wds.linkis.jobhistory.admin", "hadoop")
+ val JOB_HISTORY_DEPARTMENT_ADMIN = CommonVars("wds.linkis.jobhistory.department.admin", "hadoop")
+
+ val JOB_RESULT_DEPARTMENT_LIMIT =
+ CommonVars("linkis.jobhistory.result.limit.department", "")
+
// Only the specified token has permission to call some api
val GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH = "ADMIN-"
- val VARIABLE_OPERATION: Boolean = CommonVars("wds.linkis.variable.operation", false).getValue
+ val VARIABLE_OPERATION_USE_NOW: Boolean =
+ CommonVars("wds.linkis.variable.operation.use.now", true).getValue
+
+ val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true)
+
+ val LINKIS_RSA_TOKEN_SWITCH = CommonVars("linkis.rsa.token.switch", false).getValue
+
+ val LINKIS_RSA_PUBLIC_KEY = CommonVars("linkis.rsa.public.key", "")
+
+ val LINKIS_RSA_PRIVATE_KEY = CommonVars("linkis.rsa.private.key", "")
val ERROR_MSG_TIP =
CommonVars(
@@ -74,11 +95,37 @@ object Configuration extends Logging {
"The request interface %s is abnormal. You can try to troubleshoot common problems in the knowledge base document"
)
+ val LINKIS_TOKEN = CommonVars("wds.linkis.token", "")
+
+ val HDFS_HOUR_DIR_SWITCH = CommonVars("linkis.hdfs.hour.dir.switch", false).getValue
+
+ val LINKIS_KEYTAB_SWITCH: Boolean = CommonVars("linkis.keytab.switch", false).getValue
+
+ val METRICS_INCREMENTAL_UPDATE_ENABLE =
+ CommonVars[Boolean]("linkis.jobhistory.metrics.incremental.update.enable", false)
+
+ val GLOBAL_CONF_CHN_NAME = "全局设置"
+
+ val GLOBAL_CONF_CHN_OLDNAME = "通用设置"
+
+ val GLOBAL_CONF_CHN_EN_NAME = "GlobalSettings"
+
+ val GLOBAL_CONF_SYMBOL = "*"
+
+ val GLOBAL_CONF_LABEL = "*-*,*-*"
+
def isAdminToken(token: String): Boolean = {
if (StringUtils.isBlank(token)) {
false
} else {
- token.toUpperCase().startsWith(GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH)
+ if (Configuration.LINKIS_RSA_TOKEN_SWITCH && token.startsWith(RSAUtils.PREFIX)) {
+ RSAUtils
+ .dncryptWithLinkisPublicKey(token)
+ .toUpperCase()
+ .contains(GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH)
+ } else {
+ token.toUpperCase().contains(GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH)
+ }
}
}
@@ -122,10 +169,27 @@ object Configuration extends Logging {
.exists(username.equalsIgnoreCase)
}
+ def isDepartmentAdmin(username: String): Boolean = {
+ val departmentAdminUsers = JOB_HISTORY_DEPARTMENT_ADMIN.getHotValue.split(",")
+ departmentAdminUsers.exists(username.equalsIgnoreCase)
+ }
+
def getJobHistoryAdmin(): Array[String] = {
val adminUsers = GOVERNANCE_STATION_ADMIN.getHotValue.split(",")
val historyAdminUsers = JOB_HISTORY_ADMIN.getHotValue.split(",")
(adminUsers ++ historyAdminUsers).distinct
}
+ def getGlobalCreator(creator: String): String = creator match {
+ case Configuration.GLOBAL_CONF_CHN_NAME | Configuration.GLOBAL_CONF_CHN_OLDNAME |
+ Configuration.GLOBAL_CONF_CHN_EN_NAME =>
+ GLOBAL_CONF_SYMBOL
+ case _ => creator
+ }
+
+ def canResultSetByDepartment(departmentId: String): Boolean = {
+ val jobResultLimit = JOB_RESULT_DEPARTMENT_LIMIT.getHotValue.split(",")
+ !jobResultLimit.exists(departmentId.equalsIgnoreCase)
+ }
+
}
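
A short sketch of the new creator normalization and department-admin check (illustrative only; names are made up, default configuration values are assumed, and the Scala object is called through its Java static forwarders):

    import org.apache.linkis.common.conf.Configuration;

    public class ConfigurationExample {
      public static void main(String[] args) {
        // the Chinese/English display names of the global settings creator collapse to "*"
        System.out.println(Configuration.getGlobalCreator("GlobalSettings")); // *
        System.out.println(Configuration.getGlobalCreator("my_app")); // my_app

        // with the default wds.linkis.jobhistory.department.admin value ("hadoop")
        System.out.println(Configuration.isDepartmentAdmin("hadoop")); // true
      }
    }
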
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
index 77c82f38838..e558e765bed 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
@@ -33,7 +33,7 @@ object LogUtils {
}
def generateERROR(rawLog: String): String = {
- getTimeFormat + " " + "ERROR" + " " + rawLog
+ getTimeFormat + " " + ERROR_STR + " " + rawLog
}
def generateWarn(rawLog: String): String = {
@@ -52,4 +52,6 @@ object LogUtils {
getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
}
+ val ERROR_STR = "ERROR"
+
}
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
index 3870fe6e584..9bbd3201186 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
@@ -21,6 +21,10 @@ import org.apache.linkis.common.conf.CommonVars
import org.apache.commons.lang3.StringUtils
+import java.util.Locale
+
+import scala.collection.mutable
+
object CodeAndRunTypeUtils {
private val CONF_LOCK = new Object()
@@ -29,7 +33,7 @@ object CodeAndRunTypeUtils {
*/
val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars(
"linkis.codeType.language.relation",
- "sql=>sql|hql|jdbc|hive|psql|fql|tsql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell,json=>json|data_calc"
+ "sql=>sql|hql|jdbc|hive|psql|fql|tsql|nebula|ngql|aisql|starrocks,python=>python|py|pyspark|py3,java=>java,scala=>scala,shell=>sh|shell,json=>json|data_calc"
)
val LANGUAGE_TYPE_SQL = "sql"
@@ -44,6 +48,8 @@ object CodeAndRunTypeUtils {
val LANGUAGE_TYPE_JSON = "json"
+ val LANGUAGE_TYPE_AI_SQL = "aisql"
+
private var codeTypeAndLanguageTypeRelationMap: Map[String, List[String]] = null
/**
@@ -101,14 +107,23 @@ object CodeAndRunTypeUtils {
def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = {
val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap
if (codeTypeAndRunTypeRelationMap.isEmpty) Map()
- else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
+ else {
+// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
+ val map = mutable.Map[String, String]()
+ codeTypeAndRunTypeRelationMap.foreach(kv => {
+ kv._2.foreach(v => map.put(v, kv._1))
+ })
+ map.toMap
+ }
}
def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = {
if (StringUtils.isBlank(codeType)) {
return ""
}
- getLanguageTypeAndCodeTypeRelationMap.getOrElse(codeType, defaultLanguageType)
+ val lowerCaseCodeType = codeType.toLowerCase(Locale.getDefault)
+ getLanguageTypeAndCodeTypeRelationMap.getOrElse(lowerCaseCodeType, defaultLanguageType)
+
}
/**
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala
index b53184eceb4..f298d5af5bc 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala
@@ -19,12 +19,17 @@ package org.apache.linkis.common.utils
import org.apache.linkis.common.conf.CommonVars
+import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.StringUtils
import javax.naming.Context
import javax.naming.ldap.InitialLdapContext
+import java.nio.charset.StandardCharsets
import java.util.Hashtable
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
object LDAPUtils extends Logging {
@@ -38,7 +43,33 @@ object LDAPUtils extends Logging {
val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue
val userNameFormat = CommonVars("wds.linkis.ldap.proxy.userNameFormat", "").getValue
+ private val storeUser: Cache[String, String] = CacheBuilder
+ .newBuilder()
+ .maximumSize(1000)
+ .expireAfterWrite(20, TimeUnit.MINUTES)
+ .removalListener(new RemovalListener[String, String] {
+
+ override def onRemoval(removalNotification: RemovalNotification[String, String]): Unit = {
+ logger.info(s"store user remove key: ${removalNotification.getKey}")
+ }
+
+ })
+ .build()
+
def login(userID: String, password: String): Unit = {
+
+ val saltPwd = storeUser.getIfPresent(userID)
+ if (StringUtils.isNotBlank(saltPwd)) {
+ Utils.tryAndWarn {
+ if (
+ saltPwd.equalsIgnoreCase(Hex.encodeHexString(password.getBytes(StandardCharsets.UTF_8)))
+ ) {
+ logger.info(s"user $userID login success for storeUser")
+ return
+ }
+ }
+ }
+
val env = new Hashtable[String, String]()
val bindDN =
if (StringUtils.isBlank(userNameFormat)) userID
@@ -53,6 +84,9 @@ object LDAPUtils extends Logging {
env.put(Context.SECURITY_CREDENTIALS, bindPassword)
new InitialLdapContext(env, null)
+ Utils.tryAndWarn {
+ storeUser.put(userID, Hex.encodeHexString(password.getBytes(StandardCharsets.UTF_8)))
+ }
logger.info(s"user $userID login success.")
}
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
index 746b3600a6c..4a34db89765 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/RSAUtils.scala
@@ -17,16 +17,22 @@
package org.apache.linkis.common.utils
+import org.apache.linkis.common.conf.Configuration
+
import org.apache.commons.codec.binary.Hex
import org.apache.commons.net.util.Base64
import javax.crypto.Cipher
+import java.net.URLDecoder
import java.nio.charset.StandardCharsets
-import java.security.{KeyPair, KeyPairGenerator, PrivateKey, PublicKey}
+import java.security.{KeyFactory, KeyPair, KeyPairGenerator, PrivateKey, PublicKey}
+import java.security.spec.{PKCS8EncodedKeySpec, X509EncodedKeySpec}
+
+object RSAUtils extends Logging {
+ private implicit val keyPair = genKeyPair(2048)
-object RSAUtils {
- private implicit val keyPair = genKeyPair(1024)
+ implicit val PREFIX = "{RSA}"
def genKeyPair(keyLength: Int): KeyPair = {
val keyPair = KeyPairGenerator.getInstance("RSA")
@@ -64,4 +70,112 @@ object RSAUtils {
}
def decrypt(data: Array[Byte]): Array[Byte] = decrypt(data, keyPair.getPrivate)
+
+ /**
+ * Convert a public key string into a PublicKey object.
+ *
+ * @param publicKeyStr
+ * Base64-encoded public key string
+ * @return
+ * the resulting PublicKey object
+ */
+ def stringToPublicKey(publicKeyStr: String): PublicKey = {
+ val keyBytes = Base64.decodeBase64(publicKeyStr)
+ val keySpec = new X509EncodedKeySpec(keyBytes)
+ val keyFactory = KeyFactory.getInstance("RSA")
+ keyFactory.generatePublic(keySpec)
+ }
+
+ /**
+ * Convert a private key string into a PrivateKey object.
+ *
+ * @param privateKeyStr
+ * Base64-encoded private key string
+ * @return
+ * the resulting PrivateKey object
+ */
+ def stringToPrivateKey(privateKeyStr: String): PrivateKey = {
+ val keyBytes = Base64.decodeBase64(privateKeyStr)
+ val keySpec = new PKCS8EncodedKeySpec(keyBytes)
+ val keyFactory = KeyFactory.getInstance("RSA")
+ keyFactory.generatePrivate(keySpec)
+ }
+
+ /**
+ * Encrypt data with the RSA public key configured for Linkis.
+ *
+ * @param data
+ * raw data string to encrypt
+ * @return
+ * encrypted string carrying the {RSA} prefix
+ */
+ def encryptWithLinkisPublicKey(data: String): String = {
+ // read the public and private key strings from the configuration
+ val publicKey = Configuration.LINKIS_RSA_PUBLIC_KEY.getValue
+ val privateKey = Configuration.LINKIS_RSA_PRIVATE_KEY.getValue
+ // convert the key strings into a KeyPair
+ val keyPair =
+ new KeyPair(RSAUtils.stringToPublicKey(publicKey), RSAUtils.stringToPrivateKey(privateKey))
+ // encrypt the data with the public key
+ val encryptedData = RSAUtils.encrypt(data.getBytes, keyPair.getPublic)
+ // Base64-encode the encrypted data and prepend the prefix
+ val encodedEncryptedData =
+ PREFIX + new String(Base64.encodeBase64URLSafe(encryptedData))
+ encodedEncryptedData
+ }
+
+ /**
+ * Decrypt data with the RSA private key configured for Linkis.
+ *
+ * @param data
+ * encrypted data string carrying the {RSA} prefix
+ * @return
+ * the decrypted raw data string
+ */
+ def dncryptWithLinkisPublicKey(data: String): String = {
+ // read the public and private key strings from the configuration
+ val publicKey = Configuration.LINKIS_RSA_PUBLIC_KEY.getValue
+ val privateKey = Configuration.LINKIS_RSA_PRIVATE_KEY.getValue
+ val decodedData = URLDecoder.decode(data, "UTF-8")
+ // convert the key strings into a KeyPair
+ val keyPair =
+ new KeyPair(RSAUtils.stringToPublicKey(publicKey), RSAUtils.stringToPrivateKey(privateKey))
+ // check whether the data starts with the expected prefix
+ if (decodedData.startsWith(PREFIX)) {
+ // strip the prefix to obtain the encrypted payload
+ val dataSub = decodedData.substring(5)
+ // Base64-decode the encrypted payload
+ val decodedEncryptedData = Base64.decodeBase64(dataSub)
+ // decrypt the payload with the private key
+ val decryptedData = RSAUtils.decrypt(decodedEncryptedData, keyPair.getPrivate)
+ // convert the decrypted bytes back to a string
+ val decryptedString = new String(decryptedData)
+ decryptedString
+ } else {
+ logger.warn(s"token does not start with $PREFIX, decryption is skipped")
+ data
+ }
+ }
+
+ /**
+ * Extract the latter half of the given token; short tokens are returned unchanged.
+ *
+ * @param token
+ * the full input token string.
+ * @return
+ * the extracted latter half of the token (or the original token for short tokens).
+ */
+ def tokenSubRule(token: String): String = {
+ val lowerToken = token.toLowerCase()
+ // keep the token as-is when:
+ // 1. it ends with "-auth" (case-insensitive) and is shorter than 12 characters
+ // 2. or it is shorter than 10 characters
+ if ((lowerToken.endsWith("-auth") && lowerToken.length < 12) || lowerToken.length < 10) {
+ token // return unchanged, no truncation
+ } else {
+ // otherwise take the latter half (original logic)
+ token.substring(token.length / 2, token.length)
+ }
+ }
+
}
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
index 80e3ff7e5e0..67dfc0971a6 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
@@ -43,6 +43,8 @@ import org.slf4j.Logger
object Utils extends Logging {
+ val DEFAULE_SCHEDULER_THREAD_NAME_PREFIX = "Linkis-Default-Scheduler-Thread-"
+
def tryQuietly[T](tryOp: => T): T = tryQuietly(tryOp, _ => ())
def tryCatch[T](tryOp: => T)(catchOp: Throwable => T): T = {
@@ -181,6 +183,15 @@ object Utils extends Logging {
): ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(newCachedThreadPool(threadNum, threadName, isDaemon))
+ def newCachedExecutionContextWithExecutor(
+ threadNum: Int,
+ threadName: String,
+ isDaemon: Boolean = true
+ ): (ExecutionContextExecutorService, ThreadPoolExecutor) = {
+ val threadPool: ThreadPoolExecutor = newCachedThreadPool(threadNum, threadName, isDaemon)
+ (ExecutionContext.fromExecutorService(threadPool), threadPool)
+ }
+
def newFixedThreadPool(
threadNum: Int,
threadName: String,
@@ -199,7 +210,7 @@ object Utils extends Logging {
val defaultScheduler: ScheduledThreadPoolExecutor = {
val scheduler =
- new ScheduledThreadPoolExecutor(20, threadFactory("Linkis-Default-Scheduler-Thread-", true))
+ new ScheduledThreadPoolExecutor(20, threadFactory(DEFAULE_SCHEDULER_THREAD_NAME_PREFIX, true))
scheduler.setMaximumPoolSize(20)
scheduler.setKeepAliveTime(5, TimeUnit.MINUTES)
scheduler
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
index 30bdeb4b147..bd2fab49302 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
@@ -43,6 +43,8 @@ object VariableUtils extends Logging {
val RUN_TODAY_H = "run_today_h"
+ val RUN_TODAY_HOUR = "run_today_hour"
+
private val codeReg =
"\\$\\{\\s*[A-Za-z][A-Za-z0-9_\\.]*\\s*[\\+\\-\\*/]?\\s*[A-Za-z0-9_\\.]*\\s*\\}".r
@@ -83,6 +85,13 @@ object VariableUtils extends Logging {
nameAndType(RUN_TODAY_H) = HourType(runTodayH)
}
}
+ if (variables.containsKey(RUN_TODAY_HOUR)) {
+ val runTodayHourStr = variables.get(RUN_TODAY_HOUR).asInstanceOf[String]
+ if (StringUtils.isNotBlank(runTodayHourStr)) {
+ val runTodayHour = new CustomHourType(runTodayHourStr, false)
+ nameAndType(RUN_TODAY_HOUR) = HourType(runTodayHour)
+ }
+ }
initAllDateVars(run_date, nameAndType)
val codeOperation = parserVar(replaceStr, nameAndType)
parserDate(codeOperation, run_date)
@@ -141,18 +150,27 @@ object VariableUtils extends Logging {
nameAndType(RUN_TODAY_H) = HourType(runTodayH)
}
}
+ if (variables.containsKey(RUN_TODAY_HOUR)) {
+ val runTodayHourStr = variables.get(RUN_TODAY_HOUR).asInstanceOf[String]
+ if (StringUtils.isNotBlank(runTodayHourStr)) {
+ val runTodayHour = new CustomHourType(runTodayHourStr, false)
+ nameAndType(RUN_TODAY_HOUR) = HourType(runTodayHour)
+ }
+ }
initAllDateVars(run_date, nameAndType)
val codeOperation = parserVar(code, nameAndType)
- parserDate(codeOperation, run_date)
+ parserDate(codeType, codeOperation, run_date)
}
+ @deprecated
private def parserDate(code: String, run_date: CustomDateType): String = {
- if (Configuration.VARIABLE_OPERATION) {
- val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate)
- VariableOperationUtils.replaces(zonedDateTime, code)
- } else {
- code
- }
+ val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate)
+ VariableOperationUtils.replaces(zonedDateTime, code)
+ }
+
+ private def parserDate(codeType: String, code: String, run_date: CustomDateType): String = {
+ val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate)
+ VariableOperationUtils.replaces(codeType, zonedDateTime, code)
}
private def initAllDateVars(
@@ -255,6 +273,30 @@ object VariableUtils extends Logging {
nameAndType("run_today_h_std") = HourType(
new CustomHourType(nameAndType(RUN_TODAY_H).asInstanceOf[HourType].getValue, true)
)
+ // calculate run_today_hour base on run_date
+ if (nameAndType.contains("run_today_hour")) {
+ nameAndType("run_today_hour").asInstanceOf[HourType]
+ } else {
+ val run_today_hour = new CustomHourType(getCurHour(false, run_today.toString), false)
+ nameAndType("run_today_hour") = HourType(run_today_hour)
+ }
+ nameAndType("run_today_hour_std") = HourType(
+ new CustomHourType(nameAndType("run_today_hour").asInstanceOf[HourType].getValue, true)
+ )
+ // calculate run_last_mon base on run_today
+ val run_roday_mon = new CustomMonType(getMonthDay(false, run_today.getDate), false)
+ nameAndType("run_last_mon_now") = MonType(new CustomMonType(run_roday_mon - 1, false, false))
+ nameAndType("run_last_mon_now_std") = MonType(new CustomMonType(run_roday_mon - 1, true, false))
+ // calculate run_current_mon_now base on run_today
+ nameAndType("run_current_mon_now") = MonType(
+ new CustomMonType(run_roday_mon.toString, false, false)
+ )
+ nameAndType("run_current_mon_now_std") = MonType(
+ new CustomMonType(run_roday_mon.toString, true, false)
+ )
+ // calculate run_mon_now base on run_today
+ nameAndType("run_mon_now") = MonType(new CustomMonType(run_roday_mon.toString, false, false))
+ nameAndType("run_mon_now_std") = MonType(new CustomMonType(run_roday_mon.toString, true, false))
}
/**
@@ -337,7 +379,7 @@ object VariableUtils extends Logging {
*
* @param code
* :code
- * @param codeType
+ * @param languageType
* :SQL,PYTHON
* @return
*/
@@ -346,27 +388,37 @@ object VariableUtils extends Logging {
var varString: String = null
var errString: String = null
+ var rightVarString: String = null
languageType match {
case CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL =>
varString = """\s*--@set\s*.+\s*"""
+ rightVarString = """^\s*--@set\s*.+\s*"""
errString = """\s*--@.*"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_PYTHON | CodeAndRunTypeUtils.LANGUAGE_TYPE_SHELL =>
varString = """\s*#@set\s*.+\s*"""
+ rightVarString = """^\s*#@set\s*.+\s*"""
errString = """\s*#@"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_SCALA =>
varString = """\s*//@set\s*.+\s*"""
+ rightVarString = """^\s*//@set\s*.+\s*"""
errString = """\s*//@.+"""
case CodeAndRunTypeUtils.LANGUAGE_TYPE_JAVA =>
varString = """\s*!!@set\s*.+\s*"""
+ rightVarString = """^\s*!!@set\s*.+\s*"""
case _ =>
return nameAndValue
}
val customRegex = varString.r.unanchored
+ val customRightRegex = rightVarString.r.unanchored
val errRegex = errString.r.unanchored
code.split("\n").foreach { str =>
{
+
+ if (customRightRegex.unapplySeq(str).size < customRegex.unapplySeq(str).size) {
+ logger.warn(s"code:$str is wrong custom variable format!!!")
+ }
str match {
case customRegex() =>
val clearStr = if (str.endsWith(";")) str.substring(0, str.length - 1) else str
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala
index 4359df33984..0c528a4a9b6 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/CustomDateType.scala
@@ -79,20 +79,12 @@ class CustomMonthType(date: String, std: Boolean = true, isEnd: Boolean = false)
def -(months: Int): String = {
val dateFormat = DateTypeUtils.dateFormatLocal.get()
- if (std) {
- DateTypeUtils.getMonth(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), -months))
- } else {
- DateTypeUtils.getMonth(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), -months))
- }
+ DateTypeUtils.getMonth(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), -months))
}
def +(months: Int): String = {
val dateFormat = DateTypeUtils.dateFormatLocal.get()
- if (std) {
- DateTypeUtils.getMonth(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), months))
- } else {
- DateTypeUtils.getMonth(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), months))
- }
+ DateTypeUtils.getMonth(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), months))
}
override def toString: String = {
@@ -111,20 +103,12 @@ class CustomMonType(date: String, std: Boolean = true, isEnd: Boolean = false) {
def -(months: Int): String = {
val dateFormat = DateTypeUtils.dateFormatMonLocal.get()
- if (std) {
- DateTypeUtils.getMon(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), -months))
- } else {
- DateTypeUtils.getMon(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), -months))
- }
+ DateTypeUtils.getMon(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), -months))
}
def +(months: Int): String = {
val dateFormat = DateTypeUtils.dateFormatMonLocal.get()
- if (std) {
- DateTypeUtils.getMon(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), months))
- } else {
- DateTypeUtils.getMon(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), months))
- }
+ DateTypeUtils.getMon(std, isEnd, DateUtils.addMonths(dateFormat.parse(date), months))
}
override def toString: String = {
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala
index df6dff865d7..ed97be83daf 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/DateTypeUtils.scala
@@ -46,6 +46,10 @@ object DateTypeUtils {
override protected def initialValue = new SimpleDateFormat("yyyy-MM-dd HH")
}
+ val dateFormatSecondLocal = new ThreadLocal[SimpleDateFormat]() {
+ override protected def initialValue = new SimpleDateFormat("yyyyMMddHHmmss")
+ }
+
/**
* Get Today"s date
*
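The new dateFormatSecondLocal follows the existing ThreadLocal pattern because SimpleDateFormat is not thread-safe, so each thread keeps its own instance. A small illustrative Java sketch; the class name is hypothetical.

import java.text.SimpleDateFormat;
import java.util.Date;

public class SecondFormatExample {
  private static final ThreadLocal<SimpleDateFormat> SECOND_FORMAT =
      ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss"));

  public static void main(String[] args) {
    // each thread calling get() receives its own SimpleDateFormat instance
    System.out.println(SECOND_FORMAT.get().format(new Date())); // e.g. 20240101093045
  }
}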
diff --git a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/conf/BDPConfigurationTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/conf/BDPConfigurationTest.java
new file mode 100644
index 00000000000..5a025eb8b02
--- /dev/null
+++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/conf/BDPConfigurationTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.conf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** BDPConfiguration Tester */
+public class BDPConfigurationTest {
+
+ @Test
+ public void testGetOption() {
+
+ Assertions.assertEquals(
+ "properties支持中文",
+ BDPConfiguration.getOption(
+ CommonVars.apply("linkis.jobhistory.error.msg.tip", "properties支持中文"))
+ .get());
+
+ Assertions.assertEquals(
+ "properties支持中文(默认)",
+ BDPConfiguration.getOption(
+ CommonVars.apply("linkis.jobhistory.error.msg.tip1", "properties支持中文(默认)"))
+ .get());
+ }
+}
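The test above relies on the new test resource linkis.properties carrying non-ASCII values. A generic, hedged Java sketch of reading such a file with an explicit UTF-8 reader so values like "properties支持中文" survive; this is not the BDPConfiguration loader itself.

import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class Utf8PropertiesExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // load with an explicit UTF-8 reader so non-ASCII values are not mangled
    try (InputStreamReader reader =
        new InputStreamReader(new FileInputStream("linkis.properties"), StandardCharsets.UTF_8)) {
      props.load(reader);
    }
    System.out.println(props.getProperty("linkis.jobhistory.error.msg.tip", "default"));
  }
}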
diff --git a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java
new file mode 100644
index 00000000000..f548d89d462
--- /dev/null
+++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils;
+
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class ByteTimeUtilsTest {
+
+ private static final ImmutableMap> opFunction =
+ ImmutableMap.>builder()
+ .put("byteStringAsBytes", tar -> ByteTimeUtils.byteStringAsBytes(tar))
+ .put("byteStringAsKb", tar -> ByteTimeUtils.byteStringAsKb(tar))
+ .put("byteStringAsMb", tar -> ByteTimeUtils.byteStringAsMb(tar))
+ .put("byteStringAsGb", tar -> ByteTimeUtils.byteStringAsGb(tar))
+ .build();
+
+ private static final ImmutableMap convertToByte =
+ ImmutableMap.builder()
+ .put("1", 1l)
+ .put("1b", 1l)
+ .put("1B", 1l)
+ .put("1k", 1024l)
+ .put("1K", 1024l)
+ .put("1kb", 1024l)
+ .put("1Kb", 1024l)
+ .put("1kB", 1024l)
+ .put("1KB", 1024l)
+ .put("1m", 1024l * 1024l)
+ .put("1M", 1024l * 1024l)
+ .put("1mb", 1024l * 1024l)
+ .put("1Mb", 1024l * 1024l)
+ .put("1mB", 1024l * 1024l)
+ .put("1MB", 1024l * 1024l)
+ .put("1g", 1024l * 1024l * 1024l)
+ .put("1G", 1024l * 1024l * 1024l)
+ .put("1gb", 1024l * 1024l * 1024l)
+ .put("1gB", 1024l * 1024l * 1024l)
+ .put("1Gb", 1024l * 1024l * 1024l)
+ .put("1GB", 1024l * 1024l * 1024l)
+ .put("1t", 1024l * 1024l * 1024l * 1024l)
+ .put("1T", 1024l * 1024l * 1024l * 1024l)
+ .put("1tb", 1024l * 1024l * 1024l * 1024l)
+ .put("1Tb", 1024l * 1024l * 1024l * 1024l)
+ .put("1tB", 1024l * 1024l * 1024l * 1024l)
+ .put("1TB", 1024l * 1024l * 1024l * 1024l)
+ .put("1p", 1024l * 1024l * 1024l * 1024l * 1024l)
+ .put("1P", 1024l * 1024l * 1024l * 1024l * 1024l)
+ .put("1pb", 1024l * 1024l * 1024l * 1024l * 1024l)
+ .put("1Pb", 1024l * 1024l * 1024l * 1024l * 1024l)
+ .put("1pB", 1024l * 1024l * 1024l * 1024l * 1024l)
+ .put("1PB", 1024l * 1024l * 1024l * 1024l * 1024l)
+ .build();
+
+ private static final ImmutableMap convertToKB =
+ ImmutableMap.builder()
+ .put("1", 1l)
+ .put("1024b", 1l)
+ .put("1024B", 1l)
+ .put("1k", 1l)
+ .put("1K", 1l)
+ .put("1kb", 1l)
+ .put("1Kb", 1l)
+ .put("1kB", 1l)
+ .put("1KB", 1l)
+ .put("1m", 1024l)
+ .put("1M", 1024l)
+ .put("1mb", 1024l)
+ .put("1Mb", 1024l)
+ .put("1mB", 1024l)
+ .put("1MB", 1024l)
+ .put("1g", 1024l * 1024l)
+ .put("1G", 1024l * 1024l)
+ .put("1gb", 1024l * 1024l)
+ .put("1gB", 1024l * 1024l)
+ .put("1Gb", 1024l * 1024l)
+ .put("1GB", 1024l * 1024l)
+ .build();
+
+ private static final ImmutableMap convertToMB =
+ ImmutableMap.builder()
+ .put("1", 1l)
+ .put("1024k", 1l)
+ .put("1024K", 1l)
+ .put("1024kb", 1l)
+ .put("1024Kb", 1l)
+ .put("1024kB", 1l)
+ .put("1024KB", 1l)
+ .put("1m", 1l)
+ .put("1M", 1l)
+ .put("1mb", 1l)
+ .put("1Mb", 1l)
+ .put("1mB", 1l)
+ .put("1MB", 1l)
+ .put("1g", 1024l)
+ .put("1G", 1024l)
+ .put("1gb", 1024l)
+ .put("1gB", 1024l)
+ .put("1Gb", 1024l)
+ .put("1GB", 1024l)
+ .build();
+
+ private static final ImmutableMap convertToGB =
+ ImmutableMap.builder()
+ .put("1", 1l)
+ .put("1024m", 1l)
+ .put("1024M", 1l)
+ .put("1024mb", 1l)
+ .put("1024Mb", 1l)
+ .put("1024mB", 1l)
+ .put("1024MB", 1l)
+ .put("1g", 1l)
+ .put("1G", 1l)
+ .put("1gb", 1l)
+ .put("1gB", 1l)
+ .put("1Gb", 1l)
+ .put("1GB", 1l)
+ .put("1t", 1024l)
+ .put("1T", 1024l)
+ .put("1tb", 1024l)
+ .put("1Tb", 1024l)
+ .put("1tB", 1024l)
+ .put("1TB", 1024l)
+ .build();
+
+ @Test
+ void byteStringAsBytes() {
+ convertToByte.forEach(
+ (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsBytes").apply(k), v));
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () -> opFunction.get("byteStringAsBytes").apply("1A"));
+ }
+
+ @Test
+ void byteStringAsKb() {
+ convertToKB.forEach(
+ (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsKb").apply(k), v));
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () -> opFunction.get("byteStringAsKb").apply("1a"));
+ }
+
+ @Test
+ void byteStringAsMb() {
+ convertToMB.forEach(
+ (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsMb").apply(k), v));
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () -> opFunction.get("byteStringAsMb").apply("1c"));
+ }
+
+ @Test
+ void byteStringAsGb() {
+ convertToGB.forEach(
+ (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsGb").apply(k), v));
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () -> opFunction.get("byteStringAsGb").apply("1C"));
+ }
+}
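A brief usage sketch of the conversions exercised by this test; the expected values follow the 1024-based multipliers the assertions use, and an unrecognized suffix such as "1A" is expected to raise IllegalArgumentException.

public class ByteStringExample {
  public static void main(String[] args) {
    System.out.println(org.apache.linkis.common.utils.ByteTimeUtils.byteStringAsBytes("1m"));   // 1048576
    System.out.println(org.apache.linkis.common.utils.ByteTimeUtils.byteStringAsKb("1g"));      // 1048576
    System.out.println(org.apache.linkis.common.utils.ByteTimeUtils.byteStringAsMb("1024KB"));  // 1
    System.out.println(org.apache.linkis.common.utils.ByteTimeUtils.byteStringAsGb("1T"));      // 1024
  }
}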
diff --git a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/SecurityUtilsTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/SecurityUtilsTest.java
index 4fdca7b82ac..6953b8835b9 100644
--- a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/SecurityUtilsTest.java
+++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/SecurityUtilsTest.java
@@ -17,166 +17,326 @@
package org.apache.linkis.common.utils;
+import org.apache.linkis.common.conf.BDPConfiguration;
import org.apache.linkis.common.exception.LinkisSecurityException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/** SecurityUtils Tester */
public class SecurityUtilsTest {
+ @BeforeAll
+ public static void init() {
+ BDPConfiguration.set("linkis.mysql.strong.security.enable", "true");
+ }
+
@Test
- public void testAppendMysqlForceParamsUrl() throws Exception {
- // allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false
+ public void testCheckUrl() {
+ // true
String url = "jdbc:mysql://127.0.0.1:10000/db_name";
- String newUrl = SecurityUtils.appendMysqlForceParams(url);
- Assertions.assertEquals(
- url
- + "?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
- newUrl);
-
- url = "jdbc:mysql://127.0.0.1:10000/db_name?";
- newUrl = SecurityUtils.appendMysqlForceParams(url);
- Assertions.assertEquals(
- url
- + "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
- newUrl);
-
- url = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1";
- newUrl = SecurityUtils.appendMysqlForceParams(url);
- Assertions.assertEquals(
- url
- + "&"
- + "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
- newUrl);
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url);
+ });
+ // false
+ String url1 = "jdbc:mysql://127.0.0.1:10000/db_name?";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkUrl(url1);
+ });
+ // false
+ String url11 = "jdbc:mysql://127.0.0.1:10000/db_name?abc";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkUrl(url11);
+ });
+ // true
+ String url2 = "jdbc:mysql://127.0.0.1:10000/";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url2);
+ });
+ // true
+ String url3 = "jdbc:mysql://127.0.0.1:10000";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url3);
+ });
+ // true
+ String url4 = "JDBC:mysql://127.0.0.1:10000/db_name";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url4);
+ });
+ // true
+ String url5 = "JDBC:H2://127.0.0.1:10000/db_name";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url5);
+ });
+ // true
+ String url6 = "JDBC:H2://test-example.com:10000/db_name";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url6);
+ });
+ // true
+ String url7 = "JDBC:H2://example.测试:10000/db_name";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkUrl(url7);
+ });
}
@Test
- public void testAppendMysqlForceParamsExtraParams() throws Exception {
- Map extraParams = new HashMap<>();
- extraParams.put("testKey", "testValue");
- SecurityUtils.appendMysqlForceParams(extraParams);
- Assertions.assertEquals("false", extraParams.get("allowLoadLocalInfile"));
- Assertions.assertEquals("false", extraParams.get("autoDeserialize"));
- Assertions.assertEquals("false", extraParams.get("allowLocalInfile"));
- Assertions.assertEquals("false", extraParams.get("allowUrlInLocalInfile"));
- Assertions.assertEquals("testValue", extraParams.get("testKey"));
- Assertions.assertEquals(null, extraParams.get("otherKey"));
+ public void testGetUrl() {
+ BDPConfiguration.set("linkis.mysql.strong.security.enable", "true");
+ String baseUrl = "jdbc:mysql://127.0.0.1:10000/db_name";
+ String securityStr =
+ "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false";
+ String url1 = "jdbc:mysql://127.0.0.1:10000/db_name";
+ Assertions.assertEquals(baseUrl, SecurityUtils.getJdbcUrl(url1));
+ String url11 = "jdbc:mysql://127.0.0.1:10000/db_name?";
+ Assertions.assertEquals(baseUrl, SecurityUtils.getJdbcUrl(url11));
+ String url2 = "jdbc:mysql://127.0.0.1:10000/db_name?k1=v1&";
+ Assertions.assertEquals(baseUrl + "?k1=v1&" + securityStr, SecurityUtils.getJdbcUrl(url2));
+ String url3 = "jdbc:mysql://127.0.0.1:10000/db_name?k1=v1&k2";
+ Assertions.assertEquals(baseUrl + "?k1=v1&" + securityStr, SecurityUtils.getJdbcUrl(url3));
}
@Test
- public void testCheckJdbcSecurityUrl() throws Exception {
- String url = "jdbc:mysql://127.0.0.1:10000/db_name";
- String newUrl = SecurityUtils.checkJdbcSecurity(url);
- Assertions.assertEquals(url, newUrl);
+ public void testRSA() {
+ String originalData = "rsa-test-str";
+ String encryptData = RSAUtils.encryptWithLinkisPublicKey(originalData);
+ String dncryptData = RSAUtils.dncryptWithLinkisPublicKey(encryptData);
+ Assertions.assertEquals(dncryptData, originalData);
+ }
- url = "jdbc:mysql://127.0.0.1:10000/db_name?";
- newUrl = SecurityUtils.checkJdbcSecurity(url);
- Assertions.assertEquals(url, newUrl);
+ @Test
+ public void testCheckJdbcConnParams() {
+ String host = "127.0.0.1";
+ Integer port = 3306;
+ String username = "test";
+ String password = "test";
+ String database = "tdb";
+ Map extraParams = new HashMap<>();
+ extraParams.put("k1", "v1");
- url = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1";
- newUrl = SecurityUtils.checkJdbcSecurity(url);
- Assertions.assertEquals(url, newUrl);
+ // match ip
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
+ });
+ String host1 = "localhost";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host1, port, username, password, database, extraParams);
+ });
- // key is not security
- url = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1&allowLocalInfile=true";
- AtomicReference atomUrl = new AtomicReference<>(url);
+ // match domain
+ String host2 = "www.apache.com";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host2, port, username, password, database, extraParams);
+ });
+
+ // error host
+ String host3 = "localhost:3306";
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(atomUrl.get());
+ SecurityUtils.checkJdbcConnParams(host3, port, username, password, database, extraParams);
});
- // url encode
- url = "jdbc:mysql://127.0.0.1:10000/db_name?allowLocalInfil%65=true";
- atomUrl.set(url);
+ String host4 = "localhost:3306/test";
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(atomUrl.get());
+ SecurityUtils.checkJdbcConnParams(host4, port, username, password, database, extraParams);
});
- // value is not security
- url = "jdbc:mysql://127.0.0.1:10000/db_name?p1=allowLocalInfile";
- atomUrl.set(url);
+ // error port
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(atomUrl.get());
+ SecurityUtils.checkJdbcConnParams(host, null, username, password, database, extraParams);
});
- // contains #
- url = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1p2=v2";
- atomUrl.set(url);
+ // error username
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(atomUrl.get());
+ SecurityUtils.checkJdbcConnParams(host, port, " ", password, database, extraParams);
+ });
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host, port, null, password, database, extraParams);
});
- }
- @Test
- public void testCheckJdbcSecurityParamsMap() throws Exception {
- Map paramsMap = new HashMap<>();
- paramsMap.put("p1", "v1");
- Map newMap = SecurityUtils.checkJdbcSecurity(paramsMap);
- Assertions.assertEquals("v1", newMap.get("p1"));
+ // check database, The database name can be empty
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, " ", extraParams);
+ });
- // key not security
- paramsMap.put("allowLocalInfil%67", "true");
- SecurityUtils.checkJdbcSecurity(paramsMap);
- Assertions.assertEquals("true", newMap.get("allowLocalInfilg"));
+ String database1 = "test?k1=v1";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database1, extraParams);
+ });
- // key not security
- paramsMap.put("allowLocalInfile", "false");
+ // error param
+ extraParams.put("allowLoadLocalInfile", "true");
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(paramsMap);
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
});
- // value not security
- paramsMap.clear();
- paramsMap.put("p1", "allowLocalInfile");
+ extraParams.clear();
+ extraParams.put("autoDeserialize", "true");
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(paramsMap);
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
});
- // value not security
- paramsMap.clear();
- paramsMap.put("p1", "allowLocalInfil%65");
+ extraParams.clear();
+ extraParams.put("allowLocalInfile", "true");
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(paramsMap);
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
});
- // contains #
- paramsMap.clear();
- paramsMap.put("p1#", "v1");
+ extraParams.clear();
+ extraParams.put("allowUrlInLocalInfile", "false");
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(paramsMap);
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
});
- paramsMap.clear();
- paramsMap.put("p1", "v1#");
+ extraParams.clear();
+ extraParams.put("allowLocalInfil%65", "true");
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
+ });
+
+ extraParams.clear();
+ extraParams.put("#", "true");
Assertions.assertThrows(
LinkisSecurityException.class,
() -> {
- SecurityUtils.checkJdbcSecurity(paramsMap);
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
+ });
+
+ extraParams.clear();
+ extraParams.put("test", "#");
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnParams(host, port, username, password, database, extraParams);
+ });
+ }
+
+ @Test
+ public void testCheckJdbcConnUrl() {
+ // true
+ String url = "jdbc:mysql://127.0.0.1:10000/db_name";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url);
+ });
+ // true
+ String url1 = "jdbc:mysql://127.0.0.1:10000/db_name?";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url1);
+ });
+ // true
+ String url11 = "jdbc:mysql://127.0.0.1/db_name?";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url11);
});
+ // true
+ String url2 = "JDBC:mysql://127.0.0.1:10000/db_name?";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url2);
+ });
+ // true
+ String url21 = "JDBC:h2://127.0.0.1:10000/db_name?";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url21);
+ });
+ // true
+ String url3 = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1";
+ Assertions.assertDoesNotThrow(
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url3);
+ });
+ // false url error
+ String url33 =
+ "jdbc:mysql://127.0.0.1:10000:/db_name?jdbc:mysql://127.0.0.1:10000?allowLocalInfile=true";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url33);
+ });
+ // false key is not security
+ String url4 = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1&allowLocalInfile=true";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url4);
+ });
+
+ // false value is not security
+ String url5 = "jdbc:mysql://127.0.0.1:10000/db_name?p1=allowLocalInfile";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url5);
+ });
+
+ // false contains #
+ String url6 = "jdbc:mysql://127.0.0.1:10000/db_name?p1=v1p2=v2";
+ Assertions.assertThrows(
+ LinkisSecurityException.class,
+ () -> {
+ SecurityUtils.checkJdbcConnUrl(url6);
+ });
+ }
+
+ @Test
+ public void testAppendMysqlForceParamsExtraParams() {
+ Map extraParams = new HashMap<>();
+ extraParams.put("testKey", "testValue");
+ SecurityUtils.appendMysqlForceParams(extraParams);
+ Assertions.assertEquals("false", extraParams.get("allowLoadLocalInfile"));
+ Assertions.assertEquals("false", extraParams.get("autoDeserialize"));
+ Assertions.assertEquals("false", extraParams.get("allowLocalInfile"));
+ Assertions.assertEquals("false", extraParams.get("allowUrlInLocalInfile"));
+ Assertions.assertEquals("testValue", extraParams.get("testKey"));
+ Assertions.assertEquals(null, extraParams.get("otherKey"));
}
@Test
- public void testMapToString() throws Exception {
+ public void testMapToString() {
Map paramsMap = new HashMap<>();
paramsMap.put("p1", "v1");
String str = SecurityUtils.parseParamsMapToMysqlParamUrl(paramsMap);
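A hedged, simplified Java sketch of the kind of validation the tests above expect from SecurityUtils.checkJdbcConnUrl: reject URLs whose query string carries MySQL local-infile parameters or a '#'. The real implementation may differ; the class and method here are illustrative only.

import java.util.Arrays;
import java.util.List;

public class JdbcUrlGuardExample {
  // parameters the tests expect to be rejected anywhere in the URL
  private static final List<String> FORBIDDEN =
      Arrays.asList(
          "allowloadlocalinfile", "autodeserialize", "allowlocalinfile", "allowurlinlocalinfile");

  static void check(String url) {
    String lower = url.toLowerCase();
    if (lower.contains("#")) {
      throw new IllegalArgumentException("url must not contain #: " + url);
    }
    for (String key : FORBIDDEN) {
      if (lower.contains(key)) {
        throw new IllegalArgumentException("url must not contain " + key + ": " + url);
      }
    }
  }

  public static void main(String[] args) {
    check("jdbc:mysql://127.0.0.1:10000/db_name?p1=v1"); // accepted
    try {
      check("jdbc:mysql://127.0.0.1:10000/db_name?p1=v1&allowLocalInfile=true");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}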
diff --git a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java
index d56c3cd2a1e..f033fe8ca99 100644
--- a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java
+++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java
@@ -38,21 +38,10 @@ public class VariableOperationTest {
@Test
public void testSqlFormat() throws VariableOperationFailedException {
- String jsonOld =
- "select \n"
- + "\"&{yyyy-MM}\",\n"
- + "\"&{yyyy-MM-dd HHmmss}\",\n"
- + "\"&yyyyMMddHH\",\n"
- + "\"&{yyyy-MM-dd-HH}\"";
+ String jsonOld = "select \n" + "\"&{yyyy-MM}\"";
String jsonNew = VariableOperationUtils.replaces(zonedDateTime, jsonOld);
System.out.println(jsonNew);
- assertEquals(
- jsonNew,
- "select \n"
- + "\"2022-04\",\n"
- + "\"2022-04-02 173507\",\n"
- + "\"&yyyyMMddHH\",\n"
- + "\"2022-04-02-17\"");
+ assertEquals(jsonNew, "select \n" + "\"2022-04\"");
}
@Test
diff --git a/linkis-commons/linkis-common/src/test/resources/linkis.properties b/linkis-commons/linkis-common/src/test/resources/linkis.properties
new file mode 100644
index 00000000000..d6e47523f29
--- /dev/null
+++ b/linkis-commons/linkis-common/src/test/resources/linkis.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+linkis.jobhistory.error.msg.tip=properties支持中文
+linkis.test.error.conf=123
+linkis.test.error.conf2= 456
\ No newline at end of file
diff --git a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/conf/ConfigurationTest.scala b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/conf/ConfigurationTest.scala
index ee1102c91cf..33c8229a4b4 100644
--- a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/conf/ConfigurationTest.scala
+++ b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/conf/ConfigurationTest.scala
@@ -27,4 +27,11 @@ class ConfigurationTest {
Assertions.assertFalse(Configuration.isAdmin("HaDooop"))
}
+ @Test private[conf] def testFormatValue(): Unit = {
+ val confvalue = CommonVars[Int]("linkis.test.error.conf", 456).getValue
+ val confvalue2 = CommonVars[Int]("linkis.test.error.conf2", 789).getValue
+ Assertions.assertTrue(123 == confvalue)
+ Assertions.assertTrue(456 == confvalue2)
+ }
+
}
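A hedged sketch of the value-formatting behavior this test checks: raw property values may carry stray whitespace (linkis.test.error.conf2= 456), so they are trimmed before being parsed into the requested type, and the default is used when the key is missing or parsing fails. Illustrative only, not the CommonVars implementation.

public class FormatValueExample {
  static int getInt(String raw, int defaultValue) {
    if (raw == null) {
      return defaultValue; // missing key falls back to the default
    }
    try {
      return Integer.parseInt(raw.trim()); // trim stray whitespace before parsing
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  public static void main(String[] args) {
    System.out.println(getInt(" 456", 789)); // 456
    System.out.println(getInt(null, 789));   // 789
  }
}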
diff --git a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala
index c0d4ad1d618..892731e0d5f 100644
--- a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala
+++ b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala
@@ -22,6 +22,8 @@ import org.apache.linkis.common.variable.DateTypeUtils.{getCurHour, getToday}
import java.util
+import scala.collection.mutable
+
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -40,6 +42,10 @@ class VariableUtilsTest {
|'${run_half_year_begin-1}' as run_half_year_begin_sub1,
|'${run_half_year_begin_std}' as run_half_year_begin_std,
|'${run_half_year_end}' as run_half_year_end,
+ |'${run_last_mon_now}' as run_last_mon_now,
+ |'${run_last_mon_now_std}' as run_last_mon_now_std,
+ |'${submit_user}' as submit_user,
+ |'${execute_user}' as execute_user,
|'${run_today_h+12}' as run_today_h_add1""".stripMargin
val run_date = new CustomDateType(run_date_str, false)
val dateType = DateType(run_date)
@@ -57,10 +63,31 @@ class VariableUtilsTest {
|'20190701' as run_half_year_begin_sub1,
|'2020-01-01' as run_half_year_begin_std,
|'20200630' as run_half_year_end,
+ |'202001' as run_last_mon_now,
+ |'2020-01' as run_last_mon_now_std,
+ |'hadoop' as submit_user,
+ |'hadoop' as execute_user,
|'${hourTypeRes}' as run_today_h_add1""".stripMargin
val varMap = new util.HashMap[String, String]()
varMap.put("run_date", run_date_str)
+ varMap.put("execute_user", "hadoop")
+ varMap.put("submit_user", "hadoop")
assertEquals(VariableUtils.replace(sql, "sql", varMap), resSql)
}
+ @Test
+ def testGetCustomVar: Unit = {
+ var scalaCode = "" +
+ "-------@set globalpara=60--------\n" +
+ "--@set globalpara2=66\n" +
+ "select ${globalpara} as globalpara,\n" +
+ "-- ${globalpara1} as globalpara1, \n" +
+ "${globalpara2} as globalpara2;\n"
+ var pythonCode = ""
+
+ val nameAndValue: mutable.Map[String, String] =
+ VariableUtils.getCustomVar(scalaCode, CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL);
+ assertEquals(nameAndValue.size, 2)
+ }
+
}
diff --git a/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java b/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
index 6c5c125f6d8..608d5fad3c4 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
+++ b/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
@@ -17,12 +17,14 @@
package org.apache.linkis.hadoop.common.utils;
+import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.hadoop.common.conf.HadoopConf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,10 @@
public class KerberosUtils {
private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+ private static boolean kerberosRefreshStarted = false;
+
+ private static final Object kerberosRefreshLock = new Object();
+
private KerberosUtils() {}
private static Configuration createKerberosSecurityConfiguration() {
@@ -81,20 +87,20 @@ public static boolean runRefreshKerberosLogin() {
public static Long getKerberosRefreshInterval() {
long refreshInterval;
- String refreshIntervalString = "86400000";
- // defined in linkis-env.sh, if not initialized then the default value is 86400000 ms (1d).
- if (System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL") != null) {
- refreshIntervalString = System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL");
+ String refreshIntervalString = "43200";
+ // defined in linkis-env.sh, if not initialized then the default value is 43200 s (0.5d).
+ if (System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL") != null) {
+ refreshIntervalString = System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL");
}
try {
refreshInterval = Long.parseLong(refreshIntervalString);
} catch (NumberFormatException e) {
LOG.error(
- "Cannot get time in MS for the given string, "
+ "Cannot get time in S for the given string, "
+ refreshIntervalString
- + " defaulting to 86400000 ",
+ + " defaulting to 43200 ",
e);
- refreshInterval = 86400000L;
+ refreshInterval = 43200;
}
return refreshInterval;
}
@@ -102,14 +108,13 @@ public static Long getKerberosRefreshInterval() {
public static Integer kinitFailTimesThreshold() {
Integer kinitFailThreshold = 5;
// defined in linkis-env.sh, if not initialized then the default value is 5.
- if (System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
+ if (System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
try {
- kinitFailThreshold =
- new Integer(System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD"));
+ kinitFailThreshold = new Integer(System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD"));
} catch (Exception e) {
LOG.error(
"Cannot get integer value from the given string, "
- + System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD")
+ + System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD")
+ " defaulting to "
+ kinitFailThreshold,
e);
@@ -117,4 +122,71 @@ public static Integer kinitFailTimesThreshold() {
}
return kinitFailThreshold;
}
+
+ public static void checkStatus() {
+ try {
+ LOG.info("isSecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
+ LOG.info(
+ "userAuthenticationMethod:"
+ + UserGroupInformation.getLoginUser().getAuthenticationMethod());
+ UserGroupInformation loginUsr = UserGroupInformation.getLoginUser();
+ UserGroupInformation curUsr = UserGroupInformation.getCurrentUser();
+ LOG.info("LoginUser: " + loginUsr);
+ LOG.info("CurrentUser: " + curUsr);
+ if (curUsr == null) {
+ LOG.info("CurrentUser is null");
+ } else {
+ LOG.info("CurrentUser is not null");
+ }
+ assert curUsr != null;
+ if (loginUsr.getClass() != curUsr.getClass()) {
+ LOG.info("getClass() is different");
+ } else {
+ LOG.info("getClass() is same");
+ }
+ if (loginUsr.equals(curUsr)) {
+ LOG.info("subject is equal");
+ } else {
+ LOG.info("subject is not equal");
+ }
+ } catch (Exception e) {
+ LOG.error("UGI error: ", e.getMessage());
+ }
+ }
+
+ public static void startKerberosRefreshThread() {
+
+ if (kerberosRefreshStarted || !HadoopConf.KERBEROS_ENABLE()) {
+ LOG.warn(
+ "Kerberos refresh thread already started or Kerberos is not enabled: {}",
+ HadoopConf.KERBEROS_ENABLE());
+ return;
+ }
+ synchronized (kerberosRefreshLock) {
+ if (kerberosRefreshStarted) {
+ LOG.warn("kerberos refresh thread had start");
+ return;
+ }
+ kerberosRefreshStarted = true;
+ LOG.info("kerberos Refresh tread started");
+ Utils.defaultScheduler()
+ .scheduleAtFixedRate(
+ () -> {
+ try {
+ checkStatus();
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ LOG.info("Trying re-login from keytab");
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ } else if (UserGroupInformation.isLoginTicketBased()) {
+ LOG.info("Trying re-login from ticket cache");
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to re-login", e);
+ }
+ },
+ getKerberosRefreshInterval(),
+ getKerberosRefreshInterval(),
+ TimeUnit.SECONDS);
+ }
+ }
}
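A standalone, hedged sketch of the refresh loop added above, using a plain ScheduledExecutorService instead of Utils.defaultScheduler(); the relogin body is a placeholder for checkStatus() plus the UserGroupInformation relogin calls.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class KerberosRefreshSketch {
  public static void main(String[] args) {
    long refreshIntervalSeconds = 43200L; // LINKIS_KERBEROS_REFRESH_INTERVAL default (half a day)
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(
        () -> {
          try {
            // placeholder for checkStatus() and keytab/ticket-cache relogin
            System.out.println("re-login from keytab or ticket cache");
          } catch (Exception e) {
            e.printStackTrace();
          }
        },
        refreshIntervalSeconds,
        refreshIntervalSeconds,
        TimeUnit.SECONDS);
  }
}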
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index fc7f91504f9..1a75418dfc3 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -23,26 +23,52 @@ object HadoopConf {
val HADOOP_ROOT_USER = CommonVars("wds.linkis.hadoop.root.user", "hadoop")
- val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
+ val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false).getValue
+
+ val KERBEROS_ENABLE_MAP =
+ CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
val KEYTAB_FILE = CommonVars("wds.linkis.keytab.file", "/appcom/keytab/")
+ val LINKIS_KEYTAB_FILE = CommonVars("linkis.copy.keytab.file", "/mnt/bdap/keytab/")
+
+ val EXTERNAL_KEYTAB_FILE_PREFIX =
+ CommonVars("linkis.external.keytab.file.prefix", "/appcom/config/external-conf/keytab")
+
val KEYTAB_HOST = CommonVars("wds.linkis.keytab.host", "127.0.0.1")
+ val KEYTAB_HOST_MAP =
+ CommonVars("linkis.keytab.host.map", "cluster1=127.0.0.2,cluster2=127.0.0.3")
+
val KEYTAB_HOST_ENABLED = CommonVars("wds.linkis.keytab.host.enabled", false)
val KEYTAB_PROXYUSER_ENABLED = CommonVars("wds.linkis.keytab.proxyuser.enable", false)
val KEYTAB_PROXYUSER_SUPERUSER = CommonVars("wds.linkis.keytab.proxyuser.superuser", "hadoop")
+ val KEYTAB_PROXYUSER_SUPERUSER_MAP =
+ CommonVars("linkis.keytab.proxyuser.superuser.map", "cluster1=hadoop1,cluster2=hadoop2")
+
val hadoopConfDir =
CommonVars("hadoop.config.dir", CommonVars("HADOOP_CONF_DIR", "").getValue).getValue
val HADOOP_EXTERNAL_CONF_DIR_PREFIX =
CommonVars("wds.linkis.hadoop.external.conf.dir.prefix", "/appcom/config/external-conf/hadoop")
+ /**
+ * Whether to disable the underlying HDFS FileSystem cache; if true, fs.hdfs.impl.disable.cache is set to true
+ */
+ val FS_CACHE_DISABLE =
+ CommonVars[java.lang.Boolean]("wds.linkis.fs.hdfs.impl.disable.cache", false)
+
val HDFS_ENABLE_CACHE = CommonVars("wds.linkis.hadoop.hdfs.cache.enable", false).getValue
+ val HDFS_ENABLE_CACHE_CLOSE =
+ CommonVars("linkis.hadoop.hdfs.cache.close.enable", true).getValue
+
+ val HDFS_ENABLE_NOT_CLOSE_USERS =
+ CommonVars("linkis.hadoop.hdfs.cache.not.close.users", "hadoop").getValue
+
val HDFS_ENABLE_CACHE_IDLE_TIME =
CommonVars("wds.linkis.hadoop.hdfs.cache.idle.time", 3 * 60 * 1000).getValue
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
index dfbc5c9347e..f87f89393e9 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/entity/HDFSFileSystemContainer.scala
@@ -21,7 +21,7 @@ import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.hadoop.fs.FileSystem
-class HDFSFileSystemContainer(fs: FileSystem, user: String) {
+class HDFSFileSystemContainer(fs: FileSystem, user: String, label: String) {
private var lastAccessTime: Long = System.currentTimeMillis()
@@ -31,6 +31,8 @@ class HDFSFileSystemContainer(fs: FileSystem, user: String) {
def getUser: String = this.user
+ def getLabel: String = this.label
+
def getLastAccessTime: Long = this.lastAccessTime
def updateLastAccessTime: Unit = {
@@ -46,8 +48,7 @@ class HDFSFileSystemContainer(fs: FileSystem, user: String) {
def canRemove(): Boolean = {
val currentTime = System.currentTimeMillis()
val idleTime = currentTime - this.lastAccessTime
- idleTime > HadoopConf.HDFS_ENABLE_CACHE_MAX_TIME || (System
- .currentTimeMillis() - this.lastAccessTime > HadoopConf.HDFS_ENABLE_CACHE_IDLE_TIME) && count <= 0
+ idleTime > HadoopConf.HDFS_ENABLE_CACHE_MAX_TIME || ((idleTime > HadoopConf.HDFS_ENABLE_CACHE_IDLE_TIME) && count <= 0)
}
}
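A hedged restatement of the simplified canRemove() condition: a cached FileSystem may be dropped once it exceeds the hard max idle time, or once it passes the soft idle limit with no outstanding users. The max-time constant below is hypothetical.

public class CacheEvictionExample {
  static boolean canRemove(long idleTimeMs, int accessCount, long maxTimeMs, long idleLimitMs) {
    // hard cap always evicts; otherwise evict only when idle and unused
    return idleTimeMs > maxTimeMs || (idleTimeMs > idleLimitMs && accessCount <= 0);
  }

  public static void main(String[] args) {
    long idleLimitMs = 3 * 60 * 1000L;      // wds.linkis.hadoop.hdfs.cache.idle.time default
    long maxTimeMs = 12 * 60 * 60 * 1000L;  // hypothetical hard cap
    System.out.println(canRemove(10 * 60 * 1000L, 0, maxTimeMs, idleLimitMs)); // true
    System.out.println(canRemove(10 * 60 * 1000L, 2, maxTimeMs, idleLimitMs)); // false
  }
}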
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index 922b5f6a8f6..f6d91edbad2 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -17,7 +17,8 @@
package org.apache.linkis.hadoop.common.utils
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.conf.Configuration.LINKIS_KEYTAB_SWITCH
+import org.apache.linkis.common.utils.{AESUtils, Logging, Utils}
import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.linkis.hadoop.common.conf.HadoopConf._
import org.apache.linkis.hadoop.common.entity.HDFSFileSystemContainer
@@ -29,21 +30,34 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import java.io.File
-import java.nio.file.Paths
+import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.PosixFilePermissions
import java.security.PrivilegedExceptionAction
-import java.util.concurrent.TimeUnit
+import java.util.Base64
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
object HDFSUtils extends Logging {
private val fileSystemCache: java.util.Map[String, HDFSFileSystemContainer] =
- new java.util.HashMap[String, HDFSFileSystemContainer]()
+ new ConcurrentHashMap[String, HDFSFileSystemContainer]()
private val LOCKER_SUFFIX = "_HDFS"
+ private val DEFAULT_CACHE_LABEL = "default"
+ private val JOINT = "_"
+ val KEYTAB_SUFFIX = ".keytab"
- if (HadoopConf.HDFS_ENABLE_CACHE) {
- logger.info("HDFS Cache enabled ")
+ private val count = new AtomicLong
+
+ /**
+ * A FileSystem opened for a public tenant should not be closed proactively; it should only be
+ * closed when the cached HDFS FileSystem is found to be in a closed state.
+ */
+ if (HadoopConf.HDFS_ENABLE_CACHE && HadoopConf.HDFS_ENABLE_CACHE_CLOSE) {
+ logger.info("HDFS Cache clear enabled ")
Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryAndWarn {
@@ -58,8 +72,13 @@ object HDFSUtils extends Logging {
.foreach { hdfsFileSystemContainer =>
val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
locker.intern() synchronized {
- if (hdfsFileSystemContainer.canRemove()) {
- fileSystemCache.remove(hdfsFileSystemContainer.getUser)
+ if (
+ hdfsFileSystemContainer.canRemove() && !HadoopConf.HDFS_ENABLE_NOT_CLOSE_USERS
+ .contains(hdfsFileSystemContainer.getUser)
+ ) {
+ fileSystemCache.remove(
+ hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel
+ )
IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem)
logger.info(
s"user${hdfsFileSystemContainer.getUser} to remove hdfsFileSystemContainer,because hdfsFileSystemContainer can remove"
@@ -116,79 +135,153 @@ object HDFSUtils extends Logging {
)
def getHDFSRootUserFileSystem(conf: org.apache.hadoop.conf.Configuration): FileSystem =
- getHDFSUserFileSystem(HADOOP_ROOT_USER.getValue, conf)
+ getHDFSUserFileSystem(HADOOP_ROOT_USER.getValue, null, conf)
- def getHDFSUserFileSystem(userName: String): FileSystem =
- getHDFSUserFileSystem(userName, getConfiguration(userName))
+ /**
+ * If the cache switch is turned on, fs will be obtained from the cache first
+ * @param userName
+ * @return
+ */
+ def getHDFSUserFileSystem(userName: String): FileSystem = {
+ getHDFSUserFileSystem(userName, null)
+ }
+
+ def getHDFSUserFileSystem(userName: String, label: String): FileSystem = {
+
+ if (HadoopConf.HDFS_ENABLE_CACHE) {
+ val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+ val cacheKey = userName + JOINT + cacheLabel
+ val locker = userName + LOCKER_SUFFIX
+ locker.intern().synchronized {
+ if (fileSystemCache.containsKey(cacheKey)) {
+ val hdfsFileSystemContainer = fileSystemCache.get(cacheKey)
+ hdfsFileSystemContainer.addAccessCount()
+ hdfsFileSystemContainer.updateLastAccessTime
+ hdfsFileSystemContainer.getFileSystem
+ } else {
+ getHDFSUserFileSystem(userName, label, getConfigurationByLabel(userName, label))
+ }
+ }
+ } else {
+ getHDFSUserFileSystem(userName, label, getConfigurationByLabel(userName, label))
+ }
+ }
def getHDFSUserFileSystem(
userName: String,
+ label: String,
conf: org.apache.hadoop.conf.Configuration
- ): FileSystem = if (HadoopConf.HDFS_ENABLE_CACHE) {
- val locker = userName + LOCKER_SUFFIX
- locker.intern().synchronized {
- val hdfsFileSystemContainer = if (fileSystemCache.containsKey(userName)) {
- fileSystemCache.get(userName)
- } else {
- val newHDFSFileSystemContainer =
- new HDFSFileSystemContainer(createFileSystem(userName, conf), userName)
- fileSystemCache.put(userName, newHDFSFileSystemContainer)
- newHDFSFileSystemContainer
+ ): FileSystem = {
+
+ if (HadoopConf.FS_CACHE_DISABLE.getValue && null != conf) {
+ conf.set("fs.hdfs.impl.disable.cache", "true")
+ }
+ if (HadoopConf.HDFS_ENABLE_CACHE) {
+ val locker = userName + LOCKER_SUFFIX
+ val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+ val cacheKey = userName + JOINT + cacheLabel
+ locker.intern().synchronized {
+ val hdfsFileSystemContainer = if (fileSystemCache.containsKey(cacheKey)) {
+ fileSystemCache.get(cacheKey)
+ } else {
+ // We use cacheLabel to create the HDFSFileSystemContainer, and throughout the rest of HDFSUtils we
+ // consistently use the same cacheLabel to operate on it, e.g. when closing or removing it.
+ // At the same time, we don't want to change the behavior of createFileSystem for callers outside HDFSUtils,
+ // so we keep passing the original label to createFileSystem.
+ val newHDFSFileSystemContainer =
+ new HDFSFileSystemContainer(
+ createFileSystem(userName, label, conf),
+ userName,
+ cacheLabel
+ )
+ fileSystemCache.put(cacheKey, newHDFSFileSystemContainer)
+ newHDFSFileSystemContainer
+ }
+ hdfsFileSystemContainer.addAccessCount()
+ hdfsFileSystemContainer.updateLastAccessTime
+ hdfsFileSystemContainer.getFileSystem
}
- hdfsFileSystemContainer.addAccessCount()
- hdfsFileSystemContainer.updateLastAccessTime
- hdfsFileSystemContainer.getFileSystem
+ } else {
+ createFileSystem(userName, label, conf)
}
- } else {
- createFileSystem(userName, conf)
}
def createFileSystem(userName: String, conf: org.apache.hadoop.conf.Configuration): FileSystem =
- getUserGroupInformation(userName)
+ createFileSystem(userName, null, conf)
+
+ def createFileSystem(
+ userName: String,
+ label: String,
+ conf: org.apache.hadoop.conf.Configuration
+ ): FileSystem = {
+ val createCount = count.getAndIncrement()
+ logger.info(s"user ${userName} to create Fs, create time ${createCount}")
+ getUserGroupInformation(userName, label)
.doAs(new PrivilegedExceptionAction[FileSystem] {
- // scalastyle:off FileSystemGet
- def run: FileSystem = FileSystem.get(conf)
- // scalastyle:on FileSystemGet
+ def run: FileSystem = FileSystem.newInstance(conf)
})
+ }
def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String): Unit =
if (null != fileSystem && StringUtils.isNotBlank(userName)) {
- closeHDFSFIleSystem(fileSystem, userName, false)
+ closeHDFSFIleSystem(fileSystem, userName, null, false)
}
+ def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, label: String): Unit =
+ closeHDFSFIleSystem(fileSystem, userName, label, false)
+
def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, isForce: Boolean): Unit =
+ closeHDFSFIleSystem(fileSystem, userName, null, isForce)
+
+ def closeHDFSFIleSystem(
+ fileSystem: FileSystem,
+ userName: String,
+ label: String,
+ isForce: Boolean
+ ): Unit =
if (null != fileSystem && StringUtils.isNotBlank(userName)) {
- if (HadoopConf.HDFS_ENABLE_CACHE) {
- val hdfsFileSystemContainer = fileSystemCache.get(userName)
- if (null != hdfsFileSystemContainer) {
- val locker = userName + LOCKER_SUFFIX
+ val locker = userName + LOCKER_SUFFIX
+ if (HadoopConf.HDFS_ENABLE_CACHE) locker.intern().synchronized {
+ val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label
+ val cacheKey = userName + JOINT + cacheLabel
+ val hdfsFileSystemContainer = fileSystemCache.get(cacheKey)
+ if (
+ null != hdfsFileSystemContainer && fileSystem == hdfsFileSystemContainer.getFileSystem
+ ) {
if (isForce) {
- locker synchronized fileSystemCache.remove(hdfsFileSystemContainer.getUser)
+ fileSystemCache.remove(hdfsFileSystemContainer.getUser)
IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem)
logger.info(
s"user${hdfsFileSystemContainer.getUser} to Force remove hdfsFileSystemContainer"
)
} else {
- locker synchronized hdfsFileSystemContainer.minusAccessCount()
+ hdfsFileSystemContainer.minusAccessCount()
}
+ } else {
+ IOUtils.closeQuietly(fileSystem)
}
- } else {
+ }
+ else {
IOUtils.closeQuietly(fileSystem)
}
}
def getUserGroupInformation(userName: String): UserGroupInformation = {
- if (KERBEROS_ENABLE.getValue) {
- if (!KEYTAB_PROXYUSER_ENABLED.getValue) {
- val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath
- val user = getKerberosUser(userName)
- UserGroupInformation.setConfiguration(getConfiguration(userName))
+ getUserGroupInformation(userName, null);
+ }
+
+ def getUserGroupInformation(userName: String, label: String): UserGroupInformation = {
+ if (isKerberosEnabled(label)) {
+ if (!isKeytabProxyUserEnabled(label)) {
+ val path = getLinkisUserKeytabFile(userName, label)
+ val user = getKerberosUser(userName, label)
+ UserGroupInformation.setConfiguration(getConfigurationByLabel(userName, label))
UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
} else {
- val superUser = KEYTAB_PROXYUSER_SUPERUSER.getValue
- val path = new File(KEYTAB_FILE.getValue, superUser + ".keytab").getPath
- val user = getKerberosUser(superUser)
- UserGroupInformation.setConfiguration(getConfiguration(superUser))
+ val superUser = getKeytabSuperUser(label)
+ val path = getLinkisUserKeytabFile(superUser, label)
+ val user = getKerberosUser(superUser, label)
+ UserGroupInformation.setConfiguration(getConfigurationByLabel(superUser, label))
UserGroupInformation.createProxyUser(
userName,
UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
@@ -199,12 +292,108 @@ object HDFSUtils extends Logging {
}
}
- def getKerberosUser(userName: String): String = {
+ def isKerberosEnabled(label: String): Boolean = {
+ if (label == null) {
+ KERBEROS_ENABLE
+ } else {
+ kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
+ }
+ }
+
+ def isKeytabProxyUserEnabled(label: String): Boolean = {
+ if (label == null) {
+ KEYTAB_PROXYUSER_ENABLED.getValue
+ } else {
+ kerberosValueMapParser(KEYTAB_PROXYUSER_SUPERUSER_MAP.getValue).contains(label)
+ }
+ }
+
+ def getKerberosUser(userName: String, label: String): String = {
var user = userName
- if (KEYTAB_HOST_ENABLED.getValue) {
- user = user + "/" + KEYTAB_HOST.getValue
+ if (label == null) {
+ if (KEYTAB_HOST_ENABLED.getValue) {
+ user = user + "/" + KEYTAB_HOST.getValue
+ }
+ } else {
+ val hostMap = kerberosValueMapParser(KEYTAB_HOST_MAP.getValue)
+ if (hostMap.contains(label)) {
+ user = user + "/" + hostMap(label)
+ }
}
user
}
+ def getKeytabSuperUser(label: String): String = {
+ if (label == null) {
+ KEYTAB_PROXYUSER_SUPERUSER.getValue
+ } else {
+ kerberosValueMapParser(KEYTAB_PROXYUSER_SUPERUSER_MAP.getValue)(label)
+ }
+ }
+
+ def getKeytabPath(label: String): String = {
+ if (label == null) {
+ KEYTAB_FILE.getValue
+ } else {
+ val prefix = if (EXTERNAL_KEYTAB_FILE_PREFIX.getValue.endsWith("/")) {
+ EXTERNAL_KEYTAB_FILE_PREFIX.getValue
+ } else {
+ EXTERNAL_KEYTAB_FILE_PREFIX.getValue + "/"
+ }
+ prefix + label
+ }
+ }
+
+ def getLinkisKeytabPath(label: String): String = {
+ if (label == null) {
+ LINKIS_KEYTAB_FILE.getValue
+ } else {
+ val prefix = if (EXTERNAL_KEYTAB_FILE_PREFIX.getValue.endsWith("/")) {
+ EXTERNAL_KEYTAB_FILE_PREFIX.getValue
+ } else {
+ EXTERNAL_KEYTAB_FILE_PREFIX.getValue + "/"
+ }
+ prefix + label
+ }
+ }
+
+ private def kerberosValueMapParser(configV: String): Map[String, String] = {
+ val confDelimiter = ","
+ if (configV == null || "".equals(configV)) {
+ Map()
+ } else {
+ configV
+ .split(confDelimiter)
+ .filter(x => x != null && !"".equals(x))
+ .map(x => {
+ val confArr = x.split("=")
+ if (confArr.length == 2) {
+ (confArr(0).trim, confArr(1).trim)
+ } else null
+ })
+ .filter(kerberosValue =>
+ kerberosValue != null && StringUtils.isNotBlank(
+ kerberosValue._1
+ ) && null != kerberosValue._2
+ )
+ .toMap
+ }
+ }
+
+ private def getLinkisUserKeytabFile(userName: String, label: String): String = {
+ val path = if (LINKIS_KEYTAB_SWITCH) {
+ // Read the encrypted keytab file
+ val keytabBytes = Files.readAllBytes(Paths.get(getLinkisKeytabPath(label), userName + KEYTAB_SUFFIX))
+ // Decrypt the keytab content
+ val decryptedContent = AESUtils.decrypt(keytabBytes, AESUtils.PASSWORD)
+ val tempFile = Files.createTempFile(userName, KEYTAB_SUFFIX)
+ Files.setPosixFilePermissions(tempFile, PosixFilePermissions.fromString("rw-------"))
+ Files.write(tempFile, decryptedContent)
+ tempFile.toString
+ } else {
+ new File(getKeytabPath(label), userName + KEYTAB_SUFFIX).getPath
+ }
+ path
+ }
+
}
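A hedged sketch of the label-aware cache keying introduced above: one entry per (user, cluster label), with a "default" label when none is supplied. A plain map of strings stands in for the real HDFSFileSystemContainer cache.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LabeledFsCacheExample {
  private static final String DEFAULT_LABEL = "default";
  private static final ConcurrentMap<String, String> CACHE = new ConcurrentHashMap<>();

  // cache key combines the user and the cluster label, falling back to "default"
  static String cacheKey(String user, String label) {
    return user + "_" + (label == null ? DEFAULT_LABEL : label);
  }

  public static void main(String[] args) {
    CACHE.put(cacheKey("hadoop", null), "fs-for-default-cluster");
    CACHE.put(cacheKey("hadoop", "cluster2"), "fs-for-cluster2");
    System.out.println(CACHE.get(cacheKey("hadoop", null)));       // fs-for-default-cluster
    System.out.println(CACHE.get(cacheKey("hadoop", "cluster2"))); // fs-for-cluster2
  }
}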
diff --git a/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java b/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java
deleted file mode 100644
index b84988a74a2..00000000000
--- a/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.hadoop.common.utils;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-public class KerberosUtilsTest {
-
- @Test
- @DisplayName("getKerberosRefreshIntervalTest")
- public void getKerberosRefreshIntervalTest() {
-
- Long refreshInterval = KerberosUtils.getKerberosRefreshInterval();
- Assertions.assertTrue(86400000L == refreshInterval.longValue());
- }
-
- @Test
- @DisplayName("kinitFailTimesThresholdTest")
- public void kinitFailTimesThresholdTest() {
-
- Integer timesThreshold = KerberosUtils.kinitFailTimesThreshold();
- Assertions.assertTrue(5 == timesThreshold.intValue());
- }
-}
diff --git a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
index 44ca1dabcb9..7c2c7b38355 100644
--- a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
+++ b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
@@ -26,7 +26,7 @@ class HadoopConfTest {
def constTest(): Unit = {
Assertions.assertEquals("hadoop", HadoopConf.HADOOP_ROOT_USER.getValue)
- Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE.getValue)
+ Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE)
Assertions.assertEquals("/appcom/keytab/", HadoopConf.KEYTAB_FILE.getValue)
Assertions.assertEquals("127.0.0.1", HadoopConf.KEYTAB_HOST.getValue)
Assertions.assertFalse(HadoopConf.KEYTAB_HOST_ENABLED.getValue)
diff --git a/linkis-commons/linkis-httpclient/pom.xml b/linkis-commons/linkis-httpclient/pom.xml
index 473b591a085..1951e3cd4f2 100644
--- a/linkis-commons/linkis-httpclient/pom.xml
+++ b/linkis-commons/linkis-httpclient/pom.xml
@@ -43,30 +43,6 @@
${httpmime.version}