ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: zk
Date Fri, 29 Dec 2017 13:05:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 308bef23b -> 0918da57c


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0918da57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0918da57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0918da57

Branch: refs/heads/ignite-zk
Commit: 0918da57ca7439c333566c3404531ff6f0e2312a
Parents: 308bef2
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 29 14:58:31 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 29 15:58:53 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |  6 ++
 .../processors/cache/GridCacheProcessor.java    |  4 +-
 .../discovery/zk/internal/ZookeeperClient.java  | 71 ++++++++++++++------
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 12 ++++
 .../cache/GridCacheAbstractSelfTest.java        | 10 ++-
 .../CacheVersionedEntryAbstractTest.java        | 33 +++------
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 54 +++++++++++++++
 7 files changed, 144 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index b4345b9..36b3ce6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -102,6 +102,7 @@ import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi;
 import org.apache.ignite.spi.collision.noop.NoopCollisionSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
@@ -2294,6 +2295,11 @@ public class IgnitionEx {
 
                 ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
 
+                DiscoverySpi spi = myCfg.getDiscoverySpi();
+
+                if (spi instanceof TcpDiscoverySpi)
+                    zkSpi.setClientReconnectDisabled(((TcpDiscoverySpi)spi).isClientReconnectDisabled());
+
                 zkSpi.setSessionTimeout(20_000);
                 zkSpi.setZkConnectionString(zkCluster.getConnectString());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 30033d3..4bf1ce6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3250,7 +3250,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Validation result or {@code null} in case of success.
      */
     @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node)
{
-        if (!node.isClient()) {
+        if (!CU.clientNode(node)) {
             for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
                 CacheConfiguration cfg = desc.cacheConfiguration();
 
@@ -3259,7 +3259,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     Object nodeHashObj = aff.resolveNodeHash(node);
 
-                    for (ClusterNode topNode : ctx.discovery().allNodes()) {
+                    for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) {
                         Object topNodeHashObj = aff.resolveNodeHash(topNode);
 
                         if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index df0bb43..f5ecf52 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -47,6 +47,9 @@ public class ZookeeperClient implements Watcher {
         IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 1000);
 
     /** */
+    private static final int MAX_REQ_SIZE = 1048528;
+
+    /** */
     private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
 
     /** */
@@ -142,6 +145,15 @@ public class ZookeeperClient implements Watcher {
         return zk;
     }
 
+    /**
+     * @return {@code True} if connected to ZooKeeper.
+     */
+    boolean connected() {
+        synchronized (stateMux) {
+            return state == ConnectionState.Connected;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void process(WatchedEvent evt) {
         if (closing)
@@ -179,14 +191,14 @@ public class ZookeeperClient implements Watcher {
                         break;
 
                     default:
-                        U.error(log, "Unexpected state for zookeeper client, close connection:
" + zkState);
+                        U.error(log, "Unexpected state for ZooKeeper client, close connection:
" + zkState);
 
                         newState = ConnectionState.Lost;
                 }
 
                 if (newState != state) {
                     if (log.isInfoEnabled())
-                        log.info("Zookeeper client state changed [prevState=" + state + ",
newState=" + newState + ']');
+                        log.info("ZooKeeper client state changed [prevState=" + state + ",
newState=" + newState + ']');
 
                     state = newState;
 
@@ -278,9 +290,6 @@ public class ZookeeperClient implements Watcher {
 
     }
 
-    /** */
-    private static final int MAX_REQ_SIZE = 1048528;
-
     /**
      * @param path Path.
      * @param data Data.
@@ -291,6 +300,12 @@ public class ZookeeperClient implements Watcher {
         return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE;
     }
 
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param overhead Extra overhead.
+     * @return Splitted data.
+     */
     List<byte[]> splitNodeData(String path, byte[] data, int overhead) {
         int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead;
 
@@ -447,6 +462,21 @@ public class ZookeeperClient implements Watcher {
 
     /**
      * @param path Path.
+     * @param ver Expected version.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException In case of error.
+     */
+    void deleteIfExistsNoRetry(String path, int ver) throws InterruptedException, KeeperException
{
+        try {
+            zk.delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException e) {
+            // No-op if znode does not exist.
+        }
+    }
+
+    /**
+     * @param path Path.
      * @param ver Version.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
@@ -458,7 +488,7 @@ public class ZookeeperClient implements Watcher {
             delete(path, ver);
         }
         catch (KeeperException.NoNodeException e) {
-            // No-op if node does not exist.
+            // No-op if znode does not exist.
         }
     }
 
@@ -493,6 +523,9 @@ public class ZookeeperClient implements Watcher {
 
                 return;
             }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
             catch (Exception e) {
                 onZookeeperError(connStartTime, e);
             }
@@ -532,9 +565,12 @@ public class ZookeeperClient implements Watcher {
      * @param ver Version.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
+     * @throws KeeperException.NoNodeException If node does not exist.
+     * @throws KeeperException.BadVersionException If version does not match.
      */
     void setData(String path, byte[] data, int ver)
-        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException,
KeeperException.BadVersionException
+        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException,
+        KeeperException.BadVersionException
     {
         if (data == null)
             data = EMPTY_BYTES;
@@ -547,10 +583,7 @@ public class ZookeeperClient implements Watcher {
 
                 return;
             }
-            catch (KeeperException.NoNodeException e) {
-                throw e;
-            }
-            catch (KeeperException.BadVersionException e) {
+            catch (KeeperException.BadVersionException | KeeperException.NoNodeException
e) {
                 throw e;
             }
             catch (Exception e) {
@@ -683,9 +716,9 @@ public class ZookeeperClient implements Watcher {
 
         synchronized (stateMux) {
             if (closing)
-                throw new ZookeeperClientFailedException("Zookeeper client is closed.");
+                throw new ZookeeperClientFailedException("ZooKeeper client is closed.");
 
-            U.warn(log, "Failed to execute zookeeper operation [err=" + e + ", state=" +
state + ']');
+            U.warn(log, "Failed to execute ZooKeeper operation [err=" + e + ", state=" +
state + ']');
 
             if (state == ConnectionState.Lost) {
                 U.error(log, "Operation failed with unexpected error, connection lost: "
+ e, e);
@@ -715,7 +748,7 @@ public class ZookeeperClient implements Watcher {
                     if (remainingTime <= 0) {
                         state = ConnectionState.Lost;
 
-                        U.warn(log, "Failed to establish zookeeper connection, close client
" +
+                        U.warn(log, "Failed to establish ZooKeeper connection, close client
" +
                             "[timeout=" + connLossTimeout + ']');
 
                         err = new ZookeeperClientFailedException(e);
@@ -723,7 +756,7 @@ public class ZookeeperClient implements Watcher {
                 }
 
                 if (err == null) {
-                    U.warn(log, "Zookeeper operation failed, will retry [err=" + e +
+                    U.warn(log, "ZooKeeper operation failed, will retry [err=" + e +
                         ", retryTimeout=" + RETRY_TIMEOUT +
                         ", connLossTimeout=" + connLossTimeout +
                         ", path=" + ((KeeperException)e).getPath() +
@@ -732,11 +765,11 @@ public class ZookeeperClient implements Watcher {
                     stateMux.wait(RETRY_TIMEOUT);
 
                     if (closing)
-                        throw new ZookeeperClientFailedException("Zookeeper client is closed.");
+                        throw new ZookeeperClientFailedException("ZooKeeper client is closed.");
                 }
             }
             else {
-                U.error(log, "Operation failed with unexpected error, close client: " + e,
e);
+                U.error(log, "Operation failed with unexpected error, close ZooKeeper client:
" + e, e);
 
                 state = ConnectionState.Lost;
 
@@ -771,7 +804,7 @@ public class ZookeeperClient implements Watcher {
             zk.close();
         }
         catch (Exception closeErr) {
-            U.warn(log, "Failed to close zookeeper client: " + closeErr, closeErr);
+            U.warn(log, "Failed to close ZooKeeper client: " + closeErr, closeErr);
         }
 
         connTimer.cancel();
@@ -1116,7 +1149,7 @@ public class ZookeeperClient implements Watcher {
 
                     state = ConnectionState.Lost;
 
-                    U.warn(log, "Failed to establish zookeeper connection, close client "
+
+                    U.warn(log, "Failed to establish ZooKeeper connection, close client "
+
                         "[timeout=" + connLossTimeout + ']');
 
                     connLoss = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 10755ad..10d8061 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -3607,6 +3607,18 @@ public class ZookeeperDiscoveryImpl {
         if (!stop.compareAndSet(false, true))
             return;
 
+        ZkRuntimeState rtState = this.rtState;
+
+        if (rtState.zkClient != null && rtState.locNodeZkPath != null &&
rtState.zkClient.connected()) {
+            try {
+                rtState.zkClient.deleteIfExistsNoRetry(rtState.locNodeZkPath, -1);
+            }
+            catch (Exception err) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to delete local node's znode on stop: " + err);
+            }
+        }
+
         IgniteCheckedException err = new IgniteCheckedException("Node stopped.");
 
         synchronized (stateMux) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 7a8453c..80ffdef 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -95,9 +95,15 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest
{
 
         initStoreStrategy();
 
-        startGridsMultiThreaded(1, cnt);
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0));
 
-        startGrid(0);
+        if (cfg.isClientMode() != null && cfg.isClientMode()) {
+            startGridsMultiThreaded(1, cnt);
+
+            startGrid(getTestIgniteInstanceName(0), cfg, null);
+        }
+        else
+            startGrids(cnt);
 
         awaitPartitionMapExchange();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java
index 61ceef7..16ea848 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java
@@ -18,11 +18,12 @@
 package org.apache.ignite.internal.processors.cache.version;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntry;
@@ -56,23 +57,15 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS
     public void testInvoke() throws Exception {
         Cache<Integer, String> cache = grid(0).cache(DEFAULT_CACHE_NAME);
 
-        final AtomicInteger invoked = new AtomicInteger();
-
-        cache.invoke(100, new EntryProcessor<Integer, String, Object>() {
-            @Override public Object process(MutableEntry<Integer, String> entry, Object...
arguments)
-                throws EntryProcessorException {
-
-                invoked.incrementAndGet();
-
+        assertNotNull(cache.invoke(100, new EntryProcessor<Integer, String, Object>()
{
+            @Override public Object process(MutableEntry<Integer, String> entry, Object...
args) {
                 CacheEntry<Integer, String> verEntry = entry.unwrap(CacheEntry.class);
 
                 checkVersionedEntry(verEntry);
 
-                return entry;
+                return verEntry.version();
             }
-        });
-
-        assert invoked.get() > 0;
+        }));
     }
 
     /**
@@ -86,23 +79,17 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS
         for (int i = 0; i < ENTRIES_NUM; i++)
             keys.add(i);
 
-        final AtomicInteger invoked = new AtomicInteger();
-
-        cache.invokeAll(keys, new EntryProcessor<Integer, String, Object>() {
-            @Override public Object process(MutableEntry<Integer, String> entry, Object...
arguments)
-                throws EntryProcessorException {
-
-                invoked.incrementAndGet();
-
+        Map<Integer, EntryProcessorResult<Object>> res = cache.invokeAll(keys,
new EntryProcessor<Integer, String, Object>() {
+            @Override public Object process(MutableEntry<Integer, String> entry, Object...
args) {
                 CacheEntry<Integer, String> verEntry = entry.unwrap(CacheEntry.class);
 
                 checkVersionedEntry(verEntry);
 
-                return null;
+                return verEntry.version();
             }
         });
 
-        assert invoked.get() > 0;
+        assertEquals(ENTRIES_NUM, res.size());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index 314e3ab..0e7141a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -1643,6 +1643,60 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testMultipleClusters() throws Exception {
+        Ignite c0 = startGrid(0);
+
+        zkRootPath = "/cluster2";
+
+        Ignite c1 = startGridsMultiThreaded(1, 5);
+
+        zkRootPath = "/cluster3";
+
+        Ignite c2 = startGridsMultiThreaded(6, 3);
+
+        checkNodesNumber(c0, 1);
+        checkNodesNumber(c1, 5);
+        checkNodesNumber(c2, 3);
+
+        stopGrid(2);
+
+        checkNodesNumber(c0, 1);
+        checkNodesNumber(c1, 4);
+        checkNodesNumber(c2, 3);
+
+        for (int i = 0; i < 3; i++)
+            stopGrid(i + 6);
+
+        checkNodesNumber(c0, 1);
+        checkNodesNumber(c1, 4);
+
+        c2 = startGridsMultiThreaded(6, 2);
+
+        checkNodesNumber(c0, 1);
+        checkNodesNumber(c1, 4);
+        checkNodesNumber(c2, 2);
+
+        evts.clear();
+    }
+
+    /**
+     * @param node Node.
+     * @param expNodes Expected node in cluster.
+     * @throws Exception If failed.
+     */
+    private void checkNodesNumber(final Ignite node, final int expNodes) throws Exception
{
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return node.cluster().nodes().size() == expNodes;
+            }
+        }, 5000);
+
+        assertEquals(expNodes, node.cluster().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testStartStop1() throws Exception {
         ackEveryEventSystemProperty();
 


Mime
View raw message