Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 156621899E for ; Fri, 29 May 2015 12:46:26 +0000 (UTC) Received: (qmail 68514 invoked by uid 500); 29 May 2015 12:46:25 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 68483 invoked by uid 500); 29 May 2015 12:46:25 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 68474 invoked by uid 99); 29 May 2015 12:46:25 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 May 2015 12:46:25 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 6B8A8C0C94 for ; Fri, 29 May 2015 12:46:25 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id sid8BzpdUhHT for ; Fri, 29 May 2015 12:46:17 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 5AA7C253C7 for ; Fri, 29 May 2015 12:46:09 +0000 (UTC) Received: (qmail 67170 invoked by uid 99); 29 May 2015 12:46:09 
-0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 May 2015 12:46:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1477BE1110; Fri, 29 May 2015 12:46:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 29 May 2015 12:46:13 -0000 Message-Id: <63983ff9f8d84a368112f9286c9c03ac@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/50] incubator-ignite git commit: IGNITE-709 Fix CacheListenerTest#testDeregistration() IGNITE-709 Fix CacheListenerTest#testDeregistration() Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fbb590f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fbb590f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fbb590f Branch: refs/heads/ignite-929 Commit: 8fbb590f2e659c785c6d89ea49e1e4bb3fdc666f Parents: 0ae75d8 Author: sevdokimov Authored: Sun May 24 20:23:01 2015 +0300 Committer: sevdokimov Committed: Sun May 24 20:23:01 2015 +0300 ---------------------------------------------------------------------- .../discovery/CustomMessageWrapper.java | 5 ++ .../discovery/DiscoveryCustomMessage.java | 5 ++ .../cache/DynamicCacheChangeBatch.java | 5 ++ .../continuous/AbstractContinuousMessage.java | 54 ++++++++++++++++++++ .../StartRoutineAckDiscoveryMessage.java | 20 ++------ .../StartRoutineDiscoveryMessage.java | 25 +++------ .../StopRoutineAckDiscoveryMessage.java | 19 +------ .../continuous/StopRoutineDiscoveryMessage.java | 19 +------ .../discovery/DiscoverySpiCustomMessage.java | 5 ++ .../discovery/tcp/TcpClientDiscoverySpi.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 14 ++--- 
.../TcpDiscoveryCustomEventMessage.java | 31 +++++++++-- 12 files changed, 126 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java index 0afb6cf..23f8bda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -44,6 +44,11 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage { return res == null ? null : new CustomMessageWrapper(res); } + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return delegate.isMutable(); + } + /** * @return Delegate. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java index 13c0b9c..693bbef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -40,4 +40,9 @@ public interface DiscoveryCustomMessage extends Serializable { * @return Ack message or {@code null} if ack is not required. 
*/ @Nullable public DiscoveryCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be sent to next nodes. + */ + public boolean isMutable(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index ca257a9..5fcd0e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -83,4 +83,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { @Nullable @Override public DiscoveryCustomMessage ackMessage() { return null; } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java new file mode 100644 index 0000000..f375777 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; + +import java.util.*; + +/** + * + */ +public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage { + /** Routine ID. */ + protected final UUID routineId; + + /** + * @param id Id. + */ + protected AbstractContinuousMessage(UUID id) { + routineId = id; + } + + /** + * @return Routine ID. 
+ */ + public UUID routineId() { + return routineId; + } + + /** {@inheritDoc} */ + @Override public boolean incrementMinorTopologyVersion() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index 66892b1..3e3e6fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -26,13 +26,10 @@ import java.util.*; /** * */ -public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { +public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** */ private final Map errs; @@ -41,13 +38,9 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { * @param errs Errs. */ public StartRoutineAckDiscoveryMessage(UUID routineId, Map errs) { - this.routineId = routineId; - this.errs = new HashMap<>(errs); - } + super(routineId); - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; + this.errs = new HashMap<>(errs); } /** {@inheritDoc} */ @@ -56,13 +49,6 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { } /** - * @return Routine ID. 
- */ - public UUID routineId() { - return routineId; - } - - /** * @return Errs. */ public Map errs() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 2199fd0..ec0d36b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -26,13 +26,10 @@ import java.util.*; /** * */ -public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { +public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** */ private final StartRequestData startReqData; @@ -44,13 +41,9 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { * @param startReqData Start request data. */ public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { - this.routineId = routineId; - this.startReqData = startReqData; - } + super(routineId); - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; + this.startReqData = startReqData; } /** @@ -69,13 +62,6 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { } /** - * @return Routine ID. - */ - public UUID routineId() { - return routineId; - } - - /** * @return Errs. 
*/ public Map errs() { @@ -83,6 +69,11 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean isMutable() { + return true; + } + + /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { return new StartRoutineAckDiscoveryMessage(routineId, errs); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index a640222..350f13c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -25,34 +25,19 @@ import java.util.*; /** * */ -public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { +public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** * @param routineId Routine id. */ public StopRoutineAckDiscoveryMessage(UUID routineId) { - this.routineId = routineId; - } - - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; + super(routineId); } /** {@inheritDoc} */ @Nullable @Override public DiscoveryCustomMessage ackMessage() { return null; } - - /** - * @return Routine ID. 
- */ - public UUID routineId() { - return routineId; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java index e8a43a3..5b0dc5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java @@ -25,30 +25,15 @@ import java.util.*; /** * */ -public class StopRoutineDiscoveryMessage implements DiscoveryCustomMessage { +public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** * @param routineId Routine id. */ public StopRoutineDiscoveryMessage(UUID routineId) { - this.routineId = routineId; - } - - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; - } - - /** - * @return Routine ID. 
- */ - public UUID routineId() { - return routineId; + super(routineId); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 72ba9db..15e943b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -32,4 +32,9 @@ public interface DiscoverySpiCustomMessage extends Serializable { * Called when message passed the ring. */ @Nullable public DiscoverySpiCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be sent to next nodes. 
+ */ + public boolean isMutable(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 46e9635..22bb49b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -461,7 +461,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp throw new IgniteException("Failed to send custom message: client is disconnected"); try { - sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); + sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt))); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -1481,7 +1481,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (node != null && node.visible()) { try { - DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = msg.message(marsh); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 
0164e5c..34e1ca8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1266,7 +1266,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { try { - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); + msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt))); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -4536,7 +4536,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov DiscoverySpiCustomMessage msgObj = null; try { - msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + msgObj = msg.message(marsh); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -4547,7 +4547,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (nextMsg != null) { try { - addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg))); + addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, + marsh.marshal(nextMsg))); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal discovery custom message.", e); @@ -4584,13 +4585,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov Collection snapshot = hist.get(msg.topologyVersion()); if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { - assert msg.messageBytes() != null; - TcpDiscoveryNode node = ring.node(msg.creatorNodeId()); if (node != null) { try { - DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = msg.message(marsh); 
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), @@ -4599,7 +4598,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov hist, msgObj); - msg.messageBytes(marsh.marshal(msgObj)); + if (msgObj.isMutable()) + msg.message(msgObj, marsh.marshal(msgObj)); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 372aa18..0739c1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -18,6 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.spi.discovery.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -31,15 +34,21 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage private static final long serialVersionUID = 0L; /** */ + private transient volatile DiscoverySpiCustomMessage msg; + + /** */ private byte[] msgBytes; /** * @param creatorNodeId Creator node id. + * @param msg Message. * @param msgBytes Serialized message. 
+ */ - public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, byte[] msgBytes) { + public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg, + @NotNull byte[] msgBytes) { super(creatorNodeId); + this.msg = msg; this.msgBytes = msgBytes; } @@ -51,12 +60,28 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage } /** - * @param msgBytes New message bytes. + * @param msg Message. + * @param msgBytes Serialized message. */ - public void messageBytes(byte[] msgBytes) { + public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) { + this.msg = msg; this.msgBytes = msgBytes; } + /** + * @return Deserialized message. + * @throws java.lang.Throwable if unmarshal failed. + */ + @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable { + if (msg == null) { + msg = marsh.unmarshal(msgBytes, U.gridClassLoader()); + + assert msg != null; + } + + return msg; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());