ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [14/15] ignite git commit: zk
Date Wed, 22 Nov 2017 10:23:30 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0aba812
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0aba812
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0aba812

Branch: refs/heads/ignite-zk
Commit: e0aba812643c0d773359a95b514daead9730ee6e
Parents: 4090eb7
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 22 11:47:55 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 22 13:21:21 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkDiscoveryCustomEventData.java |  12 +-
 .../zk/internal/ZkDiscoveryEventData.java       |   7 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |  11 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 226 ++++++++++++-------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |   4 +-
 5 files changed, 177 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 1346c24..5668428 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -27,6 +27,9 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
  */
 class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
     /** */
+    private static final int CUSTOM_MSG_ACK_FLAG = 1;
+
+    /** */
     final UUID sndNodeId;
 
     /** */
@@ -41,7 +44,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
      * @param sndNodeId Sender node ID.
      * @param evtPath Event path.
      */
-    ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath) {
+    ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath, boolean ack) {
         super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
 
         assert sndNodeId != null;
@@ -49,6 +52,13 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
 
         this.sndNodeId = sndNodeId;
         this.evtPath = evtPath;
+
+        if (ack)
+            flags |= CUSTOM_MSG_ACK_FLAG;
+    }
+
+    boolean ackEvent() {
+        return flagSet(CUSTOM_MSG_ACK_FLAG);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index 9f18f4f..00330e4 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -45,6 +45,9 @@ abstract class ZkDiscoveryEventData implements Serializable {
     /** */
     private transient Set<Integer> remainingAcks;
 
+    /** */
+    int flags;
+
     /**
      * @param evtType Event type.
      * @param topVer Topology version.
@@ -89,6 +92,10 @@ abstract class ZkDiscoveryEventData implements Serializable {
         return remainingAcks.isEmpty();
     }
 
+    boolean flagSet(int flag) {
+        return (flags & flag) == flag;
+    }
+
     long eventId() {
         return evtId;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index ad35c05..591f18d 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -33,6 +33,9 @@ class ZkIgnitePaths {
     private static final String CUSTOM_EVTS_DIR = "customEvts";
 
     /** */
+    private static final String CUSTOM_EVTS_ACKS_DIR = "customEvtsAcks";
+
+    /** */
     private static final String ALIVE_NODES_DIR = "alive";
 
     /** */
@@ -59,6 +62,9 @@ class ZkIgnitePaths {
     /** */
     final String customEvtsDir;
 
+    /** */
+    final String customEvtsAcksDir;
+
     /**
      * @param basePath Base directory.
      * @param clusterName Cluster name.
@@ -73,6 +79,7 @@ class ZkIgnitePaths {
         joinDataDir = zkPath(JOIN_DATA_DIR);
         evtsPath = zkPath(DISCO_EVENTS_PATH);
         customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+        customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
     }
 
     /**
@@ -122,7 +129,7 @@ class ZkIgnitePaths {
         return evtsPath + "/joined-" + evtId;
     }
 
-    String customEventDataPath(String child) {
-        return customEvtsDir + "/" + child;
+    String customEventDataPath(boolean ack, String child) {
+        return ack ? customEvtsAcksDir : customEvtsDir + "/" + child;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 8246e19..5e9c5a3 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -232,6 +232,7 @@ public class ZookeeperDiscoveryImpl {
         }
 
         try {
+            // TODO ZK: handle retries.
             zkClient.createIfNeeded(zkPaths.customEvtsDir + "/" + locNode.id() + '|', msgBytes, CreateMode.PERSISTENT_SEQUENTIAL);
         }
         catch (ZookeeperClientFailedException e) {
@@ -347,6 +348,7 @@ public class ZookeeperDiscoveryImpl {
     private void startJoin(byte[] joinDataBytes) throws InterruptedException {
         try {
             // TODO ZK: handle max size.
+            // TODO ZK: handle retries.
             String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + locNode.id() + "|",
                 joinDataBytes,
                 EPHEMERAL_SEQUENTIAL);
@@ -494,7 +496,7 @@ public class ZookeeperDiscoveryImpl {
         byte[] evtsData = zkClient.getData(zkPaths.evtsPath);
 
         if (evtsData.length > 0)
-            onEventsUpdate(evtsData);
+            processNewEvents(evtsData);
 
         crd = true;
 
@@ -511,13 +513,9 @@ public class ZookeeperDiscoveryImpl {
                 ZkDiscoveryEventData evtData = it.next();
 
                 evtData.remainingAcks(top.nodesByOrder.values());
-
-                if (evtData.allAcksReceived()) {
-                    processNodesAckEvent(evtData);
-
-                    it.remove();
-                }
             }
+
+            handleProcessedEvents();
         }
         else {
             if (log.isInfoEnabled())
@@ -586,15 +584,17 @@ public class ZookeeperDiscoveryImpl {
 
                     Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator();
 
+                    boolean processed = false;
+
                     while (it.hasNext()) {
                         ZkDiscoveryEventData evtData = it.next();
 
-                        if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) {
-                            processNodesAckEvent(evtData);
-
-                            it.remove();
-                        }
+                        if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt))
+                            processed = true;
                     }
+
+                    if (processed)
+                        handleProcessedEvents();
                 }
             }
             catch (Throwable e) {
@@ -647,18 +647,24 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
-        if (newEvts) {
-            long start = System.currentTimeMillis();
+        if (newEvts)
+            saveAndProcessNewEvents();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void saveAndProcessNewEvents() throws Exception {
+        long start = System.currentTimeMillis();
 
-            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
+        zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
 
-            long time = System.currentTimeMillis() - start;
+        long time = System.currentTimeMillis() - start;
 
-            if (log.isInfoEnabled())
-                log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
+        if (log.isInfoEnabled())
+            log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
 
-            onEventsUpdate(evtsData);
-        }
+        processNewEvents(evtsData);
     }
 
     /**
@@ -877,7 +883,8 @@ public class ZookeeperDiscoveryImpl {
                             evtsData.evtIdGen,
                             evtsData.topVer,
                             sndNodeId,
-                            evtE.getValue());
+                            evtE.getValue(),
+                            false);
 
                         evtData.msg = msg;
 
@@ -899,16 +906,7 @@ public class ZookeeperDiscoveryImpl {
                 evtsData.procCustEvt = evtE.getKey();
             }
 
-            long start = System.currentTimeMillis();
-
-            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
-
-            long time = System.currentTimeMillis() - start;
-
-            if (log.isInfoEnabled())
-                log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
-
-            onEventsUpdate(evtsData);
+            saveAndProcessNewEvents();
         }
     }
 
@@ -916,7 +914,7 @@ public class ZookeeperDiscoveryImpl {
      * @param data Marshalled events.
      * @throws Exception If failed.
      */
-    private void onEventsUpdate(byte[] data) throws Exception {
+    private void processNewEvents(byte[] data) throws Exception {
         if (data.length == 0)
             return;
 
@@ -924,7 +922,7 @@ public class ZookeeperDiscoveryImpl {
 
         ZkDiscoveryEventsData evtsData = unmarshal(data);
 
-        onEventsUpdate(evtsData);
+        processNewEvents(evtsData);
 
         this.evtsData = evtsData;
     }
@@ -937,7 +935,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws Exception {
+    private void processNewEvents(ZkDiscoveryEventsData evtsData) throws Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
         boolean updateNodeInfo = false;
@@ -1002,6 +1000,9 @@ public class ZookeeperDiscoveryImpl {
                     case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
                         ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;
 
+                        if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order())
+                            break;
+
                         DiscoverySpiCustomMessage msg;
 
                         if (crd) {
@@ -1010,27 +1011,25 @@ public class ZookeeperDiscoveryImpl {
                             msg = evtData0.msg;
                         }
                         else {
-                            String path = zkPaths.customEventDataPath(evtData0.evtPath);
+                            String path = zkPaths.customEventDataPath(evtData0.ackEvent(),
+                                evtData0.evtPath);
 
                             msg = unmarshal(zkClient.getData(path));
+
+                            evtData0.msg = msg;
                         }
 
                         notifyCustomEvent(evtData0, msg);
 
+                        if (!evtData0.ackEvent())
+                            updateNodeInfo = true;
+
                         break;
                     }
 
                     default:
                         assert false : "Invalid event: " + evtData;
                 }
-
-                if (crd) {
-                    if (evtData.allAcksReceived()) {
-                        processNodesAckEvent(evtData);
-
-                        it.remove();
-                    }
-                }
             }
 
             if (joined) {
@@ -1043,13 +1042,93 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
-        if (!crd && updateNodeInfo) {
+        if (crd) {
+            handleProcessedEvents();
+        }
+        else if (updateNodeInfo) {
             assert locNodeZkPath != null;
 
             zkClient.setData(locNodeZkPath, marshal(locNodeInfo), -1);
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    private void handleProcessedEvents() throws Exception {
+        Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator();
+
+        List<ZkDiscoveryCustomEventData> newEvts = null;
+
+        while (it.hasNext()) {
+            ZkDiscoveryEventData evtData = it.next();
+
+            if (evtData.allAcksReceived()) {
+                switch (evtData.eventType()) {
+                    case EventType.EVT_NODE_JOINED: {
+                        processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
+
+                        break;
+                    }
+
+                    case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
+                        DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData);
+
+                        if (ack != null) {
+                            evtsData.evtIdGen++;
+
+                            long evtId = evtsData.evtIdGen;
+
+                            byte[] ackBytes = marshal(ack);
+
+                            String evtChildPath = String.valueOf(evtId);
+
+                            zkClient.createIfNeeded(
+                                zkPaths.customEventDataPath(true, evtChildPath),
+                                ackBytes,
+                                CreateMode.PERSISTENT);
+
+                            ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
+                                evtId,
+                                evtData.topologyVersion(), // Use topology version from original event.
+                                locNode.id(),
+                                evtChildPath,
+                                true);
+
+                            ackEvtData.msg = ack;
+
+                            if (newEvts == null)
+                                newEvts = new ArrayList<>();
+
+                            newEvts.add(ackEvtData);
+
+                            if (log.isInfoEnabled())
+                                log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + ack + ']');
+                        }
+
+                        break;
+                    }
+
+                    case EventType.EVT_NODE_FAILED: {
+                        log.info("All nodes processed node fail [evtId=" + evtData.eventId() + ']');
+
+                        // Do not need cleanup.
+                        break;
+                    }
+                }
+
+                it.remove();
+            }
+        }
+
+        if (newEvts != null) {
+            for (int i = 0; i < newEvts.size(); i++)
+                evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i));
+
+            saveAndProcessNewEvents();
+        }
+    }
+
     private void processLocalJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData)
         throws Exception
     {
@@ -1161,47 +1240,22 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param evtData
-     * @throws Exception
-     */
-    private void processNodesAckEvent(ZkDiscoveryEventData evtData) throws Exception {
-        switch (evtData.eventType()) {
-            case EventType.EVT_NODE_JOINED: {
-                processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
-
-                break;
-            }
-
-            case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
-                processNodesAckCustomEvent((ZkDiscoveryCustomEventData)evtData);
-
-                break;
-            }
-
-            case EventType.EVT_NODE_FAILED: {
-                log.info("All nodes processed node fail [evtId=" + evtData.eventId() + ']');
-
-                // Do not need cleanup.
-                break;
-            }
-        }
-    }
-
-    /**
      * @param failedNode Failed node.
      */
     private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) throws Exception {
+        boolean processed = false;
+
         for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = evtsData.evts.entrySet().iterator(); it.hasNext();) {
             Map.Entry<Long, ZkDiscoveryEventData> e = it.next();
 
             ZkDiscoveryEventData evtData = e.getValue();
 
-            if (evtData.onNodeFail(failedNode)) {
-                processNodesAckEvent(evtData);
-
-                it.remove();
-            }
+            if (evtData.onNodeFail(failedNode))
+                processed = true;
         }
+
+        if (processed)
+            handleProcessedEvents();
     }
 
     /**
@@ -1218,11 +1272,25 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @param evtData Event data.
      * @throws Exception If failed.
+     * @return Ack message.
      */
-    private void processNodesAckCustomEvent(ZkDiscoveryCustomEventData evtData) throws Exception {
+    @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData)
+        throws Exception
+    {
         log.info("All nodes processed custom event [evtId=" + evtData.eventId() + ']');
 
-        zkClient.deleteIfExists(zkPaths.customEventDataPath(evtData.evtPath), -1);
+        if (!evtData.ackEvent()) {
+            zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1);
+
+            assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
+
+            if (evtData.msg != null)
+                return evtData.msg.ackMessage();
+        }
+        else
+            zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1);
+
+        return null;
     }
 
     /**
@@ -1352,7 +1420,7 @@ public class ZookeeperDiscoveryImpl {
 
                 if (path.equals(zkPaths.evtsPath)) {
                     if (!crd)
-                        onEventsUpdate(data);
+                        processNewEvents(data);
                 }
                 else
                     U.warn(log, "Data callback for unknown path: " + path);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0aba812/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index aa1f836..d579c08 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -121,7 +121,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(ccfg);
 
-        cfg.setMarshaller(new JdkMarshaller());
+        // cfg.setMarshaller(new JdkMarshaller());
 
         cfg.setClientMode(client);
 
@@ -609,6 +609,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             node.compute().broadcast(new DummyCallable(null));
 
         awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
     }
 
     /**


Mime
View raw message