-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(net): reduce sync memory via lazy parsing and throttling #6717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| import com.google.common.cache.Cache; | ||
| import com.google.common.cache.CacheBuilder; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedList; | ||
|
|
@@ -44,9 +45,9 @@ public class SyncService { | |
| @Autowired | ||
| private PbftDataSyncHandler pbftDataSyncHandler; | ||
|
|
||
| private Map<BlockMessage, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>(); | ||
| private Map<UnparsedBlock, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>(); | ||
|
|
||
| private Map<BlockMessage, PeerConnection> blockJustReceived = new ConcurrentHashMap<>(); | ||
| private Map<UnparsedBlock, PeerConnection> blockJustReceived = new ConcurrentHashMap<>(); | ||
|
|
||
| private long blockCacheTimeout = Args.getInstance().getBlockCacheTimeout(); | ||
| private Cache<BlockId, PeerConnection> requestBlockIds = CacheBuilder.newBuilder() | ||
|
|
@@ -69,6 +70,10 @@ public class SyncService { | |
|
|
||
| private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum(); | ||
|
|
||
| private final int maxPendingBlockSize = Args.getInstance().getMaxPendingBlockSize(); | ||
|
|
||
| private volatile long maxRequestedBlockNum = 0; | ||
|
|
||
| public void init() { | ||
| ExecutorServiceManager.scheduleWithFixedDelay(fetchExecutor, () -> { | ||
| try { | ||
|
|
@@ -135,7 +140,9 @@ public void syncNext(PeerConnection peer) { | |
|
|
||
| public void processBlock(PeerConnection peer, BlockMessage blockMessage) { | ||
| synchronized (blockJustReceived) { | ||
| blockJustReceived.put(blockMessage, peer); | ||
| UnparsedBlock unparsedBlock = new UnparsedBlock( | ||
| blockMessage.getBlockId(), blockMessage.getData()); | ||
| blockJustReceived.put(unparsedBlock, peer); | ||
| } | ||
| handleFlag = true; | ||
| if (peer.isSyncIdle()) { | ||
|
|
@@ -227,8 +234,18 @@ private BlockId getBlockIdByNum(long num) throws P2pException { | |
| } | ||
|
|
||
| private void startFetchSyncBlock() { | ||
| Collection<PeerConnection> activePeers = tronNetDelegate.getActivePeer(); | ||
| int reqNum = activePeers.stream() | ||
| .mapToInt(p -> p.getSyncBlockRequested().size()).sum(); | ||
| int remainNum; | ||
| synchronized (blockJustReceived) { | ||
| remainNum = maxPendingBlockSize - reqNum | ||
| - blockJustReceived.size() - blockWaitToProcess.size(); | ||
| } | ||
|
|
||
| HashMap<PeerConnection, List<BlockId>> send = new HashMap<>(); | ||
| tronNetDelegate.getActivePeer().stream() | ||
| int[] fetchingBlockSize = {0}; | ||
| activePeers.stream() | ||
| .filter(peer -> peer.isNeedSyncFromPeer() && peer.isSyncIdle()) | ||
| .filter(peer -> peer.isFetchAble()) | ||
| .forEach(peer -> { | ||
|
|
@@ -238,9 +255,16 @@ private void startFetchSyncBlock() { | |
| for (BlockId blockId : peer.getSyncBlockToFetch()) { | ||
| if (requestBlockIds.getIfPresent(blockId) == null | ||
| && !peer.getSyncBlockInProcess().contains(blockId)) { | ||
| if (fetchingBlockSize[0] >= remainNum && blockId.getNum() > maxRequestedBlockNum) { | ||
| break; | ||
| } | ||
| if (blockId.getNum() > maxRequestedBlockNum) { | ||
| maxRequestedBlockNum = blockId.getNum(); | ||
| } | ||
| requestBlockIds.put(blockId, peer); | ||
| peer.getSyncBlockRequested().put(blockId, System.currentTimeMillis()); | ||
| send.get(peer).add(blockId); | ||
| fetchingBlockSize[0]++; | ||
| if (send.get(peer).size() >= MAX_BLOCK_FETCH_PER_PEER) { | ||
| break; | ||
| } | ||
|
|
@@ -269,29 +293,37 @@ private synchronized void handleSyncBlock() { | |
|
|
||
| isProcessed[0] = false; | ||
|
|
||
| blockWaitToProcess.forEach((msg, peerConnection) -> { | ||
| blockWaitToProcess.forEach((unparsedBlock, peerConnection) -> { | ||
| synchronized (tronNetDelegate.getBlockLock()) { | ||
| BlockId blockId = unparsedBlock.getBlockId(); | ||
| if (peerConnection.isDisconnect()) { | ||
| blockWaitToProcess.remove(msg); | ||
| invalid(msg.getBlockId(), peerConnection); | ||
| blockWaitToProcess.remove(unparsedBlock); | ||
| invalid(blockId, peerConnection); | ||
| return; | ||
| } | ||
| if (msg.getBlockId().getNum() <= solidNum) { | ||
| blockWaitToProcess.remove(msg); | ||
| peerConnection.getSyncBlockInProcess().remove(msg.getBlockId()); | ||
| if (blockId.getNum() <= solidNum) { | ||
| blockWaitToProcess.remove(unparsedBlock); | ||
| peerConnection.getSyncBlockInProcess().remove(blockId); | ||
| return; | ||
| } | ||
| final boolean[] isFound = {false}; | ||
| tronNetDelegate.getActivePeer().stream() | ||
| .filter(peer -> msg.getBlockId().equals(peer.getSyncBlockToFetch().peek())) | ||
| .filter(peer -> blockId.equals(peer.getSyncBlockToFetch().peek())) | ||
| .forEach(peer -> { | ||
| isFound[0] = true; | ||
| }); | ||
| if (isFound[0]) { | ||
| blockWaitToProcess.remove(msg); | ||
| blockWaitToProcess.remove(unparsedBlock); | ||
| isProcessed[0] = true; | ||
| processSyncBlock(msg.getBlockCapsule(), peerConnection); | ||
| peerConnection.getSyncBlockInProcess().remove(msg.getBlockId()); | ||
| BlockCapsule block; | ||
| try { | ||
| block = new BlockCapsule(unparsedBlock.getData()); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MUST] The purpose of replacing Block with
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will result in a significant improvement. Since deserialization was performed once before, it's impossible for it to fail here. This is simply to handle exceptions and doesn't require any additional logic. |
||
| } catch (Exception e) { | ||
| logger.warn("Deserialize block {} failed", blockId.getString(), e); | ||
| return; | ||
| } | ||
| processSyncBlock(block, peerConnection); | ||
| peerConnection.getSyncBlockInProcess().remove(blockId); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package org.tron.core.net.service.sync; | ||
|
|
||
| import org.tron.core.capsule.BlockCapsule; | ||
|
|
||
| public class UnparsedBlock { | ||
|
|
||
| private final BlockCapsule.BlockId blockId; | ||
| private final byte[] data; | ||
|
|
||
| public UnparsedBlock(BlockCapsule.BlockId blockId, byte[] data) { | ||
| if (blockId == null) { | ||
| throw new IllegalArgumentException("blockId must not be null"); | ||
| } | ||
| this.blockId = blockId; | ||
| this.data = data; | ||
| } | ||
|
|
||
| public BlockCapsule.BlockId getBlockId() { | ||
| return blockId; | ||
| } | ||
|
|
||
| public byte[] getData() { | ||
| return data; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (!(o instanceof UnparsedBlock)) { | ||
| return false; | ||
| } | ||
| return blockId.equals(((UnparsedBlock) o).blockId); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return blockId.hashCode(); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return blockId.getString(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| import org.tron.core.net.peer.PeerManager; | ||
| import org.tron.core.net.peer.TronState; | ||
| import org.tron.core.net.service.sync.SyncService; | ||
| import org.tron.core.net.service.sync.UnparsedBlock; | ||
| import org.tron.p2p.connection.Channel; | ||
| import org.tron.protos.Protocol; | ||
|
|
||
|
|
@@ -98,12 +99,22 @@ public void testProcessBlock() { | |
| ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); | ||
| ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); | ||
| peer.setChannel(c1); | ||
| service.processBlock(peer, | ||
| new BlockMessage(new BlockCapsule(Protocol.Block.newBuilder().build()))); | ||
|
|
||
| BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder().build()); | ||
| BlockMessage blockMessage = new BlockMessage(blockCapsule); | ||
| service.processBlock(peer, blockMessage); | ||
|
|
||
| boolean fetchFlag = (boolean) ReflectUtils.getFieldObject(service, "fetchFlag"); | ||
| boolean handleFlag = (boolean) ReflectUtils.getFieldObject(service, "handleFlag"); | ||
| Assert.assertTrue(fetchFlag); | ||
| Assert.assertTrue(handleFlag); | ||
|
|
||
| Map<UnparsedBlock, PeerConnection> blockJustReceived = | ||
| (Map<UnparsedBlock, PeerConnection>) | ||
| ReflectUtils.getFieldObject(service, "blockJustReceived"); | ||
| Assert.assertEquals(1, blockJustReceived.size()); | ||
| UnparsedBlock stored = blockJustReceived.keySet().iterator().next(); | ||
| Assert.assertEquals(blockMessage.getBlockId(), stored.getBlockId()); | ||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -169,6 +180,46 @@ public void testStartFetchSyncBlock() throws Exception { | |
| peer.getSyncBlockRequested().remove(blockId); | ||
| method.invoke(service); | ||
| Assert.assertTrue(peer.getSyncBlockRequested().get(blockId) == null); | ||
|
|
||
| // reset maxRequestedBlockNum to 0 | ||
| Field maxRequestedBlockNumField = service.getClass().getDeclaredField("maxRequestedBlockNum"); | ||
| maxRequestedBlockNumField.setAccessible(true); | ||
| maxRequestedBlockNumField.set(service, 0L); | ||
|
|
||
| Map<UnparsedBlock, PeerConnection> blockWaitToProcess = | ||
| (Map<UnparsedBlock, PeerConnection>) | ||
| ReflectUtils.getFieldObject(service, "blockWaitToProcess"); | ||
|
|
||
| // target block has num=1, above maxRequestedBlockNum=0 so it can be throttled | ||
| BlockCapsule.BlockId highBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1); | ||
| peer.getSyncBlockToFetch().clear(); | ||
| peer.getSyncBlockToFetch().add(highBlockId); | ||
| peer.getSyncBlockRequested().clear(); | ||
| requestBlockIds.invalidateAll(); | ||
|
|
||
| // fill blockWaitToProcess to reach maxPendingBlockSize (default 500) | ||
| int maxPendingBlockSize = (int) ReflectUtils.getFieldObject(service, "maxPendingBlockSize"); | ||
| for (int i = 0; i < maxPendingBlockSize; i++) { | ||
| BlockCapsule.BlockId fillId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000 + i); | ||
| blockWaitToProcess.put(new UnparsedBlock(fillId, new byte[0]), peer); | ||
| } | ||
| method.invoke(service); | ||
| // highBlockId must NOT be requested: remainNum <= 0 and num > maxRequestedBlockNum | ||
| Assert.assertNull(peer.getSyncBlockRequested().get(highBlockId)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [SHOULD] Missing test for the Current coverage only asserts the hard-cap case ( Suggestion: add a symmetric case where
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, the single test has been completed. |
||
|
|
||
| // Symmetric retry-exemption case: budget still saturated, but the target block's num | ||
| // is below maxRequestedBlockNum, so it must still be requested (deadlock-avoidance | ||
| // retry path — guards an explicit invariant of the throttling design). | ||
| maxRequestedBlockNumField.set(service, 100L); | ||
| BlockCapsule.BlockId retryBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 50); | ||
| peer.getSyncBlockToFetch().clear(); | ||
| peer.getSyncBlockToFetch().add(retryBlockId); | ||
| peer.getSyncBlockRequested().clear(); | ||
| requestBlockIds.invalidateAll(); | ||
| method.invoke(service); | ||
| // retryBlockId MUST be requested: remainNum <= 0 but num=50 <= maxRequestedBlockNum=100 | ||
| Assert.assertNotNull(peer.getSyncBlockRequested().get(retryBlockId)); | ||
| blockWaitToProcess.clear(); | ||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -181,39 +232,34 @@ public void testHandleSyncBlock() throws Exception { | |
| Method method = service.getClass().getDeclaredMethod("handleSyncBlock"); | ||
| method.setAccessible(true); | ||
|
|
||
| Map<BlockMessage, PeerConnection> blockJustReceived = | ||
| (Map<BlockMessage, PeerConnection>) | ||
| Map<UnparsedBlock, PeerConnection> blockJustReceived = | ||
| (Map<UnparsedBlock, PeerConnection>) | ||
| ReflectUtils.getFieldObject(service, "blockJustReceived"); | ||
| Protocol.BlockHeader.raw.Builder blockHeaderRawBuild = Protocol.BlockHeader.raw.newBuilder(); | ||
| Protocol.BlockHeader.raw blockHeaderRaw = blockHeaderRawBuild | ||
|
|
||
| Protocol.BlockHeader.raw blockHeaderRaw = Protocol.BlockHeader.raw.newBuilder() | ||
| .setNumber(100000) | ||
| .build(); | ||
|
|
||
| // block header | ||
| Protocol.BlockHeader.Builder blockHeaderBuild = Protocol.BlockHeader.newBuilder(); | ||
| Protocol.BlockHeader blockHeader = blockHeaderBuild.setRawData(blockHeaderRaw).build(); | ||
|
|
||
| BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder() | ||
| .setBlockHeader(blockHeader).build()); | ||
|
|
||
| Protocol.BlockHeader blockHeader = Protocol.BlockHeader.newBuilder() | ||
| .setRawData(blockHeaderRaw).build(); | ||
| BlockCapsule blockCapsule = new BlockCapsule( | ||
| Protocol.Block.newBuilder().setBlockHeader(blockHeader).build()); | ||
| BlockCapsule.BlockId blockId = blockCapsule.getBlockId(); | ||
|
|
||
|
|
||
| InetSocketAddress a1 = new InetSocketAddress("127.0.0.1", 10001); | ||
| Channel c1 = mock(Channel.class); | ||
| Mockito.when(c1.getInetSocketAddress()).thenReturn(a1); | ||
| Mockito.when(c1.getInetAddress()).thenReturn(a1.getAddress()); | ||
| PeerManager.add(ctx, c1); | ||
| peer = PeerManager.getPeers().get(0); | ||
|
|
||
| blockJustReceived.put(new BlockMessage(blockCapsule), peer); | ||
| UnparsedBlock unparsedBlock = new UnparsedBlock(blockId, blockCapsule.getData()); | ||
| blockJustReceived.put(unparsedBlock, peer); | ||
|
|
||
| peer.getSyncBlockToFetch().add(blockId); | ||
|
|
||
| Cache<BlockCapsule.BlockId, PeerConnection> requestBlockIds = | ||
| (Cache<BlockCapsule.BlockId, PeerConnection>) | ||
| ReflectUtils.getFieldObject(service, "requestBlockIds"); | ||
|
|
||
| (Cache<BlockCapsule.BlockId, PeerConnection>) | ||
| ReflectUtils.getFieldObject(service, "requestBlockIds"); | ||
| requestBlockIds.put(blockId, peer); | ||
|
|
||
| method.invoke(service); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[MUST] On failure this branch only logs and returns, which leaves three problems:
unparsedBlockstays inblockWaitToProcess- next tick re-parses, re-fails, and the entry permanently consumes onemaxPendingBlockNumslot.peer.getSyncBlockInProcess()is not cleared for thisblockId.invalid(blockId, peerConnection)is not called, so the peer is not penalised and the block is not rescheduled to another peer.The trigger probability is very low (bytes were already parsed once in the
BlockMessagector), but the cleanup should mirror theisDisconnect()branch directly above:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deserialization was performed once before, so it's impossible for it to fail here. This is just to handle exceptions and doesn't require any additional logic.