Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4133718B41 for ; Thu, 18 Feb 2016 13:29:56 +0000 (UTC) Received: (qmail 16845 invoked by uid 500); 18 Feb 2016 13:29:56 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 16732 invoked by uid 500); 18 Feb 2016 13:29:56 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 16487 invoked by uid 99); 18 Feb 2016 13:29:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Feb 2016 13:29:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C479AE0534; Thu, 18 Feb 2016 13:29:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 18 Feb 2016 13:30:03 -0000 Message-Id: <2776b55df3a146e0a357eaba446598f3@git.apache.org> In-Reply-To: <30cdbca2fa7b4d4592b9857e3988bedd@git.apache.org> References: <30cdbca2fa7b4d4592b9857e3988bedd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/13] ignite git commit: IGNITE-2249 - Do not deserialize services on client node IGNITE-2249 - Do not deserialize services on client node Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d5f77e2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d5f77e2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d5f77e2 Branch: refs/heads/ignite-1786 Commit: 7d5f77e2f9ad80ec298b96452e5f55f737a01701 Parents: 2ad4b5c Author: Valentin Kulichenko Authored: Wed Feb 17 18:21:54 2016 -0800 Committer: Valentin Kulichenko Committed: Wed Feb 17 18:21:54 2016 -0800 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 3 +- .../binary/CacheObjectBinaryProcessorImpl.java | 3 +- .../CacheDataStructuresManager.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 21 ++- .../continuous/CacheContinuousQueryManager.java | 18 ++- .../datastructures/DataStructuresProcessor.java | 1 + .../service/GridServiceProcessor.java | 101 ++++++++++++- .../processors/service/GridServiceProxy.java | 22 +-- .../GridServiceSerializationSelfTest.java | 149 +++++++++++++++++++ .../testsuites/IgniteKernalSelfTestSuite.java | 2 + .../hadoop/jobtracker/HadoopJobTracker.java | 5 +- 11 files changed, 287 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index e3f2bc9..05fe8ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -83,7 +83,8 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir), null, ctx.cache().marshallerCache().context().affinityNode(), - true + true, + false ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index e0da8d1..624a453 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -277,7 +277,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm new MetaDataEntryListener(), new MetaDataEntryFilter(), false, - true); + true, + false); while (true) { ClusterNode oldestSrvNode = http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 47c3dd9..b42e5e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -37,7 +37,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -285,7 +284,8 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { }, new QueueHeaderPredicate(), cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()), - true); + true, + false); } GridCacheQueueProxy queue = queuesMap.get(hdr.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 08fe62a..0324e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -151,6 +151,9 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler /** */ private AffinityTopologyVersion initTopVer; + /** */ + private transient boolean ignoreClassNotFound; + /** * Required by {@link Externalizable}. */ @@ -188,7 +191,8 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler int taskHash, boolean skipPrimaryCheck, boolean locCache, - boolean keepBinary) { + boolean keepBinary, + boolean ignoreClassNotFound) { assert topic != null; assert locLsnr != null; @@ -205,6 +209,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler this.skipPrimaryCheck = skipPrimaryCheck; this.locCache = locCache; this.keepBinary = keepBinary; + this.ignoreClassNotFound = ignoreClassNotFound; cacheId = CU.cacheId(cacheName); } @@ -566,6 +571,8 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler final GridCacheContext cctx = cacheContext(ctx); + Collection entries0 = new ArrayList<>(); + for (CacheContinuousQueryEntry e : entries) { GridCacheDeploymentManager depMgr = cctx.deploy(); @@ -582,19 +589,19 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler try { e.unmarshal(cctx, ldr); + + entries0.addAll(handleEvent(ctx, e)); } catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + if (ignoreClassNotFound) + assert internal; + else + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); } } final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - Collection entries0 = new ArrayList<>(); - - for (CacheContinuousQueryEntry e : entries) - entries0.addAll(handleEvent(ctx, e)); - if (!entries0.isEmpty()) { Iterable> evts = F.viewReadOnly(entries0, new C1>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 840a61b..409c1da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -433,7 +433,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { false, true, loc, - keepBinary); + keepBinary, + false); } /** @@ -447,7 +448,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { public UUID executeInternalQuery(CacheEntryUpdatedListener locLsnr, CacheEntryEventSerializableFilter rmtFilter, boolean loc, - boolean notifyExisting) + boolean notifyExisting, + boolean ignoreClassNotFound) throws IgniteCheckedException { return executeQuery0( @@ -462,7 +464,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { false, true, loc, - false); + false, + ignoreClassNotFound); } /** @@ -560,7 +563,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean sync, boolean ignoreExpired, boolean loc, - final boolean keepBinary) throws IgniteCheckedException + final boolean keepBinary, + boolean ignoreClassNotFound) throws IgniteCheckedException { cctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -582,7 +586,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { taskNameHash, skipPrimaryCheck, cctx.isLocal(), - keepBinary); + keepBinary, + ignoreClassNotFound); IgnitePredicate pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.alwaysTrue(); @@ -790,7 +795,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cfg.isSynchronous(), false, false, - keepBinary); + keepBinary, + false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 98848ee..445fc3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -248,6 +248,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(), new DataStructuresEntryFilter(), dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(), + false, false); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 2841083..1a48e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -44,8 +44,10 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridClosureCallMode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -58,21 +60,26 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; @@ -166,11 +173,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); + boolean affNode = cache.context().affinityNode(); + cfgQryId = cache.context().continuousQueries().executeInternalQuery( - new DeploymentListener(), null, cache.context().affinityNode(), true); + new DeploymentListener(), null, affNode, true, !affNode); assignQryId = cache.context().continuousQueries().executeInternalQuery( - new AssignmentListener(), null, cache.context().affinityNode(), true); + new AssignmentListener(), null, affNode, true, !affNode); } finally { if (ctx.deploy().enabled()) @@ -544,6 +553,38 @@ public class GridServiceProcessor extends GridProcessorAdapter { } /** + * @param name Service name. + * @return Service topology. + */ + public Map serviceTopology(String name) throws IgniteCheckedException { + ClusterNode node = cache.affinity().mapKeyToNode(name); + + if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) { + return ctx.closure().callAsyncNoFailover( + GridClosureCallMode.BALANCE, + new ServiceTopologyCallable(name), + Collections.singletonList(node), + false + ).get(); + } + else + return serviceTopology(cache, name); + } + + /** + * @param cache Utility cache. + * @param svcName Service name. + * @return Service topology. + * @throws IgniteCheckedException In case of error. + */ + private static Map serviceTopology(IgniteInternalCache cache, String svcName) + throws IgniteCheckedException { + GridServiceAssignments val = (GridServiceAssignments)cache.get(new GridServiceAssignmentsKey(svcName)); + + return val != null ? val.assigns() : null; + } + + /** * @return Collection of service descriptors. */ public Collection serviceDescriptors() { @@ -1069,7 +1110,17 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (!(e.getKey() instanceof GridServiceDeploymentKey)) continue; - GridServiceDeployment dep = (GridServiceDeployment)e.getValue(); + GridServiceDeployment dep; + + try { + dep = (GridServiceDeployment)e.getValue(); + } + catch (IgniteException ex) { + if (X.hasCause(ex, ClassNotFoundException.class)) + continue; + else + throw ex; + } if (dep != null) { svcName.set(dep.configuration().getName()); @@ -1346,7 +1397,17 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (!(e.getKey() instanceof GridServiceAssignmentsKey)) continue; - GridServiceAssignments assigns = (GridServiceAssignments)e.getValue(); + GridServiceAssignments assigns; + + try { + assigns = (GridServiceAssignments)e.getValue(); + } + catch (IgniteException ex) { + if (X.hasCause(ex, ClassNotFoundException.class)) + continue; + else + throw ex; + } if (assigns != null) { svcName.set(assigns.name()); @@ -1467,4 +1528,34 @@ public class GridServiceProcessor extends GridProcessorAdapter { return S.toString(ServiceAssignmentsPredicate.class, this); } } -} \ No newline at end of file + + /** + */ + @GridInternal + private static class ServiceTopologyCallable implements IgniteCallable> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7"); + + /** */ + private final String svcName; + + /** */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** + * @param svcName Service name. + */ + public ServiceTopologyCallable(String svcName) { + this.svcName = svcName; + } + + /** {@inheritDoc} */ + @Override public Map call() throws Exception { + return serviceTopology(ignite.context().cache().utilityCache(), svcName); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index e54ec7b..6bec8ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.services.ServiceDescriptor; import org.jsr166.ThreadLocalRandom8; import static org.apache.ignite.internal.GridClosureCallMode.BALANCE; @@ -210,7 +209,7 @@ class GridServiceProxy implements Serializable { * @param name Service name. * @return Node with deployed service or {@code null} if there is no such node. */ - private ClusterNode nodeForService(String name, boolean sticky) { + private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException { do { // Repeat if reference to remote node was changed. if (sticky) { ClusterNode curNode = rmtNode.get(); @@ -237,11 +236,11 @@ class GridServiceProxy implements Serializable { * @return Local node if it has a given service deployed or randomly chosen remote node, * otherwise ({@code null} if given service is not deployed on any node. */ - private ClusterNode randomNodeForService(String name) { + private ClusterNode randomNodeForService(String name) throws IgniteCheckedException { if (hasLocNode && ctx.service().service(name) != null) return ctx.discovery().localNode(); - Map snapshot = serviceTopology(name); + Map snapshot = ctx.service().serviceTopology(name); if (snapshot == null || snapshot.isEmpty()) return null; @@ -307,19 +306,6 @@ class GridServiceProxy implements Serializable { return null; } - - /** - * @param name Service name. - * @return Map of number of service instances per node ID. - */ - private Map serviceTopology(String name) { - for (ServiceDescriptor desc : ctx.service().serviceDescriptors()) { - if (desc.name().equals(name)) - return desc.topologySnapshot(); - } - - return null; - } } /** @@ -403,4 +389,4 @@ class GridServiceProxy implements Serializable { return S.toString(ServiceProxyCallable.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java new file mode 100644 index 0000000..f709dfe --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.thread.IgniteThread; + +/** + * Service serialization test. + */ +public class GridServiceSerializationSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testServiceSerialization() throws Exception { + try { + Ignite server = startGridsMultiThreaded(3); + + Ignition.setClientMode(true); + + Ignite client = startGrid("client"); + + server.services(server.cluster().forServers()) + .deployClusterSingleton("my-service", new MyServiceImpl()); + + MyService svc = client.services().serviceProxy("my-service", MyService.class, false); + + svc.hello(); + + assert MyServiceImpl.latch.await(2000, TimeUnit.MILLISECONDS); + + assertEquals(0, MyServiceImpl.cnt.get()); + } + finally { + stopAllGrids(); + } + } + + /** + */ + private static interface MyService extends Service { + /** */ + void hello(); + } + + /** + */ + private static class MyServiceImpl implements MyService, Externalizable { + /** */ + static final AtomicInteger cnt = new AtomicInteger(); + + /** */ + static final CountDownLatch latch = new CountDownLatch(1); + + /** + */ + public MyServiceImpl() throws ClassNotFoundException { + if (clientThread()) + throw new ClassNotFoundException("Expected ClassNotFoundException"); + } + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + if (clientThread()) + cnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void init(ServiceContext ctx) throws Exception { + if (clientThread()) + cnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) throws Exception { + if (clientThread()) + cnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void hello() { + if (clientThread()) + cnt.incrementAndGet(); + + latch.countDown(); + } + + /** + * @return If current thread belongs to client. + */ + private boolean clientThread() { + assert Thread.currentThread() instanceof IgniteThread; + + return ((IgniteThread)Thread.currentThread()).getGridName().contains("client"); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java index d9e9b0f..214d375 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessorProxySe import org.apache.ignite.internal.processors.service.GridServiceProcessorSingleNodeSelfTest; import org.apache.ignite.internal.processors.service.GridServiceProcessorStopSelfTest; import org.apache.ignite.internal.processors.service.GridServiceReassignmentSelfTest; +import org.apache.ignite.internal.processors.service.GridServiceSerializationSelfTest; import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest; import org.apache.ignite.internal.util.GridStartupWithSpecifiedWorkDirectorySelfTest; import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest; @@ -123,6 +124,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite { suite.addTestSuite(GridServiceProcessorStopSelfTest.class); suite.addTestSuite(ServicePredicateAccessCacheTest.class); suite.addTestSuite(GridServicePackagePrivateSelfTest.class); + suite.addTestSuite(GridServiceSerializationSelfTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 81ff8ea..f4cf892 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -255,7 +255,8 @@ public class HadoopJobTracker extends HadoopComponent { }, null, true, - true + true, + false ); ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { @@ -1690,4 +1691,4 @@ public class HadoopJobTracker extends HadoopComponent { */ protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp); } -} \ No newline at end of file +}