ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [09/13] ignite git commit: IGNITE-2249 - Do not deserialize services on client node
Date Thu, 18 Feb 2016 13:30:03 GMT
IGNITE-2249 - Do not deserialize services on client node


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d5f77e2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d5f77e2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d5f77e2

Branch: refs/heads/ignite-1786
Commit: 7d5f77e2f9ad80ec298b96452e5f55f737a01701
Parents: 2ad4b5c
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Feb 17 18:21:54 2016 -0800
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Wed Feb 17 18:21:54 2016 -0800

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |   3 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |   3 +-
 .../CacheDataStructuresManager.java             |   4 +-
 .../continuous/CacheContinuousQueryHandler.java |  21 ++-
 .../continuous/CacheContinuousQueryManager.java |  18 ++-
 .../datastructures/DataStructuresProcessor.java |   1 +
 .../service/GridServiceProcessor.java           | 101 ++++++++++++-
 .../processors/service/GridServiceProxy.java    |  22 +--
 .../GridServiceSerializationSelfTest.java       | 149 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 .../hadoop/jobtracker/HadoopJobTracker.java     |   5 +-
 11 files changed, 287 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index e3f2bc9..05fe8ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -83,7 +83,8 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
             new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
             null,
             ctx.cache().marshallerCache().context().affinityNode(),
-            true
+            true,
+            false
         );
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index e0da8d1..624a453 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -277,7 +277,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
                 new MetaDataEntryListener(),
                 new MetaDataEntryFilter(),
                 false,
-                true);
+                true,
+                false);
 
             while (true) {
                 ClusterNode oldestSrvNode =

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 47c3dd9..b42e5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -285,7 +284,8 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
                     },
                     new QueueHeaderPredicate(),
                     cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()),
-                    true);
+                    true,
+                    false);
             }
 
             GridCacheQueueProxy queue = queuesMap.get(hdr.id());

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 08fe62a..0324e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -151,6 +151,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** */
     private AffinityTopologyVersion initTopVer;
 
+    /** */
+    private transient boolean ignoreClassNotFound;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -188,7 +191,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         int taskHash,
         boolean skipPrimaryCheck,
         boolean locCache,
-        boolean keepBinary) {
+        boolean keepBinary,
+        boolean ignoreClassNotFound) {
         assert topic != null;
         assert locLsnr != null;
 
@@ -205,6 +209,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         this.skipPrimaryCheck = skipPrimaryCheck;
         this.locCache = locCache;
         this.keepBinary = keepBinary;
+        this.ignoreClassNotFound = ignoreClassNotFound;
 
         cacheId = CU.cacheId(cacheName);
     }
@@ -566,6 +571,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         final GridCacheContext cctx = cacheContext(ctx);
 
+        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+
         for (CacheContinuousQueryEntry e : entries) {
             GridCacheDeploymentManager depMgr = cctx.deploy();
 
@@ -582,19 +589,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
             try {
                 e.unmarshal(cctx, ldr);
+
+                entries0.addAll(handleEvent(ctx, e));
             }
             catch (IgniteCheckedException ex) {
-                U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+                if (ignoreClassNotFound)
+                    assert internal;
+                else
+                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
             }
         }
 
         final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
-        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
-
-        for (CacheContinuousQueryEntry e : entries)
-            entries0.addAll(handleEvent(ctx, e));
-
         if (!entries0.isEmpty()) {
             Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
                 new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 840a61b..409c1da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -433,7 +433,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             false,
             true,
             loc,
-            keepBinary);
+            keepBinary,
+            false);
     }
 
     /**
@@ -447,7 +448,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
         CacheEntryEventSerializableFilter rmtFilter,
         boolean loc,
-        boolean notifyExisting)
+        boolean notifyExisting,
+        boolean ignoreClassNotFound)
         throws IgniteCheckedException
     {
         return executeQuery0(
@@ -462,7 +464,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             false,
             true,
             loc,
-            false);
+            false,
+            ignoreClassNotFound);
     }
 
     /**
@@ -560,7 +563,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean sync,
         boolean ignoreExpired,
         boolean loc,
-        final boolean keepBinary) throws IgniteCheckedException
+        final boolean keepBinary,
+        boolean ignoreClassNotFound) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -582,7 +586,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             taskNameHash,
             skipPrimaryCheck,
             cctx.isLocal(),
-            keepBinary);
+            keepBinary,
+            ignoreClassNotFound);
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -790,7 +795,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 cfg.isSynchronous(),
                 false,
                 false,
-                keepBinary);
+                keepBinary,
+                false);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 98848ee..445fc3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -248,6 +248,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                     qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
                         new DataStructuresEntryFilter(),
                         dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+                        false,
                         false);
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 2841083..1a48e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -44,8 +44,10 @@ import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridClosureCallMode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -58,21 +60,26 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDescriptor;
@@ -166,11 +173,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
+            boolean affNode = cache.context().affinityNode();
+
             cfgQryId = cache.context().continuousQueries().executeInternalQuery(
-                new DeploymentListener(), null, cache.context().affinityNode(), true);
+                new DeploymentListener(), null, affNode, true, !affNode);
 
             assignQryId = cache.context().continuousQueries().executeInternalQuery(
-                new AssignmentListener(), null, cache.context().affinityNode(), true);
+                new AssignmentListener(), null, affNode, true, !affNode);
         }
         finally {
             if (ctx.deploy().enabled())
@@ -544,6 +553,38 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param name Service name.
+     * @return Service topology.
+     */
+    public Map<UUID, Integer> serviceTopology(String name) throws IgniteCheckedException {
+        ClusterNode node = cache.affinity().mapKeyToNode(name);
+
+        if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) {
+            return ctx.closure().callAsyncNoFailover(
+                GridClosureCallMode.BALANCE,
+                new ServiceTopologyCallable(name),
+                Collections.singletonList(node),
+                false
+            ).get();
+        }
+        else
+            return serviceTopology(cache, name);
+    }
+
+    /**
+     * @param cache Utility cache.
+     * @param svcName Service name.
+     * @return Service topology.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private static Map<UUID, Integer> serviceTopology(IgniteInternalCache<Object, Object> cache, String svcName)
+        throws IgniteCheckedException {
+        GridServiceAssignments val = (GridServiceAssignments)cache.get(new GridServiceAssignmentsKey(svcName));
+
+        return val != null ? val.assigns() : null;
+    }
+
+    /**
      * @return Collection of service descriptors.
      */
     public Collection<ServiceDescriptor> serviceDescriptors() {
@@ -1069,7 +1110,17 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         if (!(e.getKey() instanceof GridServiceDeploymentKey))
                             continue;
 
-                        GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
+                        GridServiceDeployment dep;
+
+                        try {
+                            dep = (GridServiceDeployment)e.getValue();
+                        }
+                        catch (IgniteException ex) {
+                            if (X.hasCause(ex, ClassNotFoundException.class))
+                                continue;
+                            else
+                                throw ex;
+                        }
 
                         if (dep != null) {
                             svcName.set(dep.configuration().getName());
@@ -1346,7 +1397,17 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         if (!(e.getKey() instanceof GridServiceAssignmentsKey))
                             continue;
 
-                        GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
+                        GridServiceAssignments assigns;
+
+                        try {
+                            assigns = (GridServiceAssignments)e.getValue();
+                        }
+                        catch (IgniteException ex) {
+                            if (X.hasCause(ex, ClassNotFoundException.class))
+                                continue;
+                            else
+                                throw ex;
+                        }
 
                         if (assigns != null) {
                             svcName.set(assigns.name());
@@ -1467,4 +1528,34 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             return S.toString(ServiceAssignmentsPredicate.class, this);
         }
     }
-}
\ No newline at end of file
+
+    /**
+     */
+    @GridInternal
+    private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7");
+
+        /** */
+        private final String svcName;
+
+        /** */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /**
+         * @param svcName Service name.
+         */
+        public ServiceTopologyCallable(String svcName) {
+            this.svcName = svcName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<UUID, Integer> call() throws Exception {
+            return serviceTopology(ignite.context().cache().utilityCache(), svcName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index e54ec7b..6bec8ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.services.ServiceDescriptor;
 import org.jsr166.ThreadLocalRandom8;
 
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
@@ -210,7 +209,7 @@ class GridServiceProxy<T> implements Serializable {
          * @param name Service name.
          * @return Node with deployed service or {@code null} if there is no such node.
          */
-        private ClusterNode nodeForService(String name, boolean sticky) {
+        private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException {
             do { // Repeat if reference to remote node was changed.
                 if (sticky) {
                     ClusterNode curNode = rmtNode.get();
@@ -237,11 +236,11 @@ class GridServiceProxy<T> implements Serializable {
          * @return Local node if it has a given service deployed or randomly chosen remote node,
          * otherwise ({@code null} if given service is not deployed on any node.
          */
-        private ClusterNode randomNodeForService(String name) {
+        private ClusterNode randomNodeForService(String name) throws IgniteCheckedException {
             if (hasLocNode && ctx.service().service(name) != null)
                 return ctx.discovery().localNode();
 
-            Map<UUID, Integer> snapshot = serviceTopology(name);
+            Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name);
 
             if (snapshot == null || snapshot.isEmpty())
                 return null;
@@ -307,19 +306,6 @@ class GridServiceProxy<T> implements Serializable {
 
             return null;
         }
-
-        /**
-         * @param name Service name.
-         * @return Map of number of service instances per node ID.
-         */
-        private Map<UUID, Integer> serviceTopology(String name) {
-            for (ServiceDescriptor desc : ctx.service().serviceDescriptors()) {
-                if (desc.name().equals(name))
-                    return desc.topologySnapshot();
-            }
-
-            return null;
-        }
     }
 
     /**
@@ -403,4 +389,4 @@ class GridServiceProxy<T> implements Serializable {
             return S.toString(ServiceProxyCallable.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
new file mode 100644
index 0000000..f709dfe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Service serialization test.
+ */
+public class GridServiceSerializationSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServiceSerialization() throws Exception {
+        try {
+            Ignite server = startGridsMultiThreaded(3);
+
+            Ignition.setClientMode(true);
+
+            Ignite client = startGrid("client");
+
+            server.services(server.cluster().forServers())
+                .deployClusterSingleton("my-service", new MyServiceImpl());
+
+            MyService svc = client.services().serviceProxy("my-service", MyService.class, false);
+
+            svc.hello();
+
+            assert MyServiceImpl.latch.await(2000, TimeUnit.MILLISECONDS);
+
+            assertEquals(0, MyServiceImpl.cnt.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     */
+    private static interface MyService extends Service {
+        /** */
+        void hello();
+    }
+
+    /**
+     */
+    private static class MyServiceImpl implements MyService, Externalizable {
+        /** */
+        static final AtomicInteger cnt = new AtomicInteger();
+
+        /** */
+        static final CountDownLatch latch = new CountDownLatch(1);
+
+        /**
+         */
+        public MyServiceImpl() throws ClassNotFoundException {
+            if (clientThread())
+                throw new ClassNotFoundException("Expected ClassNotFoundException");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            if (clientThread())
+                cnt.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+            if (clientThread())
+                cnt.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            if (clientThread())
+                cnt.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void hello() {
+            if (clientThread())
+                cnt.incrementAndGet();
+
+            latch.countDown();
+        }
+
+        /**
+         * @return If current thread belongs to client.
+         */
+        private boolean clientThread() {
+            assert Thread.currentThread() instanceof IgniteThread;
+
+            return ((IgniteThread)Thread.currentThread()).getGridName().contains("client");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index d9e9b0f..214d375 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessorProxySe
 import org.apache.ignite.internal.processors.service.GridServiceProcessorSingleNodeSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceProcessorStopSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceReassignmentSelfTest;
+import org.apache.ignite.internal.processors.service.GridServiceSerializationSelfTest;
 import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
 import org.apache.ignite.internal.util.GridStartupWithSpecifiedWorkDirectorySelfTest;
 import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
@@ -123,6 +124,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridServiceProcessorStopSelfTest.class);
         suite.addTestSuite(ServicePredicateAccessCacheTest.class);
         suite.addTestSuite(GridServicePackagePrivateSelfTest.class);
+        suite.addTestSuite(GridServiceSerializationSelfTest.class);
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 81ff8ea..f4cf892 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -255,7 +255,8 @@ public class HadoopJobTracker extends HadoopComponent {
             },
             null,
             true,
-            true
+            true,
+            false
         );
 
         ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
@@ -1690,4 +1691,4 @@ public class HadoopJobTracker extends HadoopComponent {
          */
         protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
     }
-}
\ No newline at end of file
+}


Mime
View raw message