Skip to content

Commit 81dda22

Browse files
IGNITE-28203 Use MarshalableMessage for ErrorMessage (#12878)
1 parent 5fb65a9 commit 81dda22

27 files changed

Lines changed: 207 additions & 188 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.IgniteCheckedException;
2122
import org.apache.ignite.internal.Order;
22-
import org.apache.ignite.internal.managers.communication.ErrorMessage;
23+
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
24+
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
25+
import org.apache.ignite.internal.util.typedef.internal.U;
26+
import org.jetbrains.annotations.Nullable;
2327

2428
/**
2529
*
2630
*/
27-
public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage {
31+
public class CalciteErrorMessage implements CalciteMarshalableMessage {
2832
/** */
2933
@Order(0)
3034
UUID qryId;
@@ -33,19 +37,26 @@ public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage
3337
@Order(1)
3438
long fragmentId;
3539

40+
/** Error bytes. */
41+
@Order(2)
42+
@GridToStringExclude
43+
@Nullable public byte[] errBytes;
44+
45+
/** Error. */
46+
private @Nullable Throwable err;
47+
3648
/** */
3749
public CalciteErrorMessage() {
3850
// No-op.
3951
}
4052

4153
/** */
4254
public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
43-
super(err);
44-
4555
assert err != null;
4656

4757
this.qryId = qryId;
4858
this.fragmentId = fragmentId;
59+
this.err = err;
4960
}
5061

5162
/**
@@ -62,6 +73,11 @@ public long fragmentId() {
6273
return fragmentId;
6374
}
6475

76+
/** */
77+
public @Nullable Throwable error() {
78+
return err;
79+
}
80+
6581
/** {@inheritDoc} */
6682
@Override public MessageType type() {
6783
return MessageType.QUERY_ERROR_MESSAGE;
@@ -71,4 +87,16 @@ public long fragmentId() {
7187
@Override public short directType() {
7288
return MessageType.QUERY_ERROR_MESSAGE.directType();
7389
}
90+
91+
/** {@inheritDoc} */
92+
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
93+
if (err != null)
94+
errBytes = U.marshal(ctx.marshaller(), err);
95+
}
96+
97+
/** {@inheritDoc} */
98+
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
99+
if (errBytes != null)
100+
err = U.unmarshal(ctx.marshaller(), errBytes, U.resolveClassLoader(ctx.cache().context().gridConfig()));
101+
}
74102
}

modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,29 @@ private void start(Collection<String> code, boolean write) {
280280
returnFalseIfWriteFailed(code, "writer.writeHeader", "directType()");
281281

282282
if (write && marshallableMessage()) {
283+
imports.add("org.apache.ignite.IgniteCheckedException");
284+
imports.add("org.apache.ignite.IgniteException");
285+
283286
code.add(EMPTY);
284287

288+
code.add(identedLine("try {"));
289+
290+
indent++;
291+
285292
code.add(identedLine("msg.prepareMarshal(marshaller);"));
293+
294+
indent--;
295+
296+
code.add(identedLine("}"));
297+
code.add(identedLine("catch (IgniteCheckedException e) {"));
298+
299+
indent++;
300+
301+
code.add(identedLine("throw new IgniteException(\"Failed to marshal object\" + msg.getClass().getSimpleName(), e);"));
302+
303+
indent--;
304+
305+
code.add(identedLine("}"));
286306
}
287307

288308
code.add(EMPTY);
@@ -949,8 +969,28 @@ private void finish(List<String> code, boolean read, boolean marshallable) {
949969
code.add(EMPTY);
950970

951971
if (read && marshallable) {
972+
imports.add("org.apache.ignite.IgniteCheckedException");
973+
imports.add("org.apache.ignite.IgniteException");
974+
975+
code.add(identedLine("try {"));
976+
977+
indent++;
978+
952979
code.add(identedLine("msg.finishUnmarshal(marshaller, clsLdr);"));
953980

981+
indent--;
982+
983+
code.add(identedLine("}"));
984+
code.add(identedLine("catch (IgniteCheckedException e) {"));
985+
986+
indent++;
987+
988+
code.add(identedLine("throw new IgniteException(\"Failed to unmarshal object\" + msg.getClass().getSimpleName(), e);"));
989+
990+
indent--;
991+
992+
code.add(identedLine("}"));
993+
954994
code.add(EMPTY);
955995
}
956996

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java

Lines changed: 18 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,26 @@
1717

1818
package org.apache.ignite.internal.managers.communication;
1919

20-
import java.io.Serializable;
2120
import org.apache.ignite.IgniteCheckedException;
22-
import org.apache.ignite.IgniteException;
23-
import org.apache.ignite.internal.MessageProcessor;
2421
import org.apache.ignite.internal.Order;
2522
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
26-
import org.apache.ignite.internal.util.typedef.F;
2723
import org.apache.ignite.internal.util.typedef.internal.S;
2824
import org.apache.ignite.internal.util.typedef.internal.U;
29-
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
30-
import org.apache.ignite.plugin.extensions.communication.Message;
31-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
25+
import org.apache.ignite.marshaller.Marshaller;
26+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
3227
import org.jetbrains.annotations.Nullable;
3328

34-
import static org.apache.ignite.marshaller.Marshallers.jdk;
35-
3629
/**
3730
* Message used to transfer {@link Throwable} objects.
38-
* <p>Because raw serialization of throwables is prohibited, you should use this message when it is necessary
39-
* to transfer some error as part of some message. See {@link MessageProcessor} for details.
40-
* <p>Currently, under the hood marshalling and unmarshalling is performed by {@link JdkMarshaller}.
41-
* <p>If the message serialization fails, wraps this error with own one.
4231
*/
4332
@SuppressWarnings({"NullableProblems", "unused"})
44-
// TODO Remove Serializable once https://issues.apache.org/jira/browse/IGNITE-27627 is completed.
45-
public class ErrorMessage implements Message, Serializable {
46-
/** */
47-
private static final long serialVersionUID = 0L;
48-
49-
/** Serialization and deserealization call holder. */
50-
@Order(value = 0, method = "errorBytes")
33+
public class ErrorMessage implements MarshallableMessage {
34+
/** Error bytes. */
35+
@Order(0)
5136
@GridToStringExclude
5237
@Nullable public byte[] errBytes;
5338

54-
/** Original error. It is transient and necessary only to avoid duplicated serialization and deserializtion. */
39+
/** Error. */
5540
private @Nullable Throwable err;
5641

5742
/**
@@ -62,61 +47,32 @@ public ErrorMessage() {
6247
}
6348

6449
/**
65-
* @param err Original error. Will be lazily serialized.
50+
* @param err Original error.
6651
*/
6752
public ErrorMessage(@Nullable Throwable err) {
6853
this.err = err;
6954
}
7055

71-
/**
72-
* Provides serialized bytes of the error. Should be called only once.
73-
*
74-
* @return Serialized error.
75-
* @see MessageWriter
76-
*/
77-
public @Nullable byte[] errorBytes() {
78-
if (err == null)
79-
return null;
80-
56+
/** {@inheritDoc} */
57+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
8158
try {
82-
return U.marshal(jdk(), err);
59+
if (err != null)
60+
errBytes = U.marshal(marsh, err);
8361
}
84-
catch (IgniteCheckedException e0) {
62+
catch (IgniteCheckedException e) {
8563
IgniteCheckedException wrappedErr = new IgniteCheckedException(err.getMessage());
8664

8765
wrappedErr.setStackTrace(err.getStackTrace());
88-
wrappedErr.addSuppressed(e0);
66+
wrappedErr.addSuppressed(e);
8967

90-
try {
91-
return U.marshal(jdk(), wrappedErr);
92-
}
93-
catch (IgniteCheckedException e1) {
94-
IgniteException marshErr = new IgniteException("Unable to marshal the wrapping error.", e1);
95-
96-
marshErr.addSuppressed(wrappedErr);
97-
98-
throw marshErr;
99-
}
68+
errBytes = U.marshal(marsh, wrappedErr);
10069
}
10170
}
10271

103-
/**
104-
* Deserializes the error from {@code errBytes}. Should be called only once.
105-
*
106-
* @param errBytes Serialized error.
107-
* @see MessageWriter
108-
*/
109-
public void errorBytes(@Nullable byte[] errBytes) {
110-
if (F.isEmpty(errBytes))
111-
err = null;
112-
else {
113-
try {
114-
err = U.unmarshal(jdk(), errBytes, U.gridClassLoader());
115-
}
116-
catch (IgniteCheckedException e) {
117-
throw new IgniteException("Failed to unmarshal error data bytes.", e);
118-
}
119-
}
72+
/** {@inheritDoc} */
73+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
74+
if (errBytes != null)
75+
err = U.unmarshal(marsh, errBytes, clsLdr);
12076
}
12177

12278
/** */
@@ -125,8 +81,6 @@ public void errorBytes(@Nullable byte[] errBytes) {
12581
}
12682

12783
/**
128-
* Safely gets original error from an error message.
129-
*
13084
* @param errorMsg Error message.
13185
* @return Error containing in the message.
13286
*/

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public void resetMetrics() {
449449

450450
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
451451

452-
compMsgs.add(new GridIoMessageFactory());
452+
compMsgs.add(new GridIoMessageFactory(marsh, U.gridClassLoader()));
453453

454454
for (IgniteComponentType compType : IgniteComponentType.values()) {
455455
MessageFactoryProvider f = compType.messageFactory();

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@
330330
import org.apache.ignite.internal.util.UUIDCollectionMessage;
331331
import org.apache.ignite.internal.util.UUIDCollectionMessageSerializer;
332332
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
333+
import org.apache.ignite.marshaller.Marshaller;
333334
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
334335
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
335336
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
@@ -350,12 +351,27 @@
350351
* Message factory implementation.
351352
*/
352353
public class GridIoMessageFactory implements MessageFactoryProvider {
354+
/** Custom data marshaller. */
355+
private final Marshaller cstDataMarshall;
356+
357+
/** Class loader for the custom data marshalling. */
358+
private final ClassLoader cstDataMarshallClsLdr;
359+
360+
/**
361+
* @param cstDataMarshall Custom data marshaller.
362+
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
363+
*/
364+
public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarshallClsLdr) {
365+
this.cstDataMarshall = cstDataMarshall;
366+
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
367+
}
368+
353369
/** {@inheritDoc} */
354370
@Override public void registerAll(MessageFactory factory) {
355371
// -54 is reserved for SQL.
356372
// We don't use the code‑generated serializer for CompressedMessage - serialization is highly customized.
357373
factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new);
358-
factory.register((short)-66, ErrorMessage::new, new ErrorMessageSerializer());
374+
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
359375
factory.register((short)-65, TxInfo::new, new TxInfoSerializer());
360376
factory.register((short)-64, TxEntriesInfo::new, new TxEntriesInfoSerializer());
361377
factory.register((short)-63, ExchangeInfo::new, new ExchangeInfoSerializer());

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.ignite.internal.managers.discovery;
1919

2020
import org.apache.ignite.internal.managers.communication.ErrorMessage;
21-
import org.apache.ignite.internal.managers.communication.ErrorMessageSerializer;
21+
import org.apache.ignite.internal.managers.communication.ErrorMessageMarshallableSerializer;
2222
import org.apache.ignite.internal.processors.authentication.User;
2323
import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
2424
import org.apache.ignite.internal.processors.authentication.UserAcceptedMessageSerializer;
@@ -132,23 +132,20 @@
132132
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
133133
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
134134
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessageSerializer;
135-
import org.jetbrains.annotations.Nullable;
136135

137136
/** Message factory for discovery messages. */
138137
public class DiscoveryMessageFactory implements MessageFactoryProvider {
139138
/** Custom data marshaller. */
140-
private final @Nullable Marshaller cstDataMarshall;
139+
private final Marshaller cstDataMarshall;
141140

142141
/** Class loader for the custom data marshalling. */
143-
private final @Nullable ClassLoader cstDataMarshallClsLdr;
142+
private final ClassLoader cstDataMarshallClsLdr;
144143

145144
/**
146145
* @param cstDataMarshall Custom data marshaller.
147146
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
148147
*/
149-
public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable ClassLoader cstDataMarshallClsLdr) {
150-
assert cstDataMarshall == null && cstDataMarshallClsLdr == null || cstDataMarshall != null && cstDataMarshallClsLdr != null;
151-
148+
public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarshallClsLdr) {
152149
this.cstDataMarshall = cstDataMarshall;
153150
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
154151
}
@@ -166,7 +163,7 @@ public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable C
166163
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer());
167164
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
168165
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());
169-
factory.register((short)-66, ErrorMessage::new, new ErrorMessageSerializer());
166+
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
170167

171168
// TcpDiscoveryAbstractMessage
172169
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());

modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MarshallableMessage.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@
1717

1818
package org.apache.ignite.plugin.extensions.communication;
1919

20+
import org.apache.ignite.IgniteCheckedException;
2021
import org.apache.ignite.marshaller.Marshaller;
2122

2223
/** A {@link Message} which still requires external custom pre-marshalling and post-unmarshalling. */
2324
public interface MarshallableMessage extends Message {
2425
/** @param marsh External custom marshaller. */
25-
public default void prepareMarshal(Marshaller marsh) {
26+
public default void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
2627
throw new UnsupportedOperationException();
2728
}
2829

2930
/**
3031
* @param marsh External custom marshaller.
3132
* @param clsLdr External class loader to post-unmarshall.
3233
*/
33-
public default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) {
34+
public default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
3435
throw new UnsupportedOperationException();
3536
}
3637
}

0 commit comments

Comments
 (0)