Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C8E1718F2C for ; Fri, 26 Feb 2016 13:16:20 +0000 (UTC) Received: (qmail 41270 invoked by uid 500); 26 Feb 2016 13:16:20 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 41169 invoked by uid 500); 26 Feb 2016 13:16:20 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 40241 invoked by uid 99); 26 Feb 2016 13:16:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Feb 2016 13:16:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE369E8EF2; Fri, 26 Feb 2016 13:16:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 26 Feb 2016 13:16:36 -0000 Message-Id: <2a587dc07886443dbcdcf43255f752d0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/35] ignite git commit: IGNITE-2683 http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index b12f7a6..9b2bf46 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -885,7 +885,7 @@ class ClientImpl extends TcpDiscoveryImpl { TcpDiscoveryAbstractMessage msg; try { - msg = spi.marsh.unmarshal(in, U.gridClassLoader()); + msg = spi.marsh.unmarshal(in, U.resolveClassLoader(spi.ignite().configuration())); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -1210,7 +1210,8 @@ class ClientImpl extends TcpDiscoveryImpl { List msgs = null; while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); + TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, + U.resolveClassLoader(spi.ignite().configuration())); if (msg instanceof TcpDiscoveryClientReconnectMessage) { TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; @@ -1642,7 +1643,8 @@ class ClientImpl extends TcpDiscoveryImpl { Map data = msg.newNodeDiscoveryData(); if (data != null) - spi.onExchange(newNodeId, newNodeId, data, null); + spi.onExchange(newNodeId, newNodeId, data, + U.resolveClassLoader(spi.ignite().configuration())); } } else { @@ -1666,7 +1668,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (dataMap != null) { for (Map.Entry> entry : dataMap.entrySet()) spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), - U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); + U.resolveClassLoader(spi.ignite().configuration())); } locNode.setAttributes(msg.clientNodeAttributes()); @@ -1963,7 +1965,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (node != null && node.visible()) { try { DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh, - spi.ignite().configuration().getClassLoader()); + U.resolveClassLoader(spi.ignite().configuration())); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index fa0ae1c..3ce983e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2325,7 +2325,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (marshalledMsg == null) marshalledMsg = spi.marsh.marshal(msg); - msgClone = spi.marsh.unmarshal(marshalledMsg, null); + msgClone = spi.marsh.unmarshal(marshalledMsg, + U.resolveClassLoader(spi.ignite().configuration())); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal message: " + msg, e); @@ -3545,7 +3546,7 @@ class ServerImpl extends TcpDiscoveryImpl { SecurityContext coordSubj = spi.marsh.unmarshal( node.attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT), - U.gridClassLoader()); + U.resolveClassLoader(spi.ignite().configuration())); if (!permissionsEqual(coordSubj.subject().permissions(), subj.subject().permissions())) { // Node has not pass authentication. @@ -3601,7 +3602,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (data != null) spi.onExchange(node.id(), node.id(), data, - U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); + U.resolveClassLoader(spi.ignite().configuration())); msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id())); @@ -3681,7 +3682,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (dataMap != null) { for (Map.Entry> entry : dataMap.entrySet()) spi.onExchange(node.id(), entry.getKey(), entry.getValue(), - U.resolveClassLoader(spi.ignite().configuration().getClassLoader())); + U.resolveClassLoader(spi.ignite().configuration())); } processMessageFailedNodes(msg); @@ -4608,7 +4609,7 @@ class ServerImpl extends TcpDiscoveryImpl { DiscoverySpiCustomMessage msgObj = null; try { - msgObj = msg.message(spi.marsh, spi.ignite().configuration().getClassLoader()); + msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration())); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -4735,7 +4736,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (node != null) { try { DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh, - spi.ignite().configuration().getClassLoader()); + U.resolveClassLoader(spi.ignite().configuration())); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), @@ -5181,7 +5182,8 @@ class ServerImpl extends TcpDiscoveryImpl { while (!isInterrupted()) { try { - TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); + TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, + U.resolveClassLoader(spi.ignite().configuration())); msg.senderNodeId(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 0d41cd2..277055a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1466,7 +1466,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T try { sock.setSoTimeout((int)timeout); - T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); + T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, + U.resolveClassLoader(ignite.configuration())); return res; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 9064080..2c759a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -77,7 +77,7 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage */ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable { if (msg == null) { - msg = marsh.unmarshal(msgBytes, U.resolveClassLoader(ldr)); + msg = marsh.unmarshal(msgBytes, ldr); assert msg != null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index 1fae875..7508e3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -26,12 +26,14 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.event.CacheEntryListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheRebalanceMode; @@ -42,11 +44,13 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventAdapter; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.CachePluginConfiguration; @@ -61,9 +65,12 @@ import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; +import static org.apache.ignite.events.EventType.EVTS_ALL; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; + /** * Tests for replicated cache preloader. */ @@ -86,9 +93,17 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean useExtClassLoader = false; + private volatile boolean disableP2p = false; + /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** */ + private static volatile CountDownLatch latch; + + /** */ + private static boolean cutromEvt = false; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -118,6 +133,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { cfg.setEventStorageSpi(spi); + if (disableP2p) + cfg.setPeerClassLoadingEnabled(false); + if (getTestGridName(1).equals(gridName) || useExtClassLoader || cfg.getMarshaller() instanceof BinaryMarshaller) cfg.setClassLoader(getExternalClassLoader()); @@ -125,6 +143,16 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { if (isClient) cfg.setClientMode(true); + if (cutromEvt) { + int[] evts = new int[EVTS_ALL.length + 1]; + + evts[0] = Integer.MAX_VALUE - 1; + + System.arraycopy(EVTS_ALL, 0, evts, 1, EVTS_ALL.length); + + cfg.setIncludeEventTypes(evts); + } + return cfg; } @@ -474,6 +502,113 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** * @throws Exception If test failed. */ + public void testExternalClassesAtMessage() throws Exception { + try { + useExtClassLoader = true; + disableP2p = true; + + final Class cls = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue"); + + Ignite g1 = startGrid(1); + startGrid(2); + + IgniteMessaging rmtMsg = g1.message(); + + latch = new CountDownLatch(2); + + rmtMsg.remoteListen("MyOrderedTopic", new MessageListener()); + + Object o = cls.newInstance(); + + o.toString(); + + rmtMsg.send("MyOrderedTopic", o); + rmtMsg.sendOrdered("MyOrderedTopic", o, 0); + + latch.await(); + + // Custom topic. + + final Class cls2 = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue"); + + Object topic = cls2.getEnumConstants()[0]; + + latch = new CountDownLatch(2); + + rmtMsg.remoteListen(topic, new MessageListener()); + + rmtMsg.send(topic, topic); + rmtMsg.sendOrdered(topic, topic, 0); + + latch.await(); + + } + finally { + useExtClassLoader = false; + disableP2p = false; + } + } + + /** + * @throws Exception If test failed. + */ + public void testExternalClassesAtEventP2pDisabled() throws Exception { + testExternalClassesAtEvent0(true); + + } + + /** + * @throws Exception If test failed. + */ + public void testExternalClassesAtEvent() throws Exception { + testExternalClassesAtEvent0(false); + } + + /** + * @throws Exception If test failed. + */ + private void testExternalClassesAtEvent0(boolean p2p) throws Exception { + try { + useExtClassLoader = true; + cutromEvt = true; + + if (p2p) + disableP2p = true; + + final Class cls = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue"); + final Class cls2 = (Class)getExternalClassLoader(). + loadClass("org.apache.ignite.tests.p2p.GridEventConsumeFilter"); + + Ignite g1 = startGrid(1); + startGrid(2); + + latch = new CountDownLatch(3); + + g1.events().localListen((IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT); + g1.events().localListen(new EventListener(), EVT_CACHE_OBJECT_PUT); + + g1.events().remoteListen(null, (IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT); + g1.events().remoteListen(null, new EventListener(), EVT_CACHE_OBJECT_PUT); + + g1.cache(null).put("1", cls.newInstance()); + + latch.await(); + } + finally { + useExtClassLoader = false; + cutromEvt = false; + + if (p2p) + disableP2p = false; + } + } + + /** + * @throws Exception If test failed. + */ public void testSync() throws Exception { preloadMode = SYNC; batchSize = 512; @@ -773,4 +908,32 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { // No-op. } } + + /** + * + */ + private static class MessageListener implements P2 { + /** + * @param nodeId + * @param msg + * @return + */ + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println("Received message [msg=" + msg + ", from=" + nodeId + ']'); + + latch.countDown(); + + return true; // Return true to continue listening. + } + } + + private static class EventListener implements IgnitePredicate { + @Override public boolean apply(Event evt) { + System.out.println("Cache event: " + evt); + + latch.countDown(); + + return true; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java new file mode 100644 index 0000000..c5c0b4d --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Test value for deployment. + */ +public class CacheDeploymentExternalizableTestValue implements Serializable { + /** */ + private CacheDeploymentExternalizableTestValue2 field; + + /** + * @return value + */ + public CacheDeploymentExternalizableTestValue2 getField() { + return field; + } + + /** + * @param field field + */ + public void setField( + CacheDeploymentExternalizableTestValue2 field) { + this.field = field; + } + + /** {@inheritDoc} */ + @Override public String toString() { + field = new CacheDeploymentExternalizableTestValue2(); + + return super.toString(); + } + + /** + * + */ + public static class CacheDeploymentExternalizableTestValue2 implements Externalizable { + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 288c2b3..cae1a9f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1501,7 +1501,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { } @Override public Object deserialize(byte[] bytes) throws Exception { - return marshaller.unmarshal(bytes, null); + ClassLoader clsLdr = ctx != null ? U.resolveClassLoader(ctx.config()) : null; + + return marshaller.unmarshal(bytes, clsLdr); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 21541ec..3d226e5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -411,7 +411,7 @@ public class GridMapQueryExecutor { Marshaller m = ctx.config().getMarshaller(); for (GridCacheSqlQuery qry : qrys) - qry.unmarshallParams(m); + qry.unmarshallParams(m, ctx); } } catch (IgniteCheckedException e) {