Skip to content

Commit 47fea7c

Browse files
committed
TcpDiscoveryCollectionMessage fixes
1 parent f5a91a0 commit 47fea7c

2 files changed

Lines changed: 148 additions & 21 deletions

File tree

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
import java.util.Collection;
2121
import java.util.Objects;
2222
import java.util.UUID;
23-
import org.apache.ignite.IgniteCheckedException;
2423
import org.apache.ignite.internal.Order;
2524
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2625
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
2726
import org.apache.ignite.internal.util.typedef.internal.S;
28-
import org.apache.ignite.internal.util.typedef.internal.U;
2927
import org.apache.ignite.lang.IgniteUuid;
30-
import org.apache.ignite.marshaller.Marshaller;
3128
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
3229

3330
/**
@@ -48,11 +45,8 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess
4845

4946
/** Pending messages. */
5047
@GridToStringExclude
51-
private Collection<TcpDiscoveryAbstractMessage> msgs;
52-
53-
/** Srialized bytes of {@link #msgs}. */
5448
@Order(2)
55-
byte[] msgsBytes;
49+
TcpDiscoveryCollectionMessage msgs;
5650

5751
/** Constructor for {@link DiscoveryMessageFactory}. */
5852
public TcpDiscoveryClientReconnectMessage() {
@@ -89,14 +83,14 @@ public IgniteUuid lastMessageId() {
8983
* @param msgs Pending messages.
9084
*/
9185
public void pendingMessages(Collection<TcpDiscoveryAbstractMessage> msgs) {
92-
this.msgs = msgs;
86+
this.msgs = new TcpDiscoveryCollectionMessage(msgs);
9387
}
9488

9589
/**
9690
* @return Pending messages.
9791
*/
9892
public Collection<TcpDiscoveryAbstractMessage> pendingMessages() {
99-
return msgs;
93+
return msgs.messages();
10094
}
10195

10296
/**
@@ -128,18 +122,6 @@ public boolean success() {
128122
Objects.equals(lastMsgId, other.lastMsgId);
129123
}
130124

131-
/** {@inheritDoc} */
132-
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
133-
if (msgs != null)
134-
msgsBytes = U.marshal(marsh, msgs);
135-
}
136-
137-
/** {@inheritDoc} */
138-
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
139-
if (msgsBytes != null)
140-
msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
141-
}
142-
143125
/** {@inheritDoc} */
144126
@Override public short directType() {
145127
return 28;
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.tcp.messages;
19+
20+
import java.io.Serializable;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Map;
26+
import org.apache.ignite.IgniteCheckedException;
27+
import org.apache.ignite.internal.Order;
28+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
29+
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
30+
import org.apache.ignite.internal.util.typedef.F;
31+
import org.apache.ignite.internal.util.typedef.internal.S;
32+
import org.apache.ignite.internal.util.typedef.internal.U;
33+
import org.apache.ignite.marshaller.Marshaller;
34+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
35+
import org.apache.ignite.plugin.extensions.communication.Message;
36+
import org.jetbrains.annotations.Nullable;
37+
38+
/**
39+
* TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-27627
40+
* Message to transfer a collection of {@link TcpDiscoveryAbstractMessage} with the original order.
41+
* Several of them might be a {@link Message} without {@link Serializable} serialization.
42+
*/
43+
public class TcpDiscoveryCollectionMessage implements MarshallableMessage {
44+
/** {@link TcpDiscoveryAbstractMessage} pending messages which are a {@link Message}. */
45+
@Order(0)
46+
@Nullable Map<Integer, Message> writableMsgs;
47+
48+
/** Marshallable or Java-serializable pending messages which are not a {@link Message}. */
49+
@Nullable private Map<Integer, TcpDiscoveryAbstractMessage> marshallableMsgs;
50+
51+
/** Marshalled {@link #marshallableMsgs}. */
52+
@Order(1)
53+
@GridToStringExclude
54+
@Nullable byte[] marshallableMsgsBytes;
55+
56+
/** Constructor for {@link DiscoveryMessageFactory}. */
57+
public TcpDiscoveryCollectionMessage() {
58+
// No-op.
59+
}
60+
61+
/** @param msgs Discovery messages to hold. */
62+
public TcpDiscoveryCollectionMessage(Collection<TcpDiscoveryAbstractMessage> msgs) {
63+
if (F.isEmpty(msgs))
64+
return;
65+
66+
// Keeps the original message order.
67+
int idx = 0;
68+
69+
for (TcpDiscoveryAbstractMessage m : msgs) {
70+
if (m instanceof Message) {
71+
if (writableMsgs == null)
72+
writableMsgs = U.newHashMap(msgs.size());
73+
74+
writableMsgs.put(idx++, (Message)m);
75+
76+
continue;
77+
}
78+
79+
if (marshallableMsgs == null)
80+
marshallableMsgs = U.newHashMap(msgs.size());
81+
82+
marshallableMsgs.put(idx++, m);
83+
}
84+
}
85+
86+
/** @param marsh marshaller. */
87+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
88+
if (marshallableMsgs != null && marshallableMsgsBytes == null)
89+
marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs);
90+
}
91+
92+
/** {@inheritDoc} */
93+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
94+
if (marshallableMsgsBytes != null && marshallableMsgs == null) {
95+
marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, clsLdr);
96+
97+
marshallableMsgsBytes = null;
98+
}
99+
}
100+
101+
/**
102+
* Gets pending messages sent to new node by its previous.
103+
*
104+
* @return Pending messages from previous node.
105+
*/
106+
public Collection<TcpDiscoveryAbstractMessage> messages() {
107+
if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs))
108+
return Collections.emptyList();
109+
110+
int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size())
111+
+ (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size());
112+
113+
List<TcpDiscoveryAbstractMessage> res = new ArrayList<>(totalSz);
114+
115+
for (int i = 0; i < totalSz; ++i) {
116+
Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i);
117+
118+
if (m == null) {
119+
TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i);
120+
121+
assert adm != null;
122+
123+
res.add(adm);
124+
}
125+
else {
126+
assert marshallableMsgs == null || marshallableMsgs.get(i) == null;
127+
assert m instanceof TcpDiscoveryAbstractMessage;
128+
129+
res.add((TcpDiscoveryAbstractMessage)m);
130+
}
131+
}
132+
133+
return res;
134+
}
135+
136+
/** {@inheritDoc} */
137+
@Override public short directType() {
138+
return -108;
139+
}
140+
141+
/** {@inheritDoc} */
142+
@Override public String toString() {
143+
return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", super.toString());
144+
}
145+
}

0 commit comments

Comments
 (0)