Skip to content

Commit 127d15d

Browse files
committed
IGNITE-27631 Use MessageSerializer for SingleNodeMessage
1 parent 29f5f27 commit 127d15d

43 files changed

Lines changed: 1270 additions & 490 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

modules/core/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
<properties>
3636
<ignite.update.notifier.product>apache-ignite</ignite.update.notifier.product>
3737
<ignite.generated.source.path>${project.build.directory}/generated-sources/codegen</ignite.generated.source.path>
38+
<ignite.generated.test.source.path>${project.build.directory}/generated-test-sources/codegen</ignite.generated.test.source.path>
3839
</properties>
3940

4041
<dependencies>
@@ -376,13 +377,26 @@
376377
</sources>
377378
</configuration>
378379
</execution>
380+
<execution>
381+
<id>add-generated-test-sources</id>
382+
<phase>generate-test-sources</phase>
383+
<goals>
384+
<goal>add-test-source</goal>
385+
</goals>
386+
<configuration>
387+
<sources>
388+
<source>${ignite.generated.test.source.path}</source>
389+
</sources>
390+
</configuration>
391+
</execution>
379392
</executions>
380393
</plugin>
381394

382395
<plugin>
383396
<artifactId>maven-compiler-plugin</artifactId>
384397
<configuration>
385398
<generatedSourcesDirectory>${ignite.generated.source.path}</generatedSourcesDirectory>
399+
<generatedTestSourcesDirectory>${ignite.generated.test.source.path}</generatedTestSourcesDirectory>
386400
<annotationProcessors>
387401
<annotationProcessor>org.apache.ignite.internal.MessageProcessor</annotationProcessor>
388402
<annotationProcessor>org.apache.ignite.internal.idto.IgniteDataTransferObjectProcessor</annotationProcessor>

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,34 @@
231231
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequestSerializer;
232232
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
233233
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponseSerializer;
234+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult;
235+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer;
234236
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotAwareMessage;
235237
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotAwareMessageSerializer;
238+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult;
239+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResultSerializer;
240+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersNodeResponse;
241+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersNodeResponseSerializer;
242+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersResponse;
243+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersResponseSerializer;
244+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponse;
245+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponseSerializer;
246+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponse;
247+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponseSerializer;
236248
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage;
237249
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessageSerializer;
238250
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage;
239251
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessageSerializer;
252+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
253+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResultSerializer;
254+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponse;
255+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponseSerializer;
256+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponse;
257+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponseSerializer;
258+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponse;
259+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponseSerializer;
260+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponse;
261+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponseSerializer;
240262
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
241263
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequestSerializer;
242264
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
@@ -330,6 +352,7 @@
330352
import org.apache.ignite.internal.util.UUIDCollectionMessage;
331353
import org.apache.ignite.internal.util.UUIDCollectionMessageSerializer;
332354
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
355+
import org.apache.ignite.internal.util.distributed.SingleNodeMessageSerializer;
333356
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
334357
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
335358
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
@@ -501,7 +524,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
501524
factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new, new GridQueryKillResponseSerializer());
502525
factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer());
503526
factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new, new SessionChannelMessageSerializer());
504-
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
527+
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new, new SingleNodeMessageSerializer());
505528
factory.register((short)177, TcpInverseConnectionResponseMessage::new, new TcpInverseConnectionResponseMessageSerializer());
506529
factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new,
507530
new SnapshotFilesRequestMessageSerializer());
@@ -545,6 +568,17 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
545568
factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
546569
factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
547570
factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
571+
factory.register((short)520, SnapshotOperationResponse::new, new SnapshotOperationResponseSerializer());
572+
factory.register((short)521, SnapshotHandlerResult::new, new SnapshotHandlerResultSerializer());
573+
factory.register((short)522, DataStreamerUpdatesHandlerResult::new, new DataStreamerUpdatesHandlerResultSerializer());
574+
factory.register((short)523, SnapshotCheckResponse::new, new SnapshotCheckResponseSerializer());
575+
factory.register((short)524, IncrementalSnapshotVerifyResult::new, new IncrementalSnapshotVerifyResultSerializer());
576+
factory.register((short)525, SnapshotRestoreOperationResponse::new, new SnapshotRestoreOperationResponseSerializer());
577+
factory.register((short)526, SnapshotMetadataResponse::new, new SnapshotMetadataResponseSerializer());
578+
factory.register((short)527, SnapshotCheckPartitionHashesResponse::new, new SnapshotCheckPartitionHashesResponseSerializer());
579+
factory.register((short)528, SnapshotCheckHandlersResponse::new, new SnapshotCheckHandlersResponseSerializer());
580+
factory.register((short)529, SnapshotCheckHandlersNodeResponse::new, new SnapshotCheckHandlersNodeResponseSerializer());
581+
factory.register((short)530, SnapshotPartitionsVerifyHandlerResponse::new, new SnapshotPartitionsVerifyHandlerResponseSerializer());
548582

549583
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
550584
// [120..123] - DR

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,30 @@
4141
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
4242
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
4343
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
44+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult;
45+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer;
46+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult;
47+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResultSerializer;
48+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersNodeResponse;
49+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersNodeResponseSerializer;
50+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersResponse;
51+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersResponseSerializer;
52+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponse;
53+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponseSerializer;
54+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponse;
55+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponseSerializer;
56+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
57+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResultSerializer;
58+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponse;
59+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponseSerializer;
60+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponse;
61+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponseSerializer;
62+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponse;
63+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponseSerializer;
64+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponse;
65+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponseSerializer;
66+
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
67+
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer;
4468
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
4569
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer;
4670
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
@@ -51,6 +75,8 @@
5175
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer;
5276
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
5377
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessageSerializer;
78+
import org.apache.ignite.internal.util.distributed.FullMessage;
79+
import org.apache.ignite.internal.util.distributed.FullMessageSerializer;
5480
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
5581
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
5682
import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage;
@@ -161,6 +187,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
161187
factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new,
162188
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
163189
factory.register((short)23, TcpConnectionRequestDiscoveryMessage::new, new TcpConnectionRequestDiscoveryMessageSerializer());
190+
factory.register((short)24, FullMessage::new, new FullMessageSerializer());
191+
192+
factory.register((short)86, GridCacheVersion::new, new GridCacheVersionSerializer());
164193

165194
// DiscoveryCustomMessage
166195
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
@@ -180,5 +209,17 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
180209
factory.register((short)512, ChangeGlobalStateFinishMessage::new, new ChangeGlobalStateFinishMessageSerializer());
181210
factory.register((short)513, StopRoutineAckDiscoveryMessage::new, new StopRoutineAckDiscoveryMessageSerializer());
182211
factory.register((short)514, StopRoutineDiscoveryMessage::new, new StopRoutineDiscoveryMessageSerializer());
212+
213+
factory.register((short)520, SnapshotOperationResponse::new, new SnapshotOperationResponseSerializer());
214+
factory.register((short)521, SnapshotHandlerResult::new, new SnapshotHandlerResultSerializer());
215+
factory.register((short)522, DataStreamerUpdatesHandlerResult::new, new DataStreamerUpdatesHandlerResultSerializer());
216+
factory.register((short)523, SnapshotCheckResponse::new, new SnapshotCheckResponseSerializer());
217+
factory.register((short)524, IncrementalSnapshotVerifyResult::new, new IncrementalSnapshotVerifyResultSerializer());
218+
factory.register((short)525, SnapshotRestoreOperationResponse::new, new SnapshotRestoreOperationResponseSerializer());
219+
factory.register((short)526, SnapshotMetadataResponse::new, new SnapshotMetadataResponseSerializer());
220+
factory.register((short)527, SnapshotCheckPartitionHashesResponse::new, new SnapshotCheckPartitionHashesResponseSerializer());
221+
factory.register((short)528, SnapshotCheckHandlersResponse::new, new SnapshotCheckHandlersResponseSerializer());
222+
factory.register((short)529, SnapshotCheckHandlersNodeResponse::new, new SnapshotCheckHandlersNodeResponseSerializer());
223+
factory.register((short)530, SnapshotPartitionsVerifyHandlerResponse::new, new SnapshotPartitionsVerifyHandlerResponseSerializer());
183224
}
184225
}

modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.ignite.lang.IgniteFuture;
7272
import org.apache.ignite.lang.IgniteFutureCancelledException;
7373
import org.apache.ignite.lang.IgniteUuid;
74+
import org.apache.ignite.plugin.extensions.communication.Message;
7475
import org.apache.ignite.spi.IgniteNodeValidationResult;
7576
import org.apache.ignite.spi.IgniteSpiException;
7677
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -203,10 +204,10 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
203204
* Master key change prepare process. Checks that all server nodes have the same new master key and then starts
204205
* finish process.
205206
*/
206-
private DistributedProcess<MasterKeyChangeRequest, EmptyResult> prepareMKChangeProc;
207+
private DistributedProcess<MasterKeyChangeRequest, Message> prepareMKChangeProc;
207208

208209
/** Process to perform the master key change. Changes master key and reencrypt group keys. */
209-
private DistributedProcess<MasterKeyChangeRequest, EmptyResult> performMKChangeProc;
210+
private DistributedProcess<MasterKeyChangeRequest, Message> performMKChangeProc;
210211

211212
/**
212213
* A two-phase distributed process that rotates the encryption keys of specified cache groups and initiates
@@ -1488,7 +1489,7 @@ public void applyReencryptionStartRecord(ReencryptionStartRecord rec) {
14881489
* @param req Request.
14891490
* @return Result future.
14901491
*/
1491-
private IgniteInternalFuture<EmptyResult> prepareMasterKeyChange(MasterKeyChangeRequest req) {
1492+
private IgniteInternalFuture<Message> prepareMasterKeyChange(MasterKeyChangeRequest req) {
14921493
if (masterKeyChangeRequest != null) {
14931494
return new GridFinishedFuture<>(new IgniteException("Master key change was rejected. " +
14941495
"The previous change was not completed."));
@@ -1524,7 +1525,7 @@ private IgniteInternalFuture<EmptyResult> prepareMasterKeyChange(MasterKeyChange
15241525
ctx.localNodeId() + ']', e));
15251526
}
15261527

1527-
return new GridFinishedFuture<>(new EmptyResult());
1528+
return new GridFinishedFuture<>();
15281529
}
15291530

15301531
/**
@@ -1534,7 +1535,7 @@ private IgniteInternalFuture<EmptyResult> prepareMasterKeyChange(MasterKeyChange
15341535
* @param res Results.
15351536
* @param err Errors.
15361537
*/
1537-
private void finishPrepareMasterKeyChange(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Throwable> err) {
1538+
private void finishPrepareMasterKeyChange(UUID id, Map<UUID, Message> res, Map<UUID, Throwable> err) {
15381539
if (!err.isEmpty()) {
15391540
if (masterKeyChangeRequest != null && masterKeyChangeRequest.requestId().equals(id))
15401541
masterKeyChangeRequest = null;
@@ -1551,7 +1552,7 @@ else if (U.isLocalNodeCoordinator(ctx.discovery()))
15511552
* @param req Request.
15521553
* @return Result future.
15531554
*/
1554-
private IgniteInternalFuture<EmptyResult> performMasterKeyChange(MasterKeyChangeRequest req) {
1555+
private IgniteInternalFuture<Message> performMasterKeyChange(MasterKeyChangeRequest req) {
15551556
if (masterKeyChangeRequest == null || !masterKeyChangeRequest.equals(req))
15561557
return new GridFinishedFuture<>(new IgniteException("Unknown master key change was rejected."));
15571558

@@ -1569,7 +1570,7 @@ private IgniteInternalFuture<EmptyResult> performMasterKeyChange(MasterKeyChange
15691570

15701571
masterKeyDigest = req.digest();
15711572

1572-
return new GridFinishedFuture<>(new EmptyResult());
1573+
return new GridFinishedFuture<>();
15731574
}
15741575

15751576
/**
@@ -1579,7 +1580,7 @@ private IgniteInternalFuture<EmptyResult> performMasterKeyChange(MasterKeyChange
15791580
* @param res Results.
15801581
* @param err Errors.
15811582
*/
1582-
private void finishPerformMasterKeyChange(UUID id, Map<UUID, EmptyResult> res, Map<UUID, Throwable> err) {
1583+
private void finishPerformMasterKeyChange(UUID id, Map<UUID, Message> res, Map<UUID, Throwable> err) {
15831584
completeMasterKeyChangeFuture(id, err);
15841585
}
15851586

@@ -1818,12 +1819,6 @@ byte[] digest() {
18181819
}
18191820
}
18201821

1821-
/** */
1822-
protected static class EmptyResult implements Serializable {
1823-
/** Serial version uid. */
1824-
private static final long serialVersionUID = 0L;
1825-
}
1826-
18271822
/** */
18281823
protected static class NodeEncryptionKeys implements Serializable {
18291824
/** */

0 commit comments

Comments
 (0)