Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public class CommonParameter {
@Getter
@Setter
public long syncFetchBatchNum; // clearParam: 2000
@Getter
@Setter
public int maxPendingBlockSize;

// If you are running a solidity node for java tron,
// this flag is set to true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class NodeConfig {
private String trustNode = "";
private boolean walletExtensionApi = false;
private int syncFetchBatchNum = 2000;
private int maxPendingBlockSize = 500;
private int validateSignThreadNum = 0; // 0 = auto (availableProcessors)
private int maxConnections = 30;
private int minConnections = 8;
Expand Down Expand Up @@ -439,6 +440,14 @@ private void postProcess() {
syncFetchBatchNum = 100;
}

// maxPendingBlockSize: clamp to [50, 2000]
if (maxPendingBlockSize > 2000) {
maxPendingBlockSize = 2000;
}
if (maxPendingBlockSize < 50) {
maxPendingBlockSize = 50;
}

// blockProducedTimeOut: clamp to [30, 100]
if (blockProducedTimeOut < 30) {
blockProducedTimeOut = 30;
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ node {

# Number of blocks to fetch in one batch during sync. Range: [100, 2000].
syncFetchBatchNum = 2000
# Max in-flight (requested but not yet processed) blocks during sync. Range: [50, 2000].
maxPendingBlockSize = 500

# Number of validate sign threads, default availableProcessors
# Number of validate sign threads, 0 = auto (availableProcessors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ private static void applyNodeConfig(NodeConfig nc) {
PARAMETER.nodeEnableIpv6 = nc.isEnableIpv6();

PARAMETER.syncFetchBatchNum = nc.getSyncFetchBatchNum();
PARAMETER.maxPendingBlockSize = nc.getMaxPendingBlockSize();
PARAMETER.solidityThreads = nc.getSolidityThreads();
PARAMETER.blockProducedTimeOut = nc.getBlockProducedTimeOut();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -44,9 +45,9 @@ public class SyncService {
@Autowired
private PbftDataSyncHandler pbftDataSyncHandler;

private Map<BlockMessage, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>();
private Map<UnparsedBlock, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>();

private Map<BlockMessage, PeerConnection> blockJustReceived = new ConcurrentHashMap<>();
private Map<UnparsedBlock, PeerConnection> blockJustReceived = new ConcurrentHashMap<>();

private long blockCacheTimeout = Args.getInstance().getBlockCacheTimeout();
private Cache<BlockId, PeerConnection> requestBlockIds = CacheBuilder.newBuilder()
Expand All @@ -69,6 +70,10 @@ public class SyncService {

private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum();

private final int maxPendingBlockSize = Args.getInstance().getMaxPendingBlockSize();

private volatile long maxRequestedBlockNum = 0;

public void init() {
ExecutorServiceManager.scheduleWithFixedDelay(fetchExecutor, () -> {
try {
Expand Down Expand Up @@ -135,7 +140,9 @@ public void syncNext(PeerConnection peer) {

public void processBlock(PeerConnection peer, BlockMessage blockMessage) {
synchronized (blockJustReceived) {
blockJustReceived.put(blockMessage, peer);
UnparsedBlock unparsedBlock = new UnparsedBlock(
blockMessage.getBlockId(), blockMessage.getData());
blockJustReceived.put(unparsedBlock, peer);
}
handleFlag = true;
if (peer.isSyncIdle()) {
Expand Down Expand Up @@ -227,8 +234,18 @@ private BlockId getBlockIdByNum(long num) throws P2pException {
}

private void startFetchSyncBlock() {
Collection<PeerConnection> activePeers = tronNetDelegate.getActivePeer();
int reqNum = activePeers.stream()
.mapToInt(p -> p.getSyncBlockRequested().size()).sum();
int remainNum;
synchronized (blockJustReceived) {
remainNum = maxPendingBlockSize - reqNum
- blockJustReceived.size() - blockWaitToProcess.size();
}

HashMap<PeerConnection, List<BlockId>> send = new HashMap<>();
tronNetDelegate.getActivePeer().stream()
int[] fetchingBlockSize = {0};
activePeers.stream()
.filter(peer -> peer.isNeedSyncFromPeer() && peer.isSyncIdle())
.filter(peer -> peer.isFetchAble())
.forEach(peer -> {
Expand All @@ -238,9 +255,16 @@ private void startFetchSyncBlock() {
for (BlockId blockId : peer.getSyncBlockToFetch()) {
if (requestBlockIds.getIfPresent(blockId) == null
&& !peer.getSyncBlockInProcess().contains(blockId)) {
if (fetchingBlockSize[0] >= remainNum && blockId.getNum() > maxRequestedBlockNum) {
break;
}
if (blockId.getNum() > maxRequestedBlockNum) {
maxRequestedBlockNum = blockId.getNum();
}
requestBlockIds.put(blockId, peer);
peer.getSyncBlockRequested().put(blockId, System.currentTimeMillis());
send.get(peer).add(blockId);
fetchingBlockSize[0]++;
if (send.get(peer).size() >= MAX_BLOCK_FETCH_PER_PEER) {
break;
}
Expand Down Expand Up @@ -269,29 +293,37 @@ private synchronized void handleSyncBlock() {

isProcessed[0] = false;

blockWaitToProcess.forEach((msg, peerConnection) -> {
blockWaitToProcess.forEach((unparsedBlock, peerConnection) -> {
synchronized (tronNetDelegate.getBlockLock()) {
BlockId blockId = unparsedBlock.getBlockId();
if (peerConnection.isDisconnect()) {
blockWaitToProcess.remove(msg);
invalid(msg.getBlockId(), peerConnection);
blockWaitToProcess.remove(unparsedBlock);
invalid(blockId, peerConnection);
return;
}
if (msg.getBlockId().getNum() <= solidNum) {
blockWaitToProcess.remove(msg);
peerConnection.getSyncBlockInProcess().remove(msg.getBlockId());
if (blockId.getNum() <= solidNum) {
blockWaitToProcess.remove(unparsedBlock);
peerConnection.getSyncBlockInProcess().remove(blockId);
return;
}
final boolean[] isFound = {false};
tronNetDelegate.getActivePeer().stream()
.filter(peer -> msg.getBlockId().equals(peer.getSyncBlockToFetch().peek()))
.filter(peer -> blockId.equals(peer.getSyncBlockToFetch().peek()))
.forEach(peer -> {
isFound[0] = true;
});
if (isFound[0]) {
blockWaitToProcess.remove(msg);
blockWaitToProcess.remove(unparsedBlock);
isProcessed[0] = true;
processSyncBlock(msg.getBlockCapsule(), peerConnection);
peerConnection.getSyncBlockInProcess().remove(msg.getBlockId());
BlockCapsule block;
try {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MUST] On failure this branch only logs and returns, which leaves three problems:

  1. unparsedBlock stays in blockWaitToProcess - next tick re-parses, re-fails, and the entry permanently consumes one maxPendingBlockSize slot.
  2. peer.getSyncBlockInProcess() is not cleared for this blockId.
  3. invalid(blockId, peerConnection) is not called, so the peer is not penalised and the block is not rescheduled to another peer.

The trigger probability is very low (bytes were already parsed once in the BlockMessage ctor), but the cleanup should mirror the isDisconnect() branch directly above:

} catch (Exception e) {
  logger.warn("Deserialize block {} failed", blockId.getString(), e);
  blockWaitToProcess.remove(unparsedBlock);
  peerConnection.getSyncBlockInProcess().remove(blockId);
  invalid(blockId, peerConnection);
  return;
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deserialization was performed once before, so it's impossible for it to fail here. This is just to handle exceptions and doesn't require any additional logic.

block = new BlockCapsule(unparsedBlock.getData());
Copy link
Copy Markdown
Collaborator

@317787106 317787106 May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MUST] The purpose of replacing Block with UnparsedBlock is to save memory. But have you evaluated how much memory is actually saved? Encoding the data in a binary (proto) form doesn’t reduce memory usage significantly, so the benefit is minimal, while introducing several downsides:

  • It increases the cognitive burden for users.
  • If the data cannot be constructed into a Block, the peer should be disconnected as early as possible. However, nothing is handled here, which is inconsistent with the previous logic.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will result in a significant improvement. Since deserialization was performed once before, it's impossible for it to fail here. This is simply to handle exceptions and doesn't require any additional logic.

} catch (Exception e) {
logger.warn("Deserialize block {} failed", blockId.getString(), e);
return;
}
processSyncBlock(block, peerConnection);
peerConnection.getSyncBlockInProcess().remove(blockId);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.tron.core.net.service.sync;

import org.tron.core.capsule.BlockCapsule;

/**
 * Lightweight wrapper around a sync block that has not yet been deserialized.
 * Holds only the block id plus the raw serialized bytes, deferring the costly
 * {@code BlockCapsule} construction until the block is actually processed,
 * which lowers memory pressure while many blocks are in flight during sync.
 *
 * <p>Identity ({@link #equals(Object)} / {@link #hashCode()}) is based solely
 * on the block id, so two instances with the same id are interchangeable as
 * map keys regardless of their byte payload.
 */
public class UnparsedBlock {

  private final BlockCapsule.BlockId blockId;
  // Raw protobuf bytes of the block; intentionally NOT copied, to avoid
  // doubling memory for every pending block.
  private final byte[] data;

  /**
   * @param blockId id of the block; must not be {@code null}
   * @param data    serialized block bytes (stored as-is, not copied)
   * @throws IllegalArgumentException if {@code blockId} is {@code null}
   */
  public UnparsedBlock(BlockCapsule.BlockId blockId, byte[] data) {
    if (blockId == null) {
      throw new IllegalArgumentException("blockId must not be null");
    }
    this.blockId = blockId;
    this.data = data;
  }

  public BlockCapsule.BlockId getBlockId() {
    return blockId;
  }

  public byte[] getData() {
    return data;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o instanceof UnparsedBlock) {
      UnparsedBlock other = (UnparsedBlock) o;
      return blockId.equals(other.blockId);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return blockId.hashCode();
  }

  @Override
  public String toString() {
    return blockId.getString();
  }
}
5 changes: 5 additions & 0 deletions framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ node {
fetchBlock.timeout = 200
# syncFetchBatchNum = 2000

# Maximum number of blocks allowed in-flight (requested but not yet processed).
# Throttles block download to reduce memory pressure during sync.
# Range: [50, 2000], default: 500
# maxPendingBlockSize = 500

# Number of validate sign thread, default availableProcessors
# validateSignThreadNum = 16

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.tron.core.net.peer.PeerManager;
import org.tron.core.net.peer.TronState;
import org.tron.core.net.service.sync.SyncService;
import org.tron.core.net.service.sync.UnparsedBlock;
import org.tron.p2p.connection.Channel;
import org.tron.protos.Protocol;

Expand Down Expand Up @@ -98,12 +99,22 @@ public void testProcessBlock() {
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress());
peer.setChannel(c1);
service.processBlock(peer,
new BlockMessage(new BlockCapsule(Protocol.Block.newBuilder().build())));

BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder().build());
BlockMessage blockMessage = new BlockMessage(blockCapsule);
service.processBlock(peer, blockMessage);

boolean fetchFlag = (boolean) ReflectUtils.getFieldObject(service, "fetchFlag");
boolean handleFlag = (boolean) ReflectUtils.getFieldObject(service, "handleFlag");
Assert.assertTrue(fetchFlag);
Assert.assertTrue(handleFlag);

Map<UnparsedBlock, PeerConnection> blockJustReceived =
(Map<UnparsedBlock, PeerConnection>)
ReflectUtils.getFieldObject(service, "blockJustReceived");
Assert.assertEquals(1, blockJustReceived.size());
UnparsedBlock stored = blockJustReceived.keySet().iterator().next();
Assert.assertEquals(blockMessage.getBlockId(), stored.getBlockId());
}

@Test
Expand Down Expand Up @@ -169,6 +180,46 @@ public void testStartFetchSyncBlock() throws Exception {
peer.getSyncBlockRequested().remove(blockId);
method.invoke(service);
Assert.assertTrue(peer.getSyncBlockRequested().get(blockId) == null);

// reset maxRequestedBlockNum to 0
Field maxRequestedBlockNumField = service.getClass().getDeclaredField("maxRequestedBlockNum");
maxRequestedBlockNumField.setAccessible(true);
maxRequestedBlockNumField.set(service, 0L);

Map<UnparsedBlock, PeerConnection> blockWaitToProcess =
(Map<UnparsedBlock, PeerConnection>)
ReflectUtils.getFieldObject(service, "blockWaitToProcess");

// target block has num=1, above maxRequestedBlockNum=0 so it can be throttled
BlockCapsule.BlockId highBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1);
peer.getSyncBlockToFetch().clear();
peer.getSyncBlockToFetch().add(highBlockId);
peer.getSyncBlockRequested().clear();
requestBlockIds.invalidateAll();

// fill blockWaitToProcess to reach maxPendingBlockSize (default 500)
int maxPendingBlockSize = (int) ReflectUtils.getFieldObject(service, "maxPendingBlockSize");
for (int i = 0; i < maxPendingBlockSize; i++) {
BlockCapsule.BlockId fillId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000 + i);
blockWaitToProcess.put(new UnparsedBlock(fillId, new byte[0]), peer);
}
method.invoke(service);
// highBlockId must NOT be requested: remainNum <= 0 and num > maxRequestedBlockNum
Assert.assertNull(peer.getSyncBlockRequested().get(highBlockId));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SHOULD] Missing test for the maxRequestedBlockNum exemption path.

Current coverage only asserts the hard-cap case (budget exhausted + num > maxRequestedBlockNum -> block not requested). The PR's deadlock-avoidance design specifically allows budget exhausted + num <= maxRequestedBlockNum to go through as a retry, and that path is untested - a future refactor could break it silently.

Suggestion: add a symmetric case where maxRequestedBlockNum is set to e.g. 100, blockWaitToProcess is filled to maxPendingBlockSize, the target blockId.num = 50, and assert peer.getSyncBlockRequested().get(blockId) != null.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, the single test has been completed.


// Symmetric retry-exemption case: budget still saturated, but the target block's num
// is below maxRequestedBlockNum, so it must still be requested (deadlock-avoidance
// retry path — guards an explicit invariant of the throttling design).
maxRequestedBlockNumField.set(service, 100L);
BlockCapsule.BlockId retryBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 50);
peer.getSyncBlockToFetch().clear();
peer.getSyncBlockToFetch().add(retryBlockId);
peer.getSyncBlockRequested().clear();
requestBlockIds.invalidateAll();
method.invoke(service);
// retryBlockId MUST be requested: remainNum <= 0 but num=50 <= maxRequestedBlockNum=100
Assert.assertNotNull(peer.getSyncBlockRequested().get(retryBlockId));
blockWaitToProcess.clear();
}

@Test
Expand All @@ -181,39 +232,34 @@ public void testHandleSyncBlock() throws Exception {
Method method = service.getClass().getDeclaredMethod("handleSyncBlock");
method.setAccessible(true);

Map<BlockMessage, PeerConnection> blockJustReceived =
(Map<BlockMessage, PeerConnection>)
Map<UnparsedBlock, PeerConnection> blockJustReceived =
(Map<UnparsedBlock, PeerConnection>)
ReflectUtils.getFieldObject(service, "blockJustReceived");
Protocol.BlockHeader.raw.Builder blockHeaderRawBuild = Protocol.BlockHeader.raw.newBuilder();
Protocol.BlockHeader.raw blockHeaderRaw = blockHeaderRawBuild

Protocol.BlockHeader.raw blockHeaderRaw = Protocol.BlockHeader.raw.newBuilder()
.setNumber(100000)
.build();

// block header
Protocol.BlockHeader.Builder blockHeaderBuild = Protocol.BlockHeader.newBuilder();
Protocol.BlockHeader blockHeader = blockHeaderBuild.setRawData(blockHeaderRaw).build();

BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder()
.setBlockHeader(blockHeader).build());

Protocol.BlockHeader blockHeader = Protocol.BlockHeader.newBuilder()
.setRawData(blockHeaderRaw).build();
BlockCapsule blockCapsule = new BlockCapsule(
Protocol.Block.newBuilder().setBlockHeader(blockHeader).build());
BlockCapsule.BlockId blockId = blockCapsule.getBlockId();


InetSocketAddress a1 = new InetSocketAddress("127.0.0.1", 10001);
Channel c1 = mock(Channel.class);
Mockito.when(c1.getInetSocketAddress()).thenReturn(a1);
Mockito.when(c1.getInetAddress()).thenReturn(a1.getAddress());
PeerManager.add(ctx, c1);
peer = PeerManager.getPeers().get(0);

blockJustReceived.put(new BlockMessage(blockCapsule), peer);
UnparsedBlock unparsedBlock = new UnparsedBlock(blockId, blockCapsule.getData());
blockJustReceived.put(unparsedBlock, peer);

peer.getSyncBlockToFetch().add(blockId);

Cache<BlockCapsule.BlockId, PeerConnection> requestBlockIds =
(Cache<BlockCapsule.BlockId, PeerConnection>)
ReflectUtils.getFieldObject(service, "requestBlockIds");

(Cache<BlockCapsule.BlockId, PeerConnection>)
ReflectUtils.getFieldObject(service, "requestBlockIds");
requestBlockIds.put(blockId, peer);

method.invoke(service);
Expand Down
Loading