ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: zk
Date Thu, 07 Dec 2017 09:49:41 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a11e06a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a11e06a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a11e06a5

Branch: refs/heads/ignite-zk
Commit: a11e06a5bde8b5e25b37374504030423c0062183
Parents: 3d1c0e8
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Dec 7 12:49:31 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Dec 7 12:49:31 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZkIgnitePaths.java    |  61 ++++++++---
 .../discovery/zk/internal/ZookeeperClient.java  |   6 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 102 +++++++++++++++----
 .../ZookeeperDiscoverySpiBasicTest.java         |  77 +++++++++-----
 4 files changed, 183 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a11e06a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index c9c0281..0d47658 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -30,13 +30,16 @@ class ZkIgnitePaths {
     private static final String JOIN_DATA_DIR = "jd";
 
     /** */
-    private static final String CUSTOM_EVTS_DIR = "c";
+    private static final String CUSTOM_EVTS_DIR = "ce";
+
+    /** */
+    private static final String CUSTOM_EVTS_PARTS_DIR = "cp";
 
     /** */
     private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
 
     /** */
-    private static final String ALIVE_NODES_DIR = "n";
+    static final String ALIVE_NODES_DIR = "n";
 
     /** */
     private static final String DISCO_EVENTS_PATH = "e";
@@ -57,6 +60,9 @@ class ZkIgnitePaths {
     final String customEvtsDir;
 
     /** */
+    final String customEvtsPartsDir;
+
+    /** */
     final String customEvtsAcksDir;
 
     /**
@@ -69,6 +75,7 @@ class ZkIgnitePaths {
         joinDataDir = zkPath(JOIN_DATA_DIR);
         evtsPath = zkPath(DISCO_EVENTS_PATH);
         customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+        customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR);
         customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
     }
 
@@ -201,7 +208,7 @@ class ZkIgnitePaths {
      * @return Event node ID.
      */
     static UUID customEventSendNodeId(String path) {
-        // <uuid prefix>:<node id>|<seq>
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
         int startIdx = ZkIgnitePaths.UUID_LEN + 1;
 
         String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
@@ -209,6 +216,41 @@ class ZkIgnitePaths {
         return UUID.fromString(idStr);
     }
 
+    static String customEventPrefix(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+
+        return path.substring(0, ZkIgnitePaths.UUID_LEN);
+    }
+
+    /**
+     * @param path Custom event zl path.
+     * @return Event node ID.
+     */
+    static int customEventPartsCount(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+        int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2;
+
+        String cntStr = path.substring(startIdx, startIdx + 4);
+
+        int partCnt = Integer.parseInt(cntStr);
+
+        assert partCnt >= 1 : partCnt;
+
+        return partCnt;
+    }
+
+    String createCustomEventPath(String prefix, UUID nodeId, int partCnt) {
+        return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d",
partCnt) + '|';
+    }
+
+    String customEventPartsBasePath(String prefix, UUID nodeId) {
+        return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":";
+    }
+
+    String customEventPartPath(String prefix, UUID nodeId, int part) {
+        return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part);
+    }
+
     /**
      * @param evtId Event ID.
      * @return Event zk path.
@@ -222,17 +264,6 @@ class ZkIgnitePaths {
      * @return Path for custom event ack.
      */
     String ackEventDataPath(long evtId) {
-        return customEventDataPath(true, String.valueOf(evtId));
-    }
-
-    /**
-     * @param ack Ack event flag.
-     * @param child Event child path.
-     * @return Full event data path.
-     */
-    String customEventDataPath(boolean ack, String child) {
-        String baseDir = ack ? customEvtsAcksDir : customEvtsDir;
-
-        return baseDir + "/" + child;
+        return customEvtsAcksDir + "/" + String.valueOf(evtId);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a11e06a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 4886572..0d81cb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -359,14 +359,14 @@ public class ZookeeperClient implements Watcher {
     /**
      * @param checkPrefix Unique prefix to check in case of retry.
      * @param parentPath Parent node path.
-     * @param childPath  Child path.
+     * @param path Node to create.
      * @param data Node data.
      * @param createMode Create mode.
      * @return Create path.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
      */
-    String createSequential(String checkPrefix, String parentPath, String childPath, byte[]
data, CreateMode createMode)
+    String createSequential(String checkPrefix, String parentPath, String path, byte[] data,
CreateMode createMode)
         throws ZookeeperClientFailedException, InterruptedException
     {
         assert createMode.isSequential() : createMode;
@@ -376,8 +376,6 @@ public class ZookeeperClient implements Watcher {
 
         boolean first = true;
 
-        String path = parentPath + "/" + childPath;
-
         for (;;) {
             long connStartTime = this.connStartTime;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a11e06a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 9f405b4..93015ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -435,11 +435,33 @@ public class ZookeeperDiscoveryImpl {
             checkState();
 
         try {
+            ZookeeperClient zkClient = rtState.zkClient;
+
             String prefix = UUID.randomUUID().toString();
 
-            rtState.zkClient.createSequential(prefix,
+            int partCnt = 1;
+
+            int overhead = 10;
+
+            UUID locId = locNode.id();
+
+            String path = zkPaths.createCustomEventPath(prefix, locId, partCnt);
+
+            if (zkClient.needSplitNodeData(path, msgBytes, overhead)) {
+                List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, overhead);
+
+                String partsBasePath = zkPaths.customEventPartsBasePath(prefix, locId);
+
+                saveMultipleParts(zkClient, partsBasePath, parts);
+
+                msgBytes = null;
+
+                partCnt = parts.size();
+            }
+
+            zkClient.createSequential(prefix,
                 zkPaths.customEvtsDir,
-                prefix + ":" + locNode.id() + '|',
+                zkPaths.createCustomEventPath(prefix, locId, partCnt),
                 msgBytes,
                 CreateMode.PERSISTENT_SEQUENTIAL);
         }
@@ -545,6 +567,7 @@ public class ZookeeperDiscoveryImpl {
             dirs.add(zkPaths.evtsPath);
             dirs.add(zkPaths.joinDataDir);
             dirs.add(zkPaths.customEvtsDir);
+            dirs.add(zkPaths.customEvtsPartsDir);
             dirs.add(zkPaths.customEvtsAcksDir);
             dirs.add(zkPaths.aliveNodesDir);
 
@@ -674,7 +697,7 @@ public class ZookeeperDiscoveryImpl {
             rtState.locNodeZkPath = zkClient.createSequential(
                 prefix,
                 zkPaths.aliveNodesDir,
-                prefix + ":" + locNode.id() + "|",
+                zkPaths.aliveNodesDir + "/" + prefix + ":" + locNode.id() + "|",
                 null,
                 EPHEMERAL_SEQUENTIAL);
 
@@ -1343,6 +1366,10 @@ public class ZookeeperDiscoveryImpl {
             rtState.zkClient.getChildren(zkPaths.customEvtsDir),
             -1);
 
+        rtState.zkClient.deleteAll(zkPaths.customEvtsPartsDir,
+            rtState.zkClient.getChildren(zkPaths.customEvtsPartsDir),
+            -1);
+
         rtState.zkClient.deleteAll(zkPaths.customEvtsAcksDir,
             rtState.zkClient.getChildren(zkPaths.customEvtsAcksDir),
             -1);
@@ -1371,6 +1398,19 @@ public class ZookeeperDiscoveryImpl {
         return rtState.zkClient;
     }
 
+    private byte[] readCustomEventData(String evtPath, UUID sndNodeId) throws Exception {
+        int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
+
+        if (partCnt > 1) {
+            String partsBasePath = zkPaths.customEventPartsBasePath(
+                ZkIgnitePaths.customEventPrefix(evtPath), sndNodeId);
+
+            return readMultipleParts(rtState.zkClient, partsBasePath, partCnt);
+        }
+        else
+            return rtState.zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath);
+    }
+
     /**
      * @param customEvtNodes ZK nodes representing custom events to process.
      * @throws Exception If failed.
@@ -1397,17 +1437,17 @@ public class ZookeeperDiscoveryImpl {
             Set<UUID> alives = null;
 
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
-                UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtE.getValue());
+                String evtPath = evtE.getValue();
+
+                UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
 
                 ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);
 
                 if (alives != null && !alives.contains(sndNode.id()))
                     sndNode = null;
 
-                String evtDataPath = zkPaths.customEvtsDir + "/" + evtE.getValue();
-
                 if (sndNode != null) {
-                    byte[] evtBytes = rtState.zkClient.getData(zkPaths.customEvtsDir + "/"
+ evtE.getValue());
+                    byte[] evtBytes = readCustomEventData(evtPath, sndNodeId);
 
                     DiscoverySpiCustomMessage msg;
 
@@ -1449,7 +1489,7 @@ public class ZookeeperDiscoveryImpl {
                             rtState.evtsData.evtIdGen,
                             rtState.evtsData.topVer,
                             sndNodeId,
-                            evtE.getValue(),
+                            evtPath,
                             false);
 
                         evtData.msg = msg;
@@ -1461,12 +1501,14 @@ public class ZookeeperDiscoveryImpl {
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to unmarshal custom discovery message: " + e,
e);
+
+                        deleteCustomEventData(rtState.zkClient, evtPath);
                     }
                 }
                 else {
                     U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
 
-                    rtState.zkClient.deleteIfExistsAsync(evtDataPath);
+                    deleteCustomEventData(rtState.zkClient, evtPath);
                 }
 
                 rtState.evtsData.procCustEvt = evtE.getKey();
@@ -1476,6 +1518,27 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
+    private void deleteCustomEventData(ZookeeperClient zkClient, String evtPath) {
+        if (log.isDebugEnabled())
+            log.debug("Delete custom event data: " + evtPath);
+
+        String prefix = ZkIgnitePaths.customEventPrefix(evtPath);
+        UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
+        int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
+
+        assert partCnt >= 1 : partCnt;
+
+        if (partCnt > 1) {
+            for (int i = 0; i < partCnt; i++) {
+                String path0 = zkPaths.customEventPartPath(prefix, sndNodeId, i);
+
+                zkClient.deleteIfExistsAsync(path0);
+            }
+        }
+
+        zkClient.deleteIfExistsAsync(zkPaths.customEvtsDir + "/" + evtPath);
+    }
+
     /**
      * @param data Marshalled events.
      * @throws Exception If failed.
@@ -1586,14 +1649,16 @@ public class ZookeeperDiscoveryImpl {
                             msg = evtData0.msg;
                         }
                         else {
-                            String path;
+                            if (evtData0.ackEvent()) {
+                                String path = zkPaths.ackEventDataPath(evtData0.eventId());
 
-                            if (evtData0.ackEvent())
-                                path = zkPaths.ackEventDataPath(evtData0.eventId());
-                            else
-                                path = zkPaths.customEventDataPath(false, evtData0.evtPath);
+                                msg = unmarshalZip(rtState.zkClient.getData(path));
+                            }
+                            else {
+                                byte[] msgBytes = readCustomEventData(evtData0.evtPath, evtData0.sndNodeId);
 
-                            msg = unmarshalZip(rtState.zkClient.getData(path));
+                                msg = unmarshalZip(msgBytes);
+                            }
 
                             evtData0.msg = msg;
                         }
@@ -2035,12 +2100,7 @@ public class ZookeeperDiscoveryImpl {
             log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData
+ ']');
 
         if (!evtData.ackEvent()) {
-            String path = zkPaths.customEventDataPath(false, evtData.evtPath);
-
-            if (log.isDebugEnabled())
-                log.debug("Delete path: " + path);
-
-            rtState.zkClient.deleteIfExistsAsync(path);
+            deleteCustomEventData(rtState.zkClient, evtData.evtPath);
 
             assert evtData.msg != null || locNode.order() > evtData.topologyVersion()
: evtData;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a11e06a5/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 56d1eaf..e0062c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -76,6 +75,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -158,7 +158,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
 
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
-        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(ccfg);
 
@@ -780,7 +780,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
             assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
                     try {
-                        List<String> c = zkClient.getChildren(IGNITE_ZK_ROOT + "/n");
+                        List<String> c = zkClient.getChildren(IGNITE_ZK_ROOT + "/"
+ ZkIgnitePaths.ALIVE_NODES_DIR);
 
                         for (String failedZkNode : failedZkNodes) {
                             if (c.contains(failedZkNode)) {
@@ -1364,32 +1364,29 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     public void testLargeCustomEvent() throws Exception {
         Ignite srv0 = startGrid(0);
 
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("c1");
+        // Send large message, single node in topology.
+        IgniteCache cache = srv0.createCache(largeCacheConfiguration("c1"));
 
-        ccfg.setAffinity(new TestAffinityFunction(1024 * 1024));
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
 
-        srv0.createCache(ccfg);
-    }
+        assertEquals(1, cache.get(1));
 
-    /**
-     *
-     */
-    static class TestAffinityFunction extends RendezvousAffinityFunction {
-        /** */
-        private static final long serialVersionUID = 0L;
+        waitForEventsAcks(ignite(0));
 
-        /** */
-        private int[] dummyData;
+        startGridsMultiThreaded(1, 3);
 
-        /**
-         * @param dataSize Dummy data size.
-         */
-        TestAffinityFunction(int dataSize) {
-            dummyData = new int[dataSize];
+        srv0.destroyCache("c1");
 
-            for (int i = 0; i < dataSize; i++)
-                dummyData[i] = i;
-        }
+        // Send large message, multiple nodes in topology.
+        cache = srv0.createCache(largeCacheConfiguration("c1"));
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        printZkNodes();
+
+        waitForTopology(4);
     }
 
     /**
@@ -2068,6 +2065,40 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @param cacheName Cache name.
+     * @return Configuration.
+     */
+    private CacheConfiguration largeCacheConfiguration(String cacheName) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(cacheName);
+
+        ccfg.setAffinity(new TestAffinityFunction(1024 * 1024));
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class TestAffinityFunction extends RendezvousAffinityFunction {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private int[] dummyData;
+
+        /**
+         * @param dataSize Dummy data size.
+         */
+        TestAffinityFunction(int dataSize) {
+            dummyData = new int[dataSize];
+
+            for (int i = 0; i < dataSize; i++)
+                dummyData[i] = i;
+        }
+    }
+
+    /**
      *
      */
     private static class DummyCallable implements IgniteCallable<Object> {


Mime
View raw message