diff --git a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java index 779a8edf75d..fb845ec211c 100644 --- a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java +++ b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java @@ -4,11 +4,14 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.tron.common.exit.ExitManager; @@ -35,6 +38,22 @@ public static ScheduledExecutorService newSingleThreadScheduledExecutor(String n new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build()); } + public static ForkJoinPool newForkJoinPool(String name, int parallelism) { + return newForkJoinPool(name, parallelism, false); + } + + public static ForkJoinPool newForkJoinPool(String name, int parallelism, boolean isDaemon) { + AtomicInteger counter = new AtomicInteger(0); + ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> { + ForkJoinWorkerThread thread = + ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + thread.setName(name + "-" + counter.getAndIncrement()); + thread.setDaemon(isDaemon); + return thread; + }; + return new ForkJoinPool(parallelism, factory, null, false); + } + public static ExecutorService newFixedThreadPool(String name, int fixThreads) { return newFixedThreadPool(name, fixThreads, false); } diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 0028a5d50d0..15863d680fe 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -483,6 +483,9 @@ public class CommonParameter { @Getter @Setter public int jsonRpcMaxBlockFilterNum = 50000; + @Getter + @Setter + public int jsonRpcMaxLogFilterNum = 20000; @Getter @Setter diff --git a/common/src/main/java/org/tron/core/config/args/NodeConfig.java b/common/src/main/java/org/tron/core/config/args/NodeConfig.java index 620152a907a..805219ef701 100644 --- a/common/src/main/java/org/tron/core/config/args/NodeConfig.java +++ b/common/src/main/java/org/tron/core/config/args/NodeConfig.java @@ -310,6 +310,7 @@ public void setHttpPBFTPort(int v) { private int maxBlockRange = 5000; private int maxSubTopics = 1000; private int maxBlockFilterNum = 50000; + private int maxLogFilterNum = 20000; private long maxMessageSize = 4194304; } diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf index 63e5d86a4af..89ea662bc58 100644 --- a/common/src/main/resources/reference.conf +++ b/common/src/main/resources/reference.conf @@ -393,9 +393,12 @@ node { # Maximum topics within a topic criteria, >0 otherwise no limit maxSubTopics = 1000 - # Maximum number for blockFilter + # Maximum number for blockFilter. >0 otherwise no limit maxBlockFilterNum = 50000 + # Maximum number of concurrent eth_newFilter registrations, >0 otherwise no limit + maxLogFilterNum = 20000 + # Maximum JSON-RPC request body size, default 4MB. Independent from rpc.maxMessageSize. maxMessageSize = 4M } diff --git a/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockFilterCapsule.java b/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockFilterCapsule.java index 9cf3c0c690e..e0cfb6d4433 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockFilterCapsule.java +++ b/framework/src/main/java/org/tron/common/logsfilter/capsule/BlockFilterCapsule.java @@ -1,7 +1,5 @@ package org.tron.common.logsfilter.capsule; -import static org.tron.core.services.jsonrpc.TronJsonRpcImpl.handleBLockFilter; - import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -20,8 +18,7 @@ public class BlockFilterCapsule extends FilterTriggerCapsule { private boolean solidified; public BlockFilterCapsule(BlockCapsule block, boolean solidified) { - blockHash = block.getBlockId().toString(); - this.solidified = solidified; + this(block.getBlockId().toString(), solidified); } public BlockFilterCapsule(String blockHash, boolean solidified) { @@ -29,10 +26,4 @@ public BlockFilterCapsule(String blockHash, boolean solidified) { this.solidified = solidified; } - @Override - public void processFilterTrigger() { - handleBLockFilter(this); - } - } - diff --git a/framework/src/main/java/org/tron/common/logsfilter/capsule/FilterTriggerCapsule.java b/framework/src/main/java/org/tron/common/logsfilter/capsule/FilterTriggerCapsule.java index 0280f0c96a7..5d495a5c98c 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/capsule/FilterTriggerCapsule.java +++ b/framework/src/main/java/org/tron/common/logsfilter/capsule/FilterTriggerCapsule.java @@ -1,8 +1,5 @@ package org.tron.common.logsfilter.capsule; -public class FilterTriggerCapsule extends TriggerCapsule { +public class FilterTriggerCapsule { - public void processFilterTrigger() { - throw new UnsupportedOperationException(); - } } \ No newline at end of file diff --git a/framework/src/main/java/org/tron/common/logsfilter/capsule/LogsFilterCapsule.java b/framework/src/main/java/org/tron/common/logsfilter/capsule/LogsFilterCapsule.java index 8a8e122d9a0..c6f35e736a3 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/capsule/LogsFilterCapsule.java +++ b/framework/src/main/java/org/tron/common/logsfilter/capsule/LogsFilterCapsule.java @@ -1,7 +1,5 @@ package org.tron.common.logsfilter.capsule; -import static org.tron.core.services.jsonrpc.TronJsonRpcImpl.handleLogsFilter; - import java.util.List; import lombok.Getter; import lombok.Setter; @@ -43,8 +41,4 @@ public LogsFilterCapsule(long blockNumber, String blockHash, Bloom bloom, this.removed = removed; } - @Override - public void processFilterTrigger() { - handleLogsFilter(this); - } -} \ No newline at end of file +} diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 652f37a90db..523260add9e 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -564,6 +564,7 @@ private static void applyNodeConfig(NodeConfig nc) { PARAMETER.jsonRpcMaxBlockRange = jsonrpc.getMaxBlockRange(); PARAMETER.jsonRpcMaxSubTopics = jsonrpc.getMaxSubTopics(); PARAMETER.jsonRpcMaxBlockFilterNum = jsonrpc.getMaxBlockFilterNum(); + PARAMETER.jsonRpcMaxLogFilterNum = jsonrpc.getMaxLogFilterNum(); PARAMETER.jsonRpcMaxMessageSize = jsonrpc.getMaxMessageSize(); // ---- P2P sub-bean ---- diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 2c188c90b30..70d8dc9cfca 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -48,6 +48,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.bouncycastle.util.encoders.Hex; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.tron.api.GrpcAPI; import org.tron.api.GrpcAPI.TransactionInfoList; @@ -142,6 +143,7 @@ import org.tron.core.service.MortgageService; import org.tron.core.service.RewardViCalService; import org.tron.core.services.event.exception.EventException; +import org.tron.core.services.jsonrpc.TronJsonRpcImpl; import org.tron.core.store.AccountAssetStore; import org.tron.core.store.AccountIdIndexStore; import org.tron.core.store.AccountIndexStore; @@ -277,6 +279,10 @@ public class Manager { @Autowired private RewardViCalService rewardViCalService; + @Lazy + @Autowired + private TronJsonRpcImpl tronJsonRpcImpl; + /** * Cycle thread to rePush Transactions */ @@ -333,8 +339,10 @@ public class Manager { while (isRunFilterProcessThread) { try { FilterTriggerCapsule filterCapsule = filterCapsuleQueue.poll(1, TimeUnit.SECONDS); - if (filterCapsule != null) { - filterCapsule.processFilterTrigger(); + if (filterCapsule instanceof LogsFilterCapsule) { + tronJsonRpcImpl.handleLogsFilter((LogsFilterCapsule) filterCapsule); + } else if (filterCapsule instanceof BlockFilterCapsule) { + tronJsonRpcImpl.handleBLockFilter((BlockFilterCapsule) filterCapsule); } } catch (InterruptedException e) { logger.error("FilterProcessLoop get InterruptedException, error is {}.", @@ -2279,7 +2287,8 @@ private void reOrgLogsFilter() { } private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) { - BlockFilterCapsule blockFilterCapsule = new BlockFilterCapsule(blockCapsule, solidified); + BlockFilterCapsule blockFilterCapsule = + new BlockFilterCapsule(blockCapsule, solidified); if (!filterCapsuleQueue.offer(blockFilterCapsule)) { logger.info("Too many filters, block filter lost: {}.", blockCapsule.getBlockId()); } diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcApiUtil.java b/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcApiUtil.java index 104b72a66e8..6a0957d62d2 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcApiUtil.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcApiUtil.java @@ -61,6 +61,8 @@ public class JsonRpcApiUtil { public static final String TAG_SAFE_SUPPORT_ERROR = "TAG safe not supported"; public static final String BLOCK_NUM_ERROR = "invalid block number"; + private static final SecureRandom random = new SecureRandom(); + public static byte[] convertToTronAddress(byte[] address) { byte[] newAddress = new byte[21]; byte[] temp = new byte[] {Wallet.getAddressPreFixByte()}; @@ -647,7 +649,6 @@ public static long parseBlockNumber(String blockNumOrTag, Wallet wallet) } public static String generateFilterId() { - SecureRandom random = new SecureRandom(); byte[] uid = new byte[16]; // 128 bits are converted to 16 bytes random.nextBytes(uid); return ByteArray.toHexString(uid); diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java index f5707148724..50da763b8b9 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpc.java @@ -291,9 +291,10 @@ CompilationResult ethSubmitHashrate(String hashrate, String id) @JsonRpcErrors({ @JsonRpcError(exception = JsonRpcMethodNotFoundException.class, code = -32601, data = "{}"), @JsonRpcError(exception = JsonRpcInvalidParamsException.class, code = -32602, data = "{}"), + @JsonRpcError(exception = JsonRpcExceedLimitException.class, code = -32005, data = "{}"), }) String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, - JsonRpcMethodNotFoundException; + JsonRpcMethodNotFoundException, JsonRpcExceedLimitException; @JsonRpcMethod("eth_newBlockFilter") @JsonRpcErrors({ diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java index 40fafac535b..4d919b81ece 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java @@ -14,6 +14,7 @@ import static org.tron.core.services.jsonrpc.JsonRpcApiUtil.parseBlockNumber; import static org.tron.core.services.jsonrpc.JsonRpcApiUtil.triggerCallContract; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; @@ -30,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import lombok.Getter; @@ -118,7 +120,8 @@ public enum RequestSource { private static final String FILTER_NOT_FOUND = "filter not found"; public static final int EXPIRE_SECONDS = 5 * 60; - private static final int maxBlockFilterNum = Args.getInstance().getJsonRpcMaxBlockFilterNum(); + private final int maxBlockFilterNum = Args.getInstance().getJsonRpcMaxBlockFilterNum(); + private final int maxLogFilterNum = Args.getInstance().getJsonRpcMaxLogFilterNum(); private static final Cache logElementCache = CacheBuilder.newBuilder() .maximumSize(300_000L) // 300s * tps(1000) * 1 log/tx ≈ 300_000 @@ -133,25 +136,25 @@ public enum RequestSource { * for log filter in Full Json-RPC */ @Getter - private static final Map eventFilter2ResultFull = + private final Map eventFilter2ResultFull = new ConcurrentHashMap<>(); /** * for block in Full Json-RPC */ @Getter - private static final Map blockFilter2ResultFull = + private final Map blockFilter2ResultFull = new ConcurrentHashMap<>(); /** * for log filter in solidity Json-RPC */ @Getter - private static final Map eventFilter2ResultSolidity = + private final Map eventFilter2ResultSolidity = new ConcurrentHashMap<>(); /** * for block in solidity Json-RPC */ @Getter - private static final Map blockFilter2ResultSolidity = + private final Map blockFilter2ResultSolidity = new ConcurrentHashMap<>(); public static final String HASH_REGEX = "(0x)?[a-zA-Z0-9]{64}$"; @@ -169,25 +172,42 @@ public enum RequestSource { private static final String ERROR_SELECTOR = "08c379a0"; // Function selector for Error(string) private static final int REVERT_REASON_SELECTOR_LENGTH = 4; private static final int MAX_REVERT_REASON_PAYLOAD_BYTES = 4096; + private int filterParallelThreshold = 10000; + /** + * Using the default maxLogFilterNum of 20,000, a 3-thread pool can keep up with log event + * processing for each block within the 3-second BLOCK_PRODUCED_INTERVAL. Increasing the thread + * pool size too much may affect the performance of the main block processing thread. + */ + private final ForkJoinPool logsFilterPool = + ExecutorServiceManager.newForkJoinPool("logs-filter-pool", 3); /** * thread pool of query section bloom store */ private final ExecutorService sectionExecutor; private final NodeInfoService nodeInfoService; private final Wallet wallet; - private final Manager manager; + @Autowired + private Manager manager; private final String esName = "query-section"; @Autowired - public TronJsonRpcImpl(@Autowired NodeInfoService nodeInfoService, @Autowired Wallet wallet, - @Autowired Manager manager) { + public TronJsonRpcImpl(@Autowired NodeInfoService nodeInfoService, @Autowired Wallet wallet) { this.nodeInfoService = nodeInfoService; this.wallet = wallet; - this.manager = manager; this.sectionExecutor = ExecutorServiceManager.newFixedThreadPool(esName, 5); } - public static void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) { + @VisibleForTesting + public void setManager(Manager manager) { + this.manager = manager; + } + + @VisibleForTesting + public void setFilterParallelThreshold(int filterParallelThreshold) { + this.filterParallelThreshold = filterParallelThreshold; + } + + public void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) { Iterator> it; if (blockFilterCapsule.isSolidified()) { @@ -221,54 +241,69 @@ public static void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) { /** * append LogsFilterCapsule's LogFilterElement list to each filter if matched */ - public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) { - Iterator> it; + public void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) { + long t1 = System.currentTimeMillis(); + Map eventFilterMap; if (logsFilterCapsule.isSolidified()) { - it = getEventFilter2ResultSolidity().entrySet().iterator(); + eventFilterMap = getEventFilter2ResultSolidity(); } else { - it = getEventFilter2ResultFull().entrySet().iterator(); + eventFilterMap = getEventFilter2ResultFull(); } - while (it.hasNext()) { - Entry entry = it.next(); - if (entry.getValue().isExpire()) { - it.remove(); - continue; - } - - LogFilterAndResult logFilterAndResult = entry.getValue(); - long fromBlock = logFilterAndResult.getLogFilterWrapper().getFromBlock(); - long toBlock = logFilterAndResult.getLogFilterWrapper().getToBlock(); - if (!(fromBlock <= logsFilterCapsule.getBlockNumber() - && logsFilterCapsule.getBlockNumber() <= toBlock)) { - continue; - } + if (eventFilterMap.size() <= filterParallelThreshold) { + eventFilterMap.entrySet().forEach( + entry -> processLogFilterEntry(entry, eventFilterMap, logsFilterCapsule)); + } else { + logsFilterPool.submit(() -> eventFilterMap.entrySet().parallelStream() + .forEach(entry -> processLogFilterEntry(entry, eventFilterMap, logsFilterCapsule)) + ).join(); + } + long t2 = System.currentTimeMillis(); + logger.debug("handleLogsFilter {} cost {}, filter size {}", + logsFilterCapsule.isSolidified() ? "Solidity" : "Full", t2 - t1, eventFilterMap.size()); + } + + private void processLogFilterEntry( + Map.Entry entry, + Map eventFilterMap, + LogsFilterCapsule logsFilterCapsule) { + LogFilterAndResult logFilterAndResult = entry.getValue(); + if (logFilterAndResult.isExpire()) { + eventFilterMap.remove(entry.getKey()); + return; + } - if (logsFilterCapsule.getBloom() != null - && !logFilterAndResult.getLogFilterWrapper().getLogFilter() - .matchBloom(logsFilterCapsule.getBloom())) { - continue; - } + long blockNumber = logsFilterCapsule.getBlockNumber(); + long fromBlock = logFilterAndResult.getLogFilterWrapper().getFromBlock(); + long toBlock = logFilterAndResult.getLogFilterWrapper().getToBlock(); + if (!(fromBlock <= blockNumber && blockNumber <= toBlock)) { + return; + } - LogFilter logFilter = logFilterAndResult.getLogFilterWrapper().getLogFilter(); - List elements = - LogMatch.matchBlock(logFilter, logsFilterCapsule.getBlockNumber(), - logsFilterCapsule.getBlockHash(), logsFilterCapsule.getTxInfoList(), - logsFilterCapsule.isRemoved()); + if (logsFilterCapsule.getBloom() != null && !logFilterAndResult.getLogFilterWrapper() + .getLogFilter().matchBloom(logsFilterCapsule.getBloom())) { + return; + } - for (LogFilterElement element : elements) { - LogFilterElement cachedElement; - try { - // compare with hashcode() first, then with equals(). If not exist, put it. - cachedElement = logElementCache.get(element, () -> element); - } catch (ExecutionException e) { - logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen - cachedElement = element; - } - logFilterAndResult.getResult().add(cachedElement); + LogFilter logFilter = logFilterAndResult.getLogFilterWrapper().getLogFilter(); + List elements = + LogMatch.matchBlock(logFilter, blockNumber, logsFilterCapsule.getBlockHash(), + logsFilterCapsule.getTxInfoList(), logsFilterCapsule.isRemoved()); + + List localResults = new ArrayList<>(elements.size()); + for (LogFilterElement element : elements) { + LogFilterElement cachedElement; + try { + // compare with hashcode() first, then with equals(). If not exist, put it. + cachedElement = logElementCache.get(element, () -> element); + } catch (ExecutionException e) { + logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen + cachedElement = element; } + localResults.add(cachedElement); } + logFilterAndResult.getResult().addAll(localResults); } @Override @@ -1399,7 +1434,7 @@ public CompilationResult ethSubmitHashrate(String hashrate, String id) @Override public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, - JsonRpcMethodNotFoundException { + JsonRpcMethodNotFoundException, JsonRpcExceedLimitException { disableInPBFT("eth_newFilter"); // not supports finalized as block parameter @@ -1414,7 +1449,11 @@ public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, } else { eventFilter2Result = eventFilter2ResultSolidity; } - + // Due to concurrent access, the threshold may occasionally be exceeded. + if (maxLogFilterNum > 0 && eventFilter2Result.size() >= maxLogFilterNum) { + throw new JsonRpcExceedLimitException( + "exceed max log filters: " + maxLogFilterNum + ", try again later"); + } long currentMaxFullNum = wallet.getNowBlock().getBlockHeader().getRawData().getNumber(); LogFilterAndResult logFilterAndResult = new LogFilterAndResult(fr, currentMaxFullNum, wallet); String filterID = generateFilterId(); @@ -1433,7 +1472,7 @@ public String newBlockFilter() throws JsonRpcMethodNotFoundException, } else { blockFilter2Result = blockFilter2ResultSolidity; } - if (blockFilter2Result.size() >= maxBlockFilterNum) { + if (maxBlockFilterNum > 0 && blockFilter2Result.size() >= maxBlockFilterNum) { throw new JsonRpcExceedLimitException( "exceed max block filters: " + maxBlockFilterNum + ", try again later"); } @@ -1542,7 +1581,7 @@ private LogFilterElement[] getLogsByLogFilterWrapper(LogFilterWrapper logFilterW return logMatch.matchBlockOneByOne(); } - public static Object[] getFilterResult(String filterId, Map + public Object[] getFilterResult(String filterId, Map blockFilter2Result, Map eventFilter2Result) throws ItemNotFoundException { Object[] result; @@ -1566,6 +1605,7 @@ public static Object[] getFilterResult(String filterId, Map matchedLog = matchBlock(logFilterWrapper.getLogFilter(), blockNum, blockHash, transactionInfoList, false); + if (!matchedLog.isEmpty()) { + if (logFilterElementList.size() + matchedLog.size() > LogBlockQuery.MAX_RESULT) { + throw new JsonRpcTooManyResultException( + "query returned more than " + LogBlockQuery.MAX_RESULT + " results"); + } logFilterElementList.addAll(matchedLog); } - - if (logFilterElementList.size() > LogBlockQuery.MAX_RESULT) { - throw new JsonRpcTooManyResultException( - "query returned more than " + LogBlockQuery.MAX_RESULT + " results"); - } } return logFilterElementList.toArray(new LogFilterElement[0]); diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index 6c8f2082301..ebf272ec124 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -391,8 +391,10 @@ node { # The maximum number of allowed topics within a topic criteria, default value is 1000, # should be > 0, otherwise means no limit. maxSubTopics = 1000 - # Allowed maximum number for blockFilter + # Allowed maximum number for blockFilter, >0 otherwise no limit maxBlockFilterNum = 50000 + # Allowed maximum number for newFilter, >0 otherwise no limit + maxLogFilterNum = 20000 } # Disabled api list, it will work for http, rpc and pbft, both FullNode and SolidityNode, diff --git a/framework/src/test/java/org/tron/common/logsfilter/FilterQueryTest.java b/framework/src/test/java/org/tron/common/logsfilter/FilterQueryTest.java index c87d8e1136e..b57b3a92fcd 100644 --- a/framework/src/test/java/org/tron/common/logsfilter/FilterQueryTest.java +++ b/framework/src/test/java/org/tron/common/logsfilter/FilterQueryTest.java @@ -21,7 +21,6 @@ import org.junit.Assert; import org.junit.Test; import org.tron.common.logsfilter.capsule.ContractEventTriggerCapsule; -import org.tron.common.logsfilter.capsule.FilterTriggerCapsule; import org.tron.common.logsfilter.capsule.TriggerCapsule; import org.tron.common.runtime.LogEventWrapper; import org.tron.protos.contract.SmartContractOuterClass.SmartContract.ABI.Entry; @@ -103,13 +102,6 @@ public synchronized void testMatchFilter() { assertNotNull(filterQuery.toString()); } - FilterTriggerCapsule filterTriggerCapsule = new FilterTriggerCapsule(); - try { - filterTriggerCapsule.processFilterTrigger(); - } catch (Exception e) { - logger.info(e.getMessage()); - } - TriggerCapsule triggerCapsule = new TriggerCapsule(); try { triggerCapsule.processTrigger(); diff --git a/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockFilterCapsuleTest.java b/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockFilterCapsuleTest.java index b5f7e676eea..aac42facf96 100644 --- a/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockFilterCapsuleTest.java +++ b/framework/src/test/java/org/tron/common/logsfilter/capsule/BlockFilterCapsuleTest.java @@ -31,7 +31,6 @@ public void testSetAndIsSolidified() { blockFilterCapsule = new BlockFilterCapsule( "e58f33f9baf9305dc6f82b9f1934ea8f0ade2defb951258d50167028c780351f", false); blockFilterCapsule.setSolidified(true); - blockFilterCapsule.processFilterTrigger(); Assert.assertTrue(blockFilterCapsule.isSolidified()); } } diff --git a/framework/src/test/java/org/tron/common/logsfilter/capsule/LogsFilterCapsuleTest.java b/framework/src/test/java/org/tron/common/logsfilter/capsule/LogsFilterCapsuleTest.java index 691a3106b49..f23c446c23d 100644 --- a/framework/src/test/java/org/tron/common/logsfilter/capsule/LogsFilterCapsuleTest.java +++ b/framework/src/test/java/org/tron/common/logsfilter/capsule/LogsFilterCapsuleTest.java @@ -27,7 +27,6 @@ public void testSetAndGetLogsFilterCapsule() { capsule.setRemoved(capsule.isRemoved()); capsule.setTxInfoList(capsule.getTxInfoList()); assertNotNull(capsule.toString()); - capsule.processFilterTrigger(); } } diff --git a/framework/src/test/java/org/tron/common/runtime/vm/Create2Test.java b/framework/src/test/java/org/tron/common/runtime/vm/Create2Test.java index 6fa2801c51f..5a58407f887 100644 --- a/framework/src/test/java/org/tron/common/runtime/vm/Create2Test.java +++ b/framework/src/test/java/org/tron/common/runtime/vm/Create2Test.java @@ -209,7 +209,8 @@ private void testJsonRpc(byte[] actualContract, long loop) { NodeInfoService nodeInfoService; nodeInfoService = context.getBean(NodeInfoService.class); Wallet wallet = context.getBean(Wallet.class); - tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet, manager); + tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(manager); try { String res = tronJsonRpc.getStorageAt(ByteArray.toHexString(actualContract), "0", "latest"); diff --git a/framework/src/test/java/org/tron/core/jsonrpc/ConcurrentHashMapTest.java b/framework/src/test/java/org/tron/core/jsonrpc/ConcurrentHashMapTest.java index 2cdcaaf7a53..2fcb624002e 100644 --- a/framework/src/test/java/org/tron/core/jsonrpc/ConcurrentHashMapTest.java +++ b/framework/src/test/java/org/tron/core/jsonrpc/ConcurrentHashMapTest.java @@ -23,6 +23,7 @@ @Slf4j public class ConcurrentHashMapTest { private static final String EXECUTOR_NAME = "jsonrpc-concurrent-map-test"; + private final TronJsonRpcImpl jsonRpc = new TronJsonRpcImpl(null, null); private static int randomInt(int minInt, int maxInt) { return (int) round(random(true) * (maxInt - minInt) + minInt, true); @@ -39,7 +40,7 @@ public void testHandleBlockHash() { int times = 100; int eachCount = 200; - Map conMap = TronJsonRpcImpl.getBlockFilter2ResultFull(); + Map conMap = jsonRpc.getBlockFilter2ResultFull(); Map> resultMap1 = new ConcurrentHashMap<>(); // used to check result Map> resultMap2 = new ConcurrentHashMap<>(); // used to check result Map> resultMap3 = new ConcurrentHashMap<>(); // used to check result @@ -71,7 +72,7 @@ public void testHandleBlockHash() { for (int j = 1 + (i - 1) * eachCount; j <= i * eachCount; j++) { BlockFilterCapsule blockFilterCapsule = new BlockFilterCapsule(String.valueOf(j), false); - TronJsonRpcImpl.handleBLockFilter(blockFilterCapsule); + jsonRpc.handleBLockFilter(blockFilterCapsule); } try { Thread.sleep(randomInt(50, 100)); @@ -96,8 +97,8 @@ public void testHandleBlockHash() { for (int k = 0; k < 5; k++) { try { - Object[] blockHashList = TronJsonRpcImpl.getFilterResult(String.valueOf(k), conMap, - TronJsonRpcImpl.getEventFilter2ResultFull()); + Object[] blockHashList = jsonRpc.getFilterResult(String.valueOf(k), conMap, + jsonRpc.getEventFilter2ResultFull()); for (Object str : blockHashList) { resultMap1.get(String.valueOf(k)).add(str.toString()); @@ -124,8 +125,8 @@ public void testHandleBlockHash() { for (int k = 0; k < 5; k++) { try { - Object[] blockHashList = TronJsonRpcImpl.getFilterResult(String.valueOf(k), conMap, - TronJsonRpcImpl.getEventFilter2ResultFull()); + Object[] blockHashList = jsonRpc.getFilterResult(String.valueOf(k), conMap, + jsonRpc.getEventFilter2ResultFull()); // if (blockHashList.length == 0) { // continue; @@ -156,8 +157,8 @@ public void testHandleBlockHash() { for (int k = 0; k < 5; k++) { try { - Object[] blockHashList = TronJsonRpcImpl.getFilterResult(String.valueOf(k), conMap, - TronJsonRpcImpl.getEventFilter2ResultFull()); + Object[] blockHashList = jsonRpc.getFilterResult(String.valueOf(k), conMap, + jsonRpc.getEventFilter2ResultFull()); for (Object str : blockHashList) { try { diff --git a/framework/src/test/java/org/tron/core/jsonrpc/HandleLogsFilterTest.java b/framework/src/test/java/org/tron/core/jsonrpc/HandleLogsFilterTest.java new file mode 100644 index 00000000000..33835c482fe --- /dev/null +++ b/framework/src/test/java/org/tron/core/jsonrpc/HandleLogsFilterTest.java @@ -0,0 +1,293 @@ +package org.tron.core.jsonrpc; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tron.common.logsfilter.capsule.LogsFilterCapsule; +import org.tron.common.runtime.vm.DataWord; +import org.tron.common.runtime.vm.LogInfo; +import org.tron.core.exception.jsonrpc.JsonRpcInvalidParamsException; +import org.tron.core.services.jsonrpc.TronJsonRpc.FilterRequest; +import org.tron.core.services.jsonrpc.TronJsonRpcImpl; +import org.tron.core.services.jsonrpc.filters.FilterResult; +import org.tron.core.services.jsonrpc.filters.LogFilterAndResult; +import org.tron.protos.Protocol.TransactionInfo; + +public class HandleLogsFilterTest { + + private static final String FILTER_ID_1 = "handle-logs-test-001"; + private static final String FILTER_ID_2 = "handle-logs-test-002"; + + private TronJsonRpcImpl jsonRpc; + + @Before + public void setUp() { + jsonRpc = new TronJsonRpcImpl(null, null); + } + + @After + public void tearDown() throws Exception { + jsonRpc.close(); + } + + private TransactionInfo buildTxInfoWithLog(byte[] address) { + LogInfo logInfo = new LogInfo(address, + Collections.singletonList(new DataWord(new byte[32])), new byte[0]); + return TransactionInfo.newBuilder().addLog(LogInfo.buildLog(logInfo)).build(); + } + + /** + * Events dispatched to a matching filter in the serial (<=10000 entries) path. + */ + @Test + public void testMatchingFilter_receivesLogElements() throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); + LogFilterAndResult filterAndResult = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_1, filterAndResult); + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, false, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertEquals(1, filterAndResult.getResult().size()); + } + + /** + * Filter with fromBlock=100 does not receive a capsule whose blockNumber is 50. + */ + @Test + public void testBlockNumberBelowRange_noResult() throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); + // currentMaxBlockNum=100 → fromBlock=100, toBlock=MAX_VALUE + LogFilterAndResult filterAndResult = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_1, filterAndResult); + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(50L, "0xabcdef", null, txInfoList, false, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertTrue(filterAndResult.getResult().isEmpty()); + } + + /** + * An expired filter is removed from the map during handleLogsFilter. + */ + @Test + public void testExpiredFilter_removedFromMap() throws Exception { + FilterRequest fr = new FilterRequest(); + LogFilterAndResult filterAndResult = new LogFilterAndResult(fr, 100L, null); + + Field expireField = FilterResult.class.getDeclaredField("expireTimeStamp"); + expireField.setAccessible(true); + expireField.setLong(filterAndResult, 0L); + + Map map = jsonRpc.getEventFilter2ResultFull(); + map.put(FILTER_ID_1, filterAndResult); + Assert.assertTrue(map.containsKey(FILTER_ID_1)); + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, false, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertFalse("expired filter should be removed", map.containsKey(FILTER_ID_1)); + } + + /** + * A solidified capsule is routed only to the solidity map; the full-node map is untouched. + */ + @Test + public void testSolidifiedCapsule_routedToSolidityMap() throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); + LogFilterAndResult solidityFilter = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultSolidity().put(FILTER_ID_1, solidityFilter); + + LogFilterAndResult fullFilter = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_2, fullFilter); + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, true, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertEquals(1, solidityFilter.getResult().size()); + Assert.assertTrue("full-node filter must not be touched", fullFilter.getResult().isEmpty()); + } + + /** + * A non-solidified capsule is routed only to the full-node map. + */ + @Test + public void testNonSolidifiedCapsule_routedToFullMap() throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); + LogFilterAndResult solidityFilter = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultSolidity().put(FILTER_ID_1, solidityFilter); + + LogFilterAndResult fullFilter = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_2, fullFilter); + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, false, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertEquals(1, fullFilter.getResult().size()); + Assert.assertTrue("solidity filter must not be touched", solidityFilter.getResult().isEmpty()); + } + + /** + * Both filters in the map receive events when both match. + */ + @Test + public void testMultipleMatchingFilters_bothReceiveEvents() throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); + LogFilterAndResult filter1 = new LogFilterAndResult(fr, 100L, null); + LogFilterAndResult filter2 = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_1, filter1); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_2, filter2); + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, false, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertEquals(1, filter1.getResult().size()); + Assert.assertEquals(1, filter2.getResult().size()); + } + + /** + * An empty txInfoList produces no results. + */ + @Test + public void testEmptyTxInfoList_noResult() throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); + LogFilterAndResult filterAndResult = new LogFilterAndResult(fr, 100L, null); + jsonRpc.getEventFilter2ResultFull().put(FILTER_ID_1, filterAndResult); + + LogsFilterCapsule capsule = new LogsFilterCapsule(150L, "0xabcdef", null, + Collections.emptyList(), false, false); + + jsonRpc.handleLogsFilter(capsule); + + Assert.assertTrue(filterAndResult.getResult().isEmpty()); + } + + private void setParallelThreshold(int value) { + jsonRpc.setFilterParallelThreshold(value); + } + + /** + * Parallel path: every matching filter receives exactly one event — no events dropped or + * double-counted under concurrent dispatch. + */ + @Test(timeout = 10000) + public void testParallelPath_allMatchingFilters_receiveEvents() throws Exception { + setParallelThreshold(2); + int count = 5; + FilterRequest fr = new FilterRequest(); + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + Map map = jsonRpc.getEventFilter2ResultFull(); + String prefix = "parallel-match-"; + for (int i = 0; i < count; i++) { + map.put(prefix + i, new LogFilterAndResult(fr, 0L, null)); + } + + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, false, false); + jsonRpc.handleLogsFilter(capsule); + + for (int i = 0; i < count; i++) { + Assert.assertEquals("filter " + i + " must receive exactly one event", + 1, map.get(prefix + i).getResult().size()); + } + } + + /** + * Parallel path: expired filters are evicted and all valid filters still receive their events. + */ + @Test(timeout = 10000) + public void testParallelPath_expiredFiltersRemoved() throws Exception { + setParallelThreshold(2); + int expiredCount = 2; + int validCount = 3; + FilterRequest fr = new FilterRequest(); + Field expireField = FilterResult.class.getDeclaredField("expireTimeStamp"); + expireField.setAccessible(true); + Map map = jsonRpc.getEventFilter2ResultFull(); + String prefix = "parallel-expire-"; + for (int i = 0; i < expiredCount + validCount; i++) { + LogFilterAndResult filter = new LogFilterAndResult(fr, 0L, null); + if (i < expiredCount) { + expireField.setLong(filter, 0L); + } + map.put(prefix + i, filter); + } + + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, false, false); + jsonRpc.handleLogsFilter(capsule); + + for (int i = 0; i < expiredCount; i++) { + Assert.assertFalse("expired filter " + i + " should be removed", + map.containsKey(prefix + i)); + } + for (int i = expiredCount; i < expiredCount + validCount; i++) { + Assert.assertEquals("valid filter " + i + " must receive one event", + 1, map.get(prefix + i).getResult().size()); + } + } + + /** + * Parallel path: a solidified capsule dispatches only to the solidity map; the full-node map + * is untouched even though it holds entries. + */ + @Test(timeout = 10000) + public void testParallelPath_solidifiedCapsule_routedToSolidityMap() throws Exception { + setParallelThreshold(2); + int count = 5; + FilterRequest fr = new FilterRequest(); + List txInfoList = + Collections.singletonList(buildTxInfoWithLog(new byte[20])); + Map solidityMap = jsonRpc.getEventFilter2ResultSolidity(); + Map fullMap = jsonRpc.getEventFilter2ResultFull(); + String solidityPrefix = "parallel-solid-"; + for (int i = 0; i < count; i++) { + solidityMap.put(solidityPrefix + i, new LogFilterAndResult(fr, 0L, null)); + } + LogFilterAndResult fullFilter = new LogFilterAndResult(fr, 0L, null); + fullMap.put("parallel-solid-full-0", fullFilter); + + LogsFilterCapsule capsule = + new LogsFilterCapsule(150L, "0xabcdef", null, txInfoList, true, false); + jsonRpc.handleLogsFilter(capsule); + + for (int i = 0; i < count; i++) { + Assert.assertEquals("solidity filter " + i + " must receive one event", + 1, solidityMap.get(solidityPrefix + i).getResult().size()); + } + Assert.assertTrue("full-map filter must not receive events", + fullFilter.getResult().isEmpty()); + } +} diff --git a/framework/src/test/java/org/tron/core/jsonrpc/JsonRpcCallAndEstimateGasTest.java b/framework/src/test/java/org/tron/core/jsonrpc/JsonRpcCallAndEstimateGasTest.java index 65defdab2ed..2ab455fa580 100644 --- a/framework/src/test/java/org/tron/core/jsonrpc/JsonRpcCallAndEstimateGasTest.java +++ b/framework/src/test/java/org/tron/core/jsonrpc/JsonRpcCallAndEstimateGasTest.java @@ -211,7 +211,9 @@ private static TronJsonRpcImpl newRpcWithMockedFailedCall(byte[] resData, Estima }); } - return new TronJsonRpcImpl(mockNodeInfo, mockWallet, mockManager); + TronJsonRpcImpl rpc = new TronJsonRpcImpl(mockNodeInfo, mockWallet); + rpc.setManager(mockManager); + return rpc; } private static TronJsonRpcImpl newRpcWithMockedSuccessfulCall(byte[]... constantResults) @@ -237,7 +239,9 @@ private static TronJsonRpcImpl newRpcWithMockedSuccessfulCall(byte[]... constant .build(); }); - return new TronJsonRpcImpl(mockNodeInfo, mockWallet, mockManager); + TronJsonRpcImpl rpc = new TronJsonRpcImpl(mockNodeInfo, mockWallet); + rpc.setManager(mockManager); + return rpc; } private static TronJsonRpcImpl newRpcWithMockedEstimateGasSuccessfulCall(long energyValue, @@ -272,6 +276,8 @@ private static TronJsonRpcImpl newRpcWithMockedEstimateGasSuccessfulCall(long en }); } - return new TronJsonRpcImpl(mockNodeInfo, mockWallet, mockManager); + TronJsonRpcImpl rpc = new TronJsonRpcImpl(mockNodeInfo, mockWallet); + rpc.setManager(mockManager); + return rpc; } } diff --git a/framework/src/test/java/org/tron/core/jsonrpc/JsonrpcServiceTest.java b/framework/src/test/java/org/tron/core/jsonrpc/JsonrpcServiceTest.java index 9a1641f9e45..f753045d259 100644 --- a/framework/src/test/java/org/tron/core/jsonrpc/JsonrpcServiceTest.java +++ b/framework/src/test/java/org/tron/core/jsonrpc/JsonrpcServiceTest.java @@ -214,7 +214,8 @@ public void init() { dbManager.getTransactionRetStore() .put(ByteArray.fromLong(blockCapsule2.getNum()), transactionRetCapsule2); - tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet, dbManager); + tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(dbManager); } @Test diff --git a/framework/src/test/java/org/tron/core/jsonrpc/LogMatchOverLimitTest.java b/framework/src/test/java/org/tron/core/jsonrpc/LogMatchOverLimitTest.java new file mode 100644 index 00000000000..77f869fd5a8 --- /dev/null +++ b/framework/src/test/java/org/tron/core/jsonrpc/LogMatchOverLimitTest.java @@ -0,0 +1,151 @@ +package org.tron.core.jsonrpc; + +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.Test; +import org.tron.api.GrpcAPI.TransactionInfoList; +import org.tron.common.utils.Sha256Hash; +import org.tron.core.ChainBaseManager; +import org.tron.core.capsule.BlockCapsule; +import org.tron.core.db.Manager; +import org.tron.core.exception.BadItemException; +import org.tron.core.exception.ItemNotFoundException; +import org.tron.core.exception.jsonrpc.JsonRpcInvalidParamsException; +import org.tron.core.exception.jsonrpc.JsonRpcTooManyResultException; +import org.tron.core.services.jsonrpc.TronJsonRpc.FilterRequest; +import org.tron.core.services.jsonrpc.TronJsonRpc.LogFilterElement; +import org.tron.core.services.jsonrpc.filters.LogBlockQuery; +import org.tron.core.services.jsonrpc.filters.LogFilterWrapper; +import org.tron.core.services.jsonrpc.filters.LogMatch; +import org.tron.protos.Protocol.TransactionInfo; +import org.tron.protos.Protocol.TransactionInfo.Log; + +/** + * Verifies the over-limit check in {@link LogMatch#matchBlockOneByOne()} + * The fix ensures the exception is thrown BEFORE {@code addAll}, so the result list never + * silently exceeds {@link LogBlockQuery#MAX_RESULT}. + */ +public class LogMatchOverLimitTest { + + private static final int MAX_RESULT = LogBlockQuery.MAX_RESULT; // 10000 + + /** Builds a TransactionInfoList containing one TransactionInfo with {@code logCount} logs. */ + private TransactionInfoList buildTxList(int logCount) { + TransactionInfo.Builder txBuilder = TransactionInfo.newBuilder(); + for (int i = 0; i < logCount; i++) { + txBuilder.addLog(Log.newBuilder() + .setAddress(ByteString.copyFrom(new byte[20])) + .build()); + } + return TransactionInfoList.newBuilder() + .addTransactionInfo(txBuilder.build()) + .build(); + } + + private Manager buildMockManager(long blockNum, TransactionInfoList txList) + throws ItemNotFoundException { + Manager manager = mock(Manager.class); + ChainBaseManager chainBaseManager = mock(ChainBaseManager.class); + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, blockNum); + + when(manager.getChainBaseManager()).thenReturn(chainBaseManager); + when(chainBaseManager.getBlockIdByNum(anyLong())).thenReturn(blockId); + when(manager.getTransactionInfoByBlockNum(blockNum)).thenReturn(txList); + return manager; + } + + private Manager buildMockManager(long block1, TransactionInfoList txList1, + long block2, TransactionInfoList txList2) throws ItemNotFoundException { + Manager manager = mock(Manager.class); + ChainBaseManager chainBaseManager = mock(ChainBaseManager.class); + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 0); + + when(manager.getChainBaseManager()).thenReturn(chainBaseManager); + when(chainBaseManager.getBlockIdByNum(anyLong())).thenReturn(blockId); + when(manager.getTransactionInfoByBlockNum(block1)).thenReturn(txList1); + when(manager.getTransactionInfoByBlockNum(block2)).thenReturn(txList2); + return manager; + } + + private LogMatch buildLogMatch(List blockNums, Manager manager) + throws JsonRpcInvalidParamsException { + FilterRequest fr = new FilterRequest(); // match-all filter + LogFilterWrapper wrapper = new LogFilterWrapper(fr, 0L, null, false); + return new LogMatch(wrapper, blockNums, manager); + } + + /** Under the limit: all logs returned without exception. */ + @Test + public void testUnderLimit_returnsAllResults() + throws BadItemException, ItemNotFoundException, JsonRpcTooManyResultException, + JsonRpcInvalidParamsException { + int logCount = MAX_RESULT / 2; // 5000, well under limit + Manager manager = buildMockManager(100L, buildTxList(logCount)); + LogMatch logMatch = buildLogMatch(Collections.singletonList(100L), manager); + + LogFilterElement[] results = logMatch.matchBlockOneByOne(); + Assert.assertEquals(logCount, results.length); + } + + /** + * The cumulative log count from two blocks equals exactly MAX_RESULT. + * This should succeed (boundary: equal is still OK). + */ + @Test + public void testAtExactLimit_succeeds() + throws BadItemException, ItemNotFoundException, JsonRpcTooManyResultException, + JsonRpcInvalidParamsException { + // block 1: MAX_RESULT - 1 logs, block 2: 1 log → total == MAX_RESULT + Manager manager = buildMockManager( + 1L, buildTxList(MAX_RESULT - 1), + 2L, buildTxList(1)); + LogMatch logMatch = buildLogMatch(Arrays.asList(1L, 2L), manager); + + LogFilterElement[] results = logMatch.matchBlockOneByOne(); + Assert.assertEquals(MAX_RESULT, results.length); + } + + /** + * Verifies the fix: when the second block would push the total over MAX_RESULT, + * {@link JsonRpcTooManyResultException} is thrown BEFORE {@code addAll}. + */ + @Test + public void testExceedsLimit_throws() + throws ItemNotFoundException, JsonRpcInvalidParamsException { + // block 1: MAX_RESULT - 1 logs, block 2: 2 logs → 9999 + 2 = 10001 > MAX_RESULT + Manager manager = buildMockManager( + 1L, buildTxList(MAX_RESULT - 1), + 2L, buildTxList(2)); + LogMatch logMatch = buildLogMatch(Arrays.asList(1L, 2L), manager); + + assertThrows(JsonRpcTooManyResultException.class, logMatch::matchBlockOneByOne); + } + + /** A block with no matching logs is skipped without incrementing the result count. */ + @Test + public void testEmptyBlockSkipped() + throws BadItemException, ItemNotFoundException, JsonRpcTooManyResultException, + JsonRpcInvalidParamsException { + // block 1: no logs (empty txInfoList → skipped), block 2: 3 logs + Manager manager = mock(Manager.class); + ChainBaseManager chainBaseManager = mock(ChainBaseManager.class); + BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 0); + when(manager.getChainBaseManager()).thenReturn(chainBaseManager); + when(chainBaseManager.getBlockIdByNum(anyLong())).thenReturn(blockId); + when(manager.getTransactionInfoByBlockNum(1L)) + .thenReturn(TransactionInfoList.newBuilder().build()); // empty + when(manager.getTransactionInfoByBlockNum(2L)).thenReturn(buildTxList(3)); + + LogMatch logMatch = buildLogMatch(Arrays.asList(1L, 2L), manager); + LogFilterElement[] results = logMatch.matchBlockOneByOne(); + Assert.assertEquals(3, results.length); + } +} diff --git a/framework/src/test/java/org/tron/core/jsonrpc/WalletCursorTest.java b/framework/src/test/java/org/tron/core/jsonrpc/WalletCursorTest.java index fdd9cb44222..24ca71a74bc 100644 --- a/framework/src/test/java/org/tron/core/jsonrpc/WalletCursorTest.java +++ b/framework/src/test/java/org/tron/core/jsonrpc/WalletCursorTest.java @@ -1,6 +1,9 @@ package org.tron.core.jsonrpc; import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; @@ -13,14 +16,18 @@ import org.tron.core.capsule.AccountCapsule; import org.tron.core.config.args.Args; import org.tron.core.db2.core.Chainbase.Cursor; +import org.tron.core.exception.jsonrpc.JsonRpcExceedLimitException; import org.tron.core.services.NodeInfoService; +import org.tron.core.services.jsonrpc.TronJsonRpc.FilterRequest; import org.tron.core.services.jsonrpc.TronJsonRpcImpl; import org.tron.core.services.jsonrpc.TronJsonRpcImpl.RequestSource; +import org.tron.core.services.jsonrpc.filters.LogFilterAndResult; import org.tron.core.services.jsonrpc.types.BuildArguments; import org.tron.protos.Protocol; @Slf4j public class WalletCursorTest extends BaseTest { + private static final String OWNER_ADDRESS; private static final String OWNER_ADDRESS_ACCOUNT_NAME = "first"; @Resource @@ -30,7 +37,7 @@ public class WalletCursorTest extends BaseTest { private static boolean init; static { - Args.setParam(new String[]{"--output-directory", dbPath()}, TestConstants.TEST_CONF); + Args.setParam(new String[] {"--output-directory", dbPath()}, TestConstants.TEST_CONF); OWNER_ADDRESS = Wallet.getAddressPreFixString() + "abd4b9367799eaa3197fecb144eb71de1e049abc"; @@ -53,7 +60,8 @@ public void init() { @Test public void testSource() { - TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet, dbManager); + TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(dbManager); Assert.assertEquals(Cursor.HEAD, wallet.getCursor()); Assert.assertEquals(RequestSource.FULLNODE, tronJsonRpc.getSource()); @@ -84,9 +92,11 @@ public void testDisableInSolidity() { dbManager.setCursor(Cursor.SOLIDITY); - TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet, dbManager); + TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(dbManager); try { tronJsonRpc.buildTransaction(buildArguments); + tronJsonRpc.close(); } catch (Exception e) { Assert.assertEquals("the method buildTransaction does not exist/is not available in " + "SOLIDITY", e.getMessage()); @@ -105,7 +115,8 @@ public void testDisableInPBFT() { dbManager.setCursor(Cursor.PBFT); - TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet, dbManager); + TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(dbManager); try { tronJsonRpc.buildTransaction(buildArguments); } catch (Exception e) { @@ -132,13 +143,50 @@ public void testEnableInFullNode() { buildArguments.setTo("0x548794500882809695a8a687866e76d4271a1abc"); buildArguments.setValue("0x1f4"); - TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet, dbManager); + TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(dbManager); try { tronJsonRpc.buildTransaction(buildArguments); + tronJsonRpc.close(); } catch (Exception e) { Assert.fail(); } } + /** + * When the active filter count reaches the configured cap (node.jsonrpc.maxLogFilterNum), + * eth_newFilter must throw JsonRpcExceedLimitException instead of growing without bound. + */ + @Test + public void testNewFilter_exceedsCapThrowsException() throws Exception { + int cap = 5; + int saved = Args.getInstance().getJsonRpcMaxLogFilterNum(); + Args.getInstance().setJsonRpcMaxLogFilterNum(cap); + FilterRequest fr = new FilterRequest(); + TronJsonRpcImpl tronJsonRpc = new TronJsonRpcImpl(nodeInfoService, wallet); + tronJsonRpc.setManager(dbManager); + Map map = tronJsonRpc.getEventFilter2ResultFull(); + List addedKeys = new ArrayList<>(); + + try { + for (int i = 0; i < cap; i++) { + String key = "walletcursor-cap-test-" + i; + map.put(key, new LogFilterAndResult(fr, 0L, null)); + addedKeys.add(key); + } + Assert.assertEquals(cap, addedKeys.size()); + + try { + tronJsonRpc.newFilter(fr); + Assert.fail("Expected JsonRpcExceedLimitException when filter count reaches cap"); + } catch (JsonRpcExceedLimitException e) { + Assert.assertTrue(e.getMessage().contains(String.valueOf(cap))); + } + } finally { + tronJsonRpc.close(); + Args.getInstance().setJsonRpcMaxLogFilterNum(saved); + } + } + } \ No newline at end of file diff --git a/framework/src/test/resources/config-localtest.conf b/framework/src/test/resources/config-localtest.conf index 53a78d3e4c6..d31705f39bd 100644 --- a/framework/src/test/resources/config-localtest.conf +++ b/framework/src/test/resources/config-localtest.conf @@ -168,6 +168,7 @@ node { # maxBlockRange = 5000 # maxSubTopics = 1000 # maxBlockFilterNum = 30000 + # maxLogFilterNum = 20000 } } diff --git a/framework/src/test/resources/config-test-mainnet.conf b/framework/src/test/resources/config-test-mainnet.conf index d39f432ac36..9f968c5628d 100644 --- a/framework/src/test/resources/config-test-mainnet.conf +++ b/framework/src/test/resources/config-test-mainnet.conf @@ -95,6 +95,7 @@ node { # maxBlockRange = 5000 # maxSubTopics = 1000 # maxBlockFilterNum = 50000 + # maxLogFilterNum = 20000 } rpc { diff --git a/framework/src/test/resources/config-test.conf b/framework/src/test/resources/config-test.conf index 71e93f84db5..21cebbfeef4 100644 --- a/framework/src/test/resources/config-test.conf +++ b/framework/src/test/resources/config-test.conf @@ -119,6 +119,7 @@ node { # maxBlockRange = 5000 # maxSubTopics = 1000 # maxBlockFilterNum = 30000 + # maxLogFilterNum = 20000 } # use your ipv6 address for node discovery and tcp connection, default false