ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject incubator-ignite git commit: # IGNITE-312 Implement DiscoveryCustomEvent
Date Tue, 24 Feb 2015 11:05:19 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-312 b9466f094 -> 5a8206436


# IGNITE-312 Implement DiscoveryCustomEvent


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5a820643
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5a820643
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5a820643

Branch: refs/heads/ignite-312
Commit: 5a82064366425228219933a82794a90dbd87b941
Parents: b9466f0
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Tue Feb 24 14:05:09 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Tue Feb 24 14:05:09 2015 +0300

----------------------------------------------------------------------
 .../ignite/events/DiscoveryCustomEvent.java     | 47 ++++++++++++++++++
 .../discovery/GridDiscoveryManager.java         | 50 ++++++++------------
 .../spi/discovery/DiscoverySpiListener.java     |  3 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 11 ++---
 .../tcp/messages/TcpDiscoveryCustomMessage.java |  2 +-
 .../internal/GridDiscoveryEventSelfTest.java    | 43 +++++++++++++++++
 .../discovery/AbstractDiscoverySelfTest.java    |  6 +--
 7 files changed, 120 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
new file mode 100644
index 0000000..d804ae8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryCustomEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+/**
+ *
+ */
+public class DiscoveryCustomEvent extends EventAdapter {
+    /** */
+    private Object data;
+
+    /**
+     * Default constructor.
+     */
+    public DiscoveryCustomEvent() {
+        type(EventType.EVT_DISCOVERY_CUSTOM_EVT);
+    }
+
+    /**
+     * @return Data.
+     */
+    public Object data() {
+        return data;
+    }
+
+    /**
+     * @param data New data.
+     */
+    public void data(Object data) {
+        this.data = data;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d3734a4..81a5e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -162,9 +162,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
     /** Metrics update worker. */
     private final MetricsUpdater metricsUpdater = new MetricsUpdater();
 
-    /** */
-    private final CopyOnWriteArrayList<IgniteClosure<Object, Void>> customMsgListeners
= new CopyOnWriteArrayList<>();
-
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -308,11 +305,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
                         ", evt=" + U.gridEventName(type) + ']';
                 }
 
-                discoWrk.addEvent(type, topVer, node, topSnapshot);
+                discoWrk.addEvent(type, topVer, node, topSnapshot, null);
             }
 
-            @Override public void onCustomMessage(Object msg, long topVer) {
-                discoWrk.addCustomEvent(msg, topVer);
+            @Override public void onCustomMessage(@NotNull Object data, @NotNull ClusterNode
node, long topVer) {
+                discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, null, data);
             }
         });
 
@@ -1191,19 +1188,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
         ).start();
     }
 
+    /**
+     * @param evt Event.
+     */
     public void sendCustomEvent(Object evt) {
         getSpi().sendCustomEvent(evt);
     }
 
-    public void addCustomEvantListener(IgniteClosure<Object, Void> listener) {
-        customMsgListeners.add(listener);
-    }
-
-    public void removeCustomEvantListener(IgniteClosure<Object, Void> listener) {
-        customMsgListeners.remove(listener);
-    }
-
-
     /** Worker for network segment checks. */
     private class SegmentCheckWorker extends GridWorker {
         /** */
@@ -1260,7 +1251,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
 
                     if (!segValid) {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED, 0, getSpi().getLocalNode(),
-                            Collections.<ClusterNode>emptyList());
+                            Collections.<ClusterNode>emptyList(), null);
 
                         lastSegChkRes.set(false);
                     }
@@ -1341,19 +1332,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
          * @param node Node.
          * @param topSnapshot Topology snapshot.
          */
-        void addEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode>
topSnapshot) {
+        void addEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode>
topSnapshot, @Nullable Object data) {
             assert node != null;
 
-            evts.add(F.t(type, topVer, node, topSnapshot, null));
-        }
-
-        /**
-         * @param evt Event.
-         * @param topVer Topology version.
-         */
-        void addCustomEvent(Object evt, long topVer) {
-            evts.add(new T5<Integer, Long, ClusterNode, Collection<ClusterNode>,
Object>(EVT_DISCOVERY_CUSTOM_EVT,
-                topVer, null, null, evt));
+            evts.add(F.t(type, topVer, node, topSnapshot, data));
         }
 
         /**
@@ -1395,7 +1377,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
 
             ClusterNode node = evt.get3();
 
-            boolean isDaemon = node == null || node.isDaemon();
+            boolean isDaemon = node.isDaemon();
 
             boolean segmented = false;
 
@@ -1497,10 +1479,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
                 }
 
                 case EVT_DISCOVERY_CUSTOM_EVT: {
-                    for (IgniteClosure<Object, Void> listener : customMsgListeners)
-                        listener.apply(evt.get5());
+                    DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();
 
-                    break;
+                    customEvt.node(evt.get3());
+                    customEvt.data(evt.get5());
+
+                    if (ctx.event().isRecordable(EVT_DISCOVERY_CUSTOM_EVT))
+                        ctx.event().record(customEvt);
+
+
+                    return;
                 }
 
                 // Don't log metric update to avoid flooding the log.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
index 89e615e..960094c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -45,8 +45,9 @@ public interface DiscoverySpiListener {
      * Notification for node custom events.
      *
      * @param msg The custom message.
+     * @param node The mode that created message.
      * @param topVer Topology version or {@code 0} if configured discovery SPI implementation
      *      does not support versioning.
      */
-    public void onCustomMessage(Object msg, long topVer);
+    public void onCustomMessage(Object msg, ClusterNode node, long topVer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index fbdab51..baf62d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -4391,14 +4391,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
 
                 msg.senderNodeId(getLocalNodeId());
             }
-            else {
-                DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
 
-                TcpDiscoverySpiState spiState = spiStateCopy();
+            DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
 
-                if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING))
-                    lsnr.onCustomMessage(msg.message(), msg.topologyVersion());
-            }
+            TcpDiscoverySpiState spiState = spiStateCopy();
+
+            if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING))
+                lsnr.onCustomMessage(msg.message(), ring.node(msg.creatorNodeId()), msg.topologyVersion());
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
index 36887c9..2f0e1c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
@@ -28,7 +28,7 @@ public class TcpDiscoveryCustomMessage extends TcpDiscoveryAbstractMessage
{
     private Object msg;
 
     /**
-     * Public default no-arg constructor for {@link java.io.Externalizable} interface.
+     * Public default no-arg constructor for {@link Externalizable} interface.
      */
     public TcpDiscoveryCustomMessage() {
         // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
index ce8b3e9..97963b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoveryEventSelfTest.java
@@ -417,4 +417,47 @@ public class GridDiscoveryEventSelfTest extends GridCommonAbstractTest
{
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEvents() throws Exception {
+        try {
+            Ignite g0 = startGrid(0);
+            final Ignite g1 = startGrid(1);
+            Ignite g2 = startGrid(2);
+
+            final CountDownLatch cnt = new CountDownLatch(3);
+
+            IgnitePredicate<DiscoveryCustomEvent> lsnr = new IgnitePredicate<DiscoveryCustomEvent>()
{
+                @Override public boolean apply(DiscoveryCustomEvent evt) {
+                    assert cnt.getCount() > 0;
+
+                    cnt.countDown();
+
+                    return true;
+                }
+            };
+
+            g0.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT);
+            g1.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT);
+            g2.events().localListen(lsnr, EVT_DISCOVERY_CUSTOM_EVT);
+
+            ((IgniteKernal)g1).context().discovery().sendCustomEvent("a");
+
+            cnt.await();
+
+            g0.events().localQuery(new IgnitePredicate<DiscoveryCustomEvent>() {
+                @Override public boolean apply(DiscoveryCustomEvent evt) {
+                    assert "a".equals(evt.data());
+                    assert ((IgniteEx)g1).localNode().id().equals(evt.node().id());
+
+                    return true;
+                }
+            }, EVT_DISCOVERY_CUSTOM_EVT);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5a820643/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 7490623..64de9d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -138,7 +138,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
         }
 
         /** {@inheritDoc} */
-        @Override public void onCustomMessage(Object msg, long topVer) {
+        @Override public void onCustomMessage(Object msg, ClusterNode node, long topVer)
{
             // No-op.
         }
     }
@@ -216,7 +216,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
                         spiCnt.addAndGet(1);
                 }
 
-                @Override public void onCustomMessage(Object msg, long topVer) {
+                @Override public void onCustomMessage(Object msg, ClusterNode node, long
topVer) {
                     // No-op.
                 }
             };
@@ -385,7 +385,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
                         }
                     }
 
-                    @Override public void onCustomMessage(Object msg, long topVer) {
+                    @Override public void onCustomMessage(Object msg, ClusterNode node, long
topVer) {
                         // No-op.
                     }
                 });


Mime
View raw message