ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/45] incubator-ignite git commit: ignite-688 Age related cluster group doesn't refresh dynamically
Date Fri, 10 Apr 2015 15:27:02 GMT
ignite-688 Age related cluster group doesn't refresh dynamically


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/162fa23e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/162fa23e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/162fa23e

Branch: refs/heads/ignite-676
Commit: 162fa23ea0a2e9ea0c4a6e1be147e9108fed5cec
Parents: 52644c5
Author: agura <agura@gridgain.com>
Authored: Tue Apr 7 02:03:50 2015 +0300
Committer: Andrey Gura <agura@gridgain.com>
Committed: Fri Apr 10 13:07:19 2015 +0300

----------------------------------------------------------------------
 .../internal/cluster/ClusterGroupAdapter.java   | 150 ++++++++++++++++---
 .../ignite/internal/GridProjectionSelfTest.java |  82 ++++++++++
 2 files changed, 215 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/162fa23e/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 0daffcc..e52bed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -60,7 +60,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
     private String gridName;
 
     /** Subject ID. */
-    private UUID subjId;
+    protected UUID subjId;
 
     /** Cluster group predicate. */
     protected IgnitePredicate<ClusterNode> p;
@@ -320,12 +320,12 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
     }
 
     /** {@inheritDoc} */
-    @Override public final IgnitePredicate<ClusterNode> predicate() {
+    @Override public IgnitePredicate<ClusterNode> predicate() {
         return p != null ? p : F.<ClusterNode>alwaysTrue();
     }
 
     /** {@inheritDoc} */
-    @Override public final ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p)
{
+    @Override public ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p) {
         A.notNull(p, "p");
 
         guard();
@@ -657,7 +657,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
             IgniteKernal g = IgnitionEx.gridx(gridName);
 
             return ids != null ? new ClusterGroupAdapter(g.context(), subjId, ids) :
-                p != null ? new ClusterGroupAdapter(g.context(), subjId, p) : g;
+                new ClusterGroupAdapter(g.context(), subjId, p);
         }
         catch (IllegalStateException e) {
             throw U.withCause(new InvalidObjectException(e.getMessage()), e);
@@ -788,11 +788,8 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
         /** Oldest flag. */
         private boolean isOldest;
 
-        /** Selected node. */
-        private volatile ClusterNode node;
-
-        /** Last topology version. */
-        private volatile long lastTopVer;
+        /** State. */
+        private volatile AgeClusterGroupState state;
 
         /**
          * Required for {@link Externalizable}.
@@ -806,7 +803,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
          * @param isOldest Oldest flag.
          */
         private AgeClusterGroup(ClusterGroupAdapter parent, boolean isOldest) {
-            super(parent.ctx, parent.subjId, (IgnitePredicate<ClusterNode>) null);
+            super(parent.ctx, parent.subjId, parent.p, parent.ids);
 
             this.isOldest = isOldest;
 
@@ -820,10 +817,13 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
             guard();
 
             try {
-                lastTopVer = ctx.discovery().topologyVersion();
+                long lastTopVer = ctx.discovery().topologyVersion();
+
+                ClusterNode node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(),
null);
+
+                IgnitePredicate<ClusterNode> p = F.nodeForNodes(node);
 
-                this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(),
null);
-                this.p = F.nodeForNodes(node);
+                state = new AgeClusterGroupState(node, p, lastTopVer);
             }
             finally {
                 unguard();
@@ -832,20 +832,136 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable
{
 
         /** {@inheritDoc} */
         @Override public ClusterNode node() {
-            if (ctx.discovery().topologyVersion() != lastTopVer)
+            if (ctx.discovery().topologyVersion() != state.lastTopVer)
                 reset();
 
-            return node;
+            return state.node;
         }
 
         /** {@inheritDoc} */
         @Override public Collection<ClusterNode> nodes() {
-            if (ctx.discovery().topologyVersion() != lastTopVer)
+            if (ctx.discovery().topologyVersion() != state.lastTopVer)
                 reset();
 
-            ClusterNode node = this.node;
+            ClusterNode node = state.node;
 
             return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node);
         }
+
+        /** {@inheritDoc} */
+        @Override public IgnitePredicate<ClusterNode> predicate() {
+            if (ctx.discovery().topologyVersion() != state.lastTopVer)
+                reset();
+
+            return state.p;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterGroup forPredicate(IgnitePredicate<ClusterNode> p)
{
+            A.notNull(p, "p");
+
+            guard();
+
+            try {
+                if (p != null)
+                    ctx.resource().injectGeneric(p);
+
+                return new ClusterGroupAdapter(ctx, this.subjId, new GroupPredicate(this,
p));
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+            finally {
+                unguard();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            super.writeExternal(out);
+
+            out.writeBoolean(isOldest);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            super.readExternal(in);
+
+            isOldest = in.readBoolean();
+        }
+
+        /**
+         * Reconstructs object on unmarshalling.
+         *
+         * @return Reconstructed object.
+         * @throws ObjectStreamException Thrown in case of unmarshalling error.
+         */
+        protected Object readResolve() throws ObjectStreamException {
+            ClusterGroupAdapter parent = (ClusterGroupAdapter)super.readResolve();
+
+            return new AgeClusterGroup(parent, isOldest);
+        }
+    }
+
+    /**
+     * Container for age-based cluster group state.
+     */
+    private static class AgeClusterGroupState {
+        /** Selected node. */
+        private final ClusterNode node;
+
+        /** Node predicate. */
+        private final IgnitePredicate<ClusterNode> p;
+
+        /** Last topology version. */
+        private final long lastTopVer;
+
+        /**
+         * @param node Node.
+         * @param p Predicate.
+         * @param lastTopVer Last topology version.
+         */
+        public AgeClusterGroupState(ClusterNode node, IgnitePredicate<ClusterNode>
p, long lastTopVer) {
+            this.node = node;
+            this.p = p;
+            this.lastTopVer = lastTopVer;
+        }
+    }
+
+    /**
+     * Dynamic cluster group based predicate.
+     */
+    private static class GroupPredicate implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Target cluster group. */
+        private final ClusterGroup grp;
+
+        /** Predicate. */
+        private final IgnitePredicate<ClusterNode> p;
+
+        /**
+         * @param grp Cluster group.
+         * @param p Predicate.
+         */
+        public GroupPredicate(ClusterGroup grp, IgnitePredicate<ClusterNode> p) {
+            this.grp = grp;
+            this.p = p;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            A.notNull(node, "node is null");
+
+            return grp.predicate().apply(node) && p.apply(node);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return getClass().getName() +
+                " [grp='" + grp.getClass().getName() +
+                "', p='" + p.getClass().getName() + "']";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/162fa23e/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java
index 1fc5535..9fbad80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import java.util.*;
@@ -153,6 +155,74 @@ public class GridProjectionSelfTest extends GridProjectionAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testForPredicate() throws Exception {
+        IgnitePredicate<ClusterNode> evenP = new IgnitePredicate<ClusterNode>()
{
+            @Override public boolean apply(ClusterNode node) {
+                return node.order() % 2 == 0;
+            }
+        };
+
+        IgnitePredicate<ClusterNode> oddP = new IgnitePredicate<ClusterNode>()
{
+            @Override public boolean apply(ClusterNode node) {
+                return node.order() % 2 == 1;
+            }
+        };
+
+        ClusterGroup remotes = ignite.cluster().forRemotes();
+
+        ClusterGroup evenYoungest = remotes.forPredicate(evenP).forYoungest();
+        ClusterGroup evenOldest = remotes.forPredicate(evenP).forOldest();
+
+        ClusterGroup oddYoungest = remotes.forPredicate(oddP).forYoungest();
+        ClusterGroup oddOldest = remotes.forPredicate(oddP).forOldest();
+
+        int clusterSize = ignite.cluster().nodes().size();
+
+        assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id());
+        assertEquals(grid(1).localNode().id(), evenOldest.node().id());
+
+        assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id());
+        assertEquals(grid(2).localNode().id(), oddOldest.node().id());
+
+        try (Ignite g4 = startGrid(NODES_CNT);
+            Ignite g5 = startGrid(NODES_CNT + 1))
+        {
+            clusterSize = g4.cluster().nodes().size();
+
+            assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id());
+            assertEquals(grid(1).localNode().id(), evenOldest.node().id());
+
+            assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id());
+            assertEquals(grid(2).localNode().id(), oddOldest.node().id());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAgeClusterGroupSerialization() throws Exception {
+        Marshaller marshaller = getConfiguration().getMarshaller();
+
+        ClusterGroup grp = ignite.cluster().forYoungest();
+        ClusterNode node = grp.node();
+
+        byte[] arr = marshaller.marshal(grp);
+
+        ClusterGroup obj = marshaller.unmarshal(arr, null);
+
+        assertEquals(node.id(), obj.node().id());
+
+        try (Ignite ignore = startGrid()) {
+            obj = marshaller.unmarshal(arr, null);
+
+            assertEquals(grp.node().id(), obj.node().id());
+            assertFalse(node.id().equals(obj.node().id()));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientServer() throws Exception {
         ClusterGroup srv = ignite.cluster().forServers();
 
@@ -166,4 +236,16 @@ public class GridProjectionSelfTest extends GridProjectionAbstractTest
{
         assertTrue(cli.nodes().contains(ignite(2).cluster().localNode()));
         assertTrue(cli.nodes().contains(ignite(3).cluster().localNode()));
     }
+
+    /**
+     * @param cnt Count.
+     * @param even Even.
+     */
+    private static int gridMaxOrder(int cnt, boolean even) {
+        assert cnt > 2;
+
+        cnt = cnt - (cnt % 2);
+
+        return even ? cnt - 1 : cnt - 2;
+    }
 }


Mime
View raw message