ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-6562: Dynamic service deployment should use projection if NodeFilter is not set. This closes #2810.
Date Mon, 16 Oct 2017 11:36:30 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 0b304042d -> 6679b6cbe


IGNITE-6562: Dynamic service deployment should use projection if NodeFilter is not set. This
closes #2810.

Signed-off-by: nikolay_tikhonov <ntikhonov@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6679b6cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6679b6cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6679b6cb

Branch: refs/heads/master
Commit: 6679b6cbe6a26f8e9ba2a02bcf56801811e99abd
Parents: 0b30404
Author: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Authored: Mon Oct 16 14:35:21 2017 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Oct 16 14:35:21 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteServicesImpl.java     |  4 +-
 .../service/GridServiceProcessor.java           | 59 +++++++++-------
 .../GridServiceProcessorMultiNodeSelfTest.java  | 71 +++++++++++++++++++-
 3 files changed, 105 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6679b6cb/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
index 00d6078..7cbd4b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
@@ -235,7 +235,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
         guard();
 
         try {
-            saveOrGet(ctx.service().deployAll(cfgs));
+            saveOrGet(ctx.service().deployAll(prj, cfgs));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -252,7 +252,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
         guard();
 
         try {
-            return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployAll(cfgs));
+            return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployAll(prj, cfgs));
         }
         finally {
             unguard();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6679b6cb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6f1dfc7..7097735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.security.SecurityException;
@@ -263,15 +264,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
         ServiceConfiguration[] cfgs = ctx.config().getServiceConfiguration();
 
-        if (cfgs != null) {
-            for (ServiceConfiguration c : cfgs) {
-                // Deploy only on server nodes by default.
-                if (c.getNodeFilter() == null)
-                    c.setNodeFilter(ctx.cluster().get().forServers().predicate());
-            }
-
-            deployAll(Arrays.asList(cfgs)).get();
-        }
+        if (cfgs != null)
+            deployAll(Arrays.asList(cfgs), ctx.cluster().get().forServers().predicate()).get();
 
         if (log.isDebugEnabled())
             log.debug("Started service processor.");
@@ -474,9 +468,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         cfg.setService(svc);
         cfg.setTotalCount(totalCnt);
         cfg.setMaxPerNodeCount(maxPerNodeCnt);
-        cfg.setNodeFilter(F.<ClusterNode>alwaysTrue() == prj.predicate() ? null : prj.predicate());
 
-        return deploy(cfg);
+        return deployAll(prj, Collections.singleton(cfg));
     }
 
     /**
@@ -499,14 +492,17 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         cfg.setTotalCount(1);
         cfg.setMaxPerNodeCount(1);
 
-        return deploy(cfg);
+        // Ignore projection here.
+        return deployAll(Collections.singleton(cfg), null);
     }
 
     /**
      * @param cfgs Service configurations.
+     * @param dfltNodeFilter Default NodeFilter.
      * @return Configurations to deploy.
      */
-    private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs) {
+    private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs,
+        IgnitePredicate<ClusterNode> dfltNodeFilter) {
         List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());
 
         Marshaller marsh = ctx.config().getMarshaller();
@@ -516,6 +512,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         for (ServiceConfiguration cfg : cfgs) {
             Exception err = null;
 
+            // Deploy to projection node by default
+            // or only on server nodes if no projection .
+            if (cfg.getNodeFilter() == null && dfltNodeFilter != null)
+                cfg.setNodeFilter(dfltNodeFilter);
+
             try {
                 validate(cfg);
             }
@@ -568,13 +569,31 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     }
 
     /**
+     * @param prj Grid projection.
      * @param cfgs Service configurations.
      * @return Future for deployment.
      */
-    public IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs) {
+    public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
+        if (prj == null)
+            // Deploy to servers by default if no projection specified.
+            return deployAll(cfgs,  ctx.cluster().get().forServers().predicate());
+        else if (prj.predicate() == F.<ClusterNode>alwaysTrue())
+            return deployAll(cfgs,  null);
+        else
+            // Deploy to predicate nodes by default.
+            return deployAll(cfgs,  prj.predicate());
+    }
+
+    /**
+     * @param cfgs Service configurations.
+     * @param dfltNodeFilter Default NodeFilter.
+     * @return Future for deployment.
+     */
+    private IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs,
+        @Nullable IgnitePredicate<ClusterNode> dfltNodeFilter) {
         assert cfgs != null;
 
-        PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs);
+        PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter);
 
         List<ServiceConfiguration> cfgsCp = srvCfg.cfgs;
 
@@ -733,16 +752,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     }
 
     /**
-     * @param cfg Service configuration.
-     * @return Future for deployment.
-     */
-    public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) {
-        A.notNull(cfg, "cfg");
-
-        return deployAll(Collections.singleton(cfg));
-    }
-
-    /**
      * @param name Service name.
      * @return Future.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6679b6cb/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
index df7ddf1..517f061 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
@@ -169,7 +169,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
                 // Since we start extra nodes, there may be extra start and cancel events,
                 // so we check only the difference between start and cancel and
                 // not start and cancel events individually.
-                assertEquals(name, nodeCount() + servers,  DummyService.started(name) - DummyService.cancelled(name));
+                assertEquals(name, nodeCount() + servers, DummyService.started(name) - DummyService.cancelled(name));
 
                 checkCount(name, g.services().serviceDescriptors(), nodeCount() + servers);
             }
@@ -185,6 +185,73 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
     /**
      * @throws Exception If failed.
      */
+    public void testDeployOnEachProjectionNodeUpdateTopology() throws Exception {
+        // Prestart client node.
+        Ignite client = startGrid("client", getConfiguration("client").setClientMode(true));
+
+        try {
+            final String name = "serviceOnEachProjectionNodeUpdateTopology";
+
+            Ignite g = randomGrid();
+
+            int prestartedSrvcs = 1;
+
+            CountDownLatch latch = new CountDownLatch(prestartedSrvcs);
+
+            DummyService.exeLatch(name, latch);
+
+            IgniteServices svcs = g.services(g.cluster().forClients());
+
+            IgniteFuture<?> fut = svcs.deployNodeSingletonAsync(name, new DummyService());
+
+            info("Deployed service: " + name);
+
+            fut.get();
+
+            info("Finished waiting for service future: " + name);
+
+            latch.await();
+
+            // Ensure service is deployed
+            assertNotNull(client.services().serviceProxy(name, Service.class, false, 2000));
+
+            assertEquals(name, prestartedSrvcs, DummyService.started(name));
+            assertEquals(name, 0, DummyService.cancelled(name));
+
+            int servers = 2;
+
+            int clients = 2;
+
+            latch = new CountDownLatch(clients);
+
+            DummyService.exeLatch(name, latch);
+
+            startExtraNodes(servers, clients);
+
+            try {
+                latch.await();
+
+                waitForDeployment(name, clients);
+
+                // Since we start extra nodes, there may be extra start and cancel events,
+                // so we check only the difference between start and cancel and
+                // not start and cancel events individually.
+                assertEquals(name, clients + prestartedSrvcs, DummyService.started(name) - DummyService.cancelled(name));
+
+                checkCount(name, g.services().serviceDescriptors(), clients + prestartedSrvcs);
+            }
+            finally {
+                stopExtraNodes(servers + clients);
+            }
+        }
+        finally {
+            stopGrid("client");
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testDeployOnEachNodeUpdateTopology() throws Exception {
         // Prestart client node.
         Ignite client = startGrid("client", getConfiguration("client").setClientMode(true));
@@ -315,7 +382,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
             // Since we start extra nodes, there may be extra start and cancel events,
             // so we check only the difference between start and cancel and
             // not start and cancel events individually.
-            assertEquals(name, totalInstances,  DummyService.started(name) - DummyService.cancelled(name));
+            assertEquals(name, totalInstances, DummyService.started(name) - DummyService.cancelled(name));
 
             checkCount(name, g.services().serviceDescriptors(), totalInstances);
         }


Mime
View raw message