ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: zk
Date Tue, 12 Dec 2017 13:37:46 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 0cf778363 -> 953f07929


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/953f0792
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/953f0792
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/953f0792

Branch: refs/heads/ignite-zk
Commit: 953f079299ed4b662285174058a4c3b9d8320c86
Parents: 0cf7783
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Dec 12 16:26:40 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 12 16:37:38 2017 +0300

----------------------------------------------------------------------
 .../discovery/CommunicationProblemContext.java  |  62 ++++
 .../discovery/CommunicationProblemResolver.java |  28 ++
 .../DefaultCommunicationProblemResolver.java    | 172 +++++++++++
 .../discovery/zk/internal/ZkRuntimeState.java   |  46 +++
 .../discovery/zk/internal/ZookeeperClient.java  |   9 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 288 +++++++++++--------
 .../distributed/IgniteCacheManyClientsTest.java |  44 +--
 7 files changed, 516 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
new file mode 100644
index 0000000..71673f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+
+/**
+ * Context passed to {@link CommunicationProblemResolver#resolve(CommunicationProblemContext)}: gives access to
+ * the topology, node-to-node connectivity checks and cache information, and allows requesting that a node be killed.
+ */
+public interface CommunicationProblemContext {
+    /**
+     * @return Current topology snapshot.
+     */
+    public List<ClusterNode> topologySnapshot();
+
+    /**
+     * Directional check: the result for {@code (node1, node2)} describes connection from the first node to
+     * the second one only.
+     *
+     * @param node1 First node.
+     * @param node2 Second node.
+     * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node
+     *      to second node.
+     */
+    public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
+
+    /**
+     * @return List of currently started caches.
+     */
+    public List<String> startedCaches();
+
+    /**
+     * NOTE(review): the outer list is presumably indexed by partition number - confirm against the implementation.
+     *
+     * @param cacheName Cache name.
+     * @return Cache partitions affinity assignment.
+     */
+    public List<List<ClusterNode>> cacheAffinity(String cacheName);
+
+    /**
+     * NOTE(review): the outer list is presumably indexed by partition number - confirm against the implementation.
+     *
+     * @param cacheName Cache name.
+     * @return Cache partitions owners.
+     */
+    public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
+
+    /**
+     * @param node Node to kill.
+     */
+    public void killNode(ClusterNode node);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
new file mode 100644
index 0000000..a9b620b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+/**
+ * Strategy that resolves a communication problem using the information and actions exposed by
+ * {@link CommunicationProblemContext}.
+ * <p>
+ * NOTE(review): when and on which node the resolver is invoked is not visible here - confirm against the caller.
+ */
+public interface CommunicationProblemResolver {
+    /**
+     * @param ctx Context.
+     */
+    public void resolve(CommunicationProblemContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
new file mode 100644
index 0000000..4d0262d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.util.BitSet;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Default {@link CommunicationProblemResolver}: finds the largest group of nodes linked by available
+ * connections and, if every node of that group can connect to every other one, kills all nodes outside of it.
+ */
+public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver {
+    /** {@inheritDoc} */
+    @Override public void resolve(CommunicationProblemContext ctx) {
+        ClusterGraph graph = new ClusterGraph(ctx);
+
+        BitSet cluster = graph.findLargestIndependentCluster();
+
+        // Empty topology snapshot (only reachable with assertions disabled): nothing to resolve.
+        if (cluster == null)
+            return;
+
+        // Reuse the snapshot the graph was built from, so bit indexes always match node positions.
+        List<ClusterNode> nodes = graph.nodes;
+
+        if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) {
+            for (int i = 0; i < nodes.size(); i++) {
+                if (!cluster.get(i))
+                    ctx.killNode(nodes.get(i));
+            }
+        }
+    }
+
+    /**
+     * Graph over a topology snapshot. Vertices are node positions in the snapshot; bit {@code i} of every
+     * bit set used here refers to {@code nodes.get(i)}.
+     */
+    private static class ClusterGraph {
+        /** Number of nodes in the snapshot. */
+        private final int nodeCnt;
+
+        /** Indexes of nodes already assigned to a cluster by {@link #search(BitSet, int)}. */
+        private final BitSet visited;
+
+        /** Context used for connectivity checks. */
+        private final CommunicationProblemContext ctx;
+
+        /** Topology snapshot the graph is built on. */
+        private final List<ClusterNode> nodes;
+
+        /**
+         * @param ctx Context.
+         */
+        ClusterGraph(CommunicationProblemContext ctx) {
+            this.ctx = ctx;
+
+            nodes = ctx.topologySnapshot();
+
+            nodeCnt = nodes.size();
+
+            assert nodeCnt > 0;
+
+            visited = new BitSet(nodeCnt);
+        }
+
+        /**
+         * Splits not yet visited nodes into clusters (see {@link #search(BitSet, int)}) and picks the largest one.
+         * If several clusters have the same size, the one found first is returned.
+         *
+         * @return Largest cluster, or {@code null} if there are no nodes to visit.
+         */
+        BitSet findLargestIndependentCluster() {
+            BitSet maxCluster = null;
+            int maxClusterSize = 0;
+
+            for (int i = 0; i < nodeCnt; i++) {
+                if (visited.get(i))
+                    continue;
+
+                BitSet cluster = new BitSet(nodeCnt);
+
+                search(cluster, i);
+
+                int size = cluster.cardinality();
+
+                if (maxCluster == null || size > maxClusterSize) {
+                    maxCluster = cluster;
+                    maxClusterSize = size;
+                }
+            }
+
+            return maxCluster;
+        }
+
+        /**
+         * Checks every ordered pair of distinct cluster members, so both connection directions are verified.
+         *
+         * @param cluster Indexes of the nodes forming the cluster.
+         * @return {@code True} if each node of the cluster can connect to each other node of the cluster.
+         */
+        boolean checkFullyConnected(BitSet cluster) {
+            for (int idx = cluster.nextSetBit(0); idx >= 0; idx = cluster.nextSetBit(idx + 1)) {
+                ClusterNode node1 = nodes.get(idx);
+
+                // Walk the set bits: member indexes are not limited by the cluster cardinality.
+                for (int i = cluster.nextSetBit(0); i >= 0; i = cluster.nextSetBit(i + 1)) {
+                    if (i == idx)
+                        continue;
+
+                    // A single missing link is enough to reject the cluster.
+                    if (!ctx.connectionAvailable(node1, nodes.get(i)))
+                        return false;
+                }
+            }
+
+            return true;
+        }
+
+        /**
+         * Recursively adds to {@code cluster} every not yet visited node reachable from the given one.
+         * Two nodes are treated as linked if connection is available in at least one direction.
+         *
+         * @param cluster Cluster being collected.
+         * @param idx Index of the node to start from.
+         */
+        void search(BitSet cluster, int idx) {
+            visited.set(idx);
+
+            cluster.set(idx);
+
+            ClusterNode node1 = nodes.get(idx);
+
+            for (int i = 0; i < nodeCnt; i++) {
+                if (i == idx || visited.get(i))
+                    continue;
+
+                ClusterNode node2 = nodes.get(i);
+
+                boolean connected = ctx.connectionAvailable(node1, node2) ||
+                    ctx.connectionAvailable(node2, node1);
+
+                if (connected)
+                    search(cluster, i);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 4653109..fc03f8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -18,12 +18,23 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
 
 /**
  *
  */
 class ZkRuntimeState {
     /** */
+    ZkWatcher watcher;
+
+    /** */
+    ZkAliveNodeDataWatcher aliveNodeDataWatcher;
+
+    /** */
+    volatile boolean closing;
+
+    /** */
     final boolean prevJoined;
 
     /** */
@@ -68,4 +79,39 @@ class ZkRuntimeState {
     ZkRuntimeState(boolean prevJoined) {
         this.prevJoined = prevJoined;
     }
+
+    /**
+     * Stores the watcher/callback instances to be used with this runtime state.
+     *
+     * @param watcher Watcher.
+     * @param aliveNodeDataWatcher Alive nodes data watcher.
+     */
+    void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) {
+        this.watcher = watcher;
+        this.aliveNodeDataWatcher = aliveNodeDataWatcher;
+    }
+
+    /**
+     * Marks this runtime state as closing and, if the ZooKeeper client reference is set, notifies the client
+     * that close has started.
+     */
+    void onCloseStart() {
+        // The flag is raised before the client is notified.
+        closing = true;
+
+        // Read the field once into a local; it may be null, hence the check below.
+        ZookeeperClient zkClient = this.zkClient;
+
+        if (zkClient != null)
+            zkClient.onCloseStart();
+    }
+
+    /**
+     *
+     */
+    interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, AsyncCallback.DataCallback {
+        // No-op.
+    }
+
+    /**
+     *
+     */
+    interface ZkAliveNodeDataWatcher extends Watcher, AsyncCallback.DataCallback {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index a0bc2f0..a83886a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -348,7 +348,8 @@ public class ZookeeperClient implements Watcher {
                 return zk.create(path, data, ZK_ACL, createMode);
             }
             catch (KeeperException.NodeExistsException e) {
-                log.info("Node already exists: " + path);
+                if (log.isDebugEnabled())
+                    log.debug("Node already exists: " + path);
 
                 return path;
             }
@@ -391,7 +392,8 @@ public class ZookeeperClient implements Watcher {
                         if (children.get(i).startsWith(checkPrefix)) {
                             String resPath = parentPath + "/" + child;
 
-                            log.info("Check before retry, node already created: " + resPath);
+                            if (log.isDebugEnabled())
+                                log.debug("Check before retry, node already created: " + resPath);
 
                             return resPath;
                         }
@@ -403,7 +405,8 @@ public class ZookeeperClient implements Watcher {
             catch (KeeperException.NodeExistsException e) {
                 assert !createMode.isSequential() : createMode;
 
-                log.info("Node already exists: " + path);
+                if (log.isDebugEnabled())
+                    log.debug("Node already exists: " + path);
 
                 return path;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 9c1e398..effecbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -127,18 +127,6 @@ public class ZookeeperDiscoveryImpl {
     private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>();
 
     /** */
-    private final AliveNodeDataWatcher aliveNodeDataWatcher = new AliveNodeDataWatcher();
-
-    /** */
-    private final ZkWatcher watcher;
-
-    /** */
-    private final ZKChildrenCallback childrenCallback;
-
-    /** */
-    private final ZkDataCallback dataCallback;
-
-    /** */
     private final int evtsAckThreshold;
 
     /** */
@@ -197,10 +185,6 @@ public class ZookeeperDiscoveryImpl {
         this.exchange = exchange;
         this.clientReconnectEnabled = locNode.isClient() && !spi.isClientReconnectDisabled();
 
-        watcher = new ZkWatcher();
-        childrenCallback = new ZKChildrenCallback();
-        dataCallback = new ZkDataCallback();
-
         int evtsAckThreshold = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, 5);
 
         if (evtsAckThreshold <= 0)
@@ -342,7 +326,7 @@ public class ZookeeperDiscoveryImpl {
                 return;
         }
 
-        rtState.zkClient.onCloseStart();
+        rtState.onCloseStart();
 
         busyLock.block();
 
@@ -764,6 +748,8 @@ public class ZookeeperDiscoveryImpl {
 
             final ZkRuntimeState rtState = this.rtState;
 
+            rtState.init(new ZkWatcher(rtState), new AliveNodeDataWatcher(rtState));
+
             ZookeeperClient zkClient = rtState.zkClient;
 
             final int OVERHEAD = 5;
@@ -818,9 +804,9 @@ public class ZookeeperDiscoveryImpl {
 
             spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj);
 
-            zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
+            zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState));
 
-            zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
+            zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher);
         }
         catch (IgniteCheckedException | ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
@@ -968,7 +954,7 @@ public class ZookeeperDiscoveryImpl {
                 "locId=" + locNode.id() +
                 ", prevPath=" + prevE.getValue() + ']');
 
-            PreviousNodeWatcher watcher = new PreviousNodeWatcher();
+            PreviousNodeWatcher watcher = new PreviousNodeWatcher(rtState);
 
             rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher);
         }
@@ -988,7 +974,7 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']');
 
-        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
+        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState));
     }
 
     /**
@@ -1023,8 +1009,8 @@ public class ZookeeperDiscoveryImpl {
             newClusterStarted(locInternalId);
         }
 
-        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback);
-        rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback);
+        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
+        rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, rtState.watcher, rtState.watcher);
 
         for (String alivePath : aliveNodes)
             watchAliveNodeData(alivePath);
@@ -1039,7 +1025,7 @@ public class ZookeeperDiscoveryImpl {
         String path = zkPaths.aliveNodesDir + "/" + alivePath;
 
         if (!path.equals(rtState.locNodeZkPath))
-            rtState.zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher);
+            rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, rtState.aliveNodeDataWatcher);
     }
 
     /**
@@ -1940,7 +1926,7 @@ public class ZookeeperDiscoveryImpl {
     public void simulateNodeFailure() {
         zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir);
 
-        zkClient().onCloseStart();
+        rtState.onCloseStart();
 
         zkClient().close();
     }
@@ -2014,7 +2000,7 @@ public class ZookeeperDiscoveryImpl {
         if (failedNode.isLocal()) {
             U.warn(log, "Received EVT_NODE_FAILED for local node.");
 
-            zkClient().onCloseStart();
+            rtState.onCloseStart();
 
             if (locNode.isClient() && clientReconnectEnabled) {
                 boolean reconnect = false;
@@ -2315,8 +2301,7 @@ public class ZookeeperDiscoveryImpl {
 
         ZookeeperClient zkClient = rtState.zkClient;
 
-        if (zkClient != null)
-            zkClient.onCloseStart();
+        rtState.onCloseStart();
 
         busyLock.block();
 
@@ -2410,6 +2395,8 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public void run() {
+            rtState.closing = true;
+
             busyLock.block();
 
             busyLock.unblock();
@@ -2455,45 +2442,136 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class ZkWatcher implements Watcher {
+    abstract class ZkCallabck {
+        /** Runtime state this callback was created for. */
+        final ZkRuntimeState rtState;
+
+        /**
+         * @param rtState Runtime state.
+         */
+        ZkCallabck(ZkRuntimeState rtState) {
+            this.rtState = rtState;
+        }
+
+        /**
+         * Tries to enter the busy section. Callers in this file follow a successful call with either
+         * {@link #onProcessEnd()} or {@link #onProcessError(Throwable)}.
+         *
+         * @return {@code True} if is able to start processing.
+         */
+        final boolean onProcessStart() {
+            // Once the runtime state started closing, callbacks are rejected without touching the busy lock.
+            if (rtState.closing)
+                return false;
+
+            return busyLock.enterBusy();
+        }
+
+        /**
+         * Leaves the busy section entered by {@link #onProcessStart()}.
+         */
+        final void onProcessEnd() {
+            busyLock.leaveBusy();
+        }
+
+        /**
+         * Hands the failure over to the fatal error handling of the enclosing class.
+         *
+         * @param e Error.
+         */
+        final void onProcessError(Throwable e) {
+            onFatalError(busyLock, e);
+        }
+    }
+
+    /**
+     *
+     */
+    abstract class AbstractWatcher extends ZkCallabck implements Watcher {
+        /**
+         * @param rtState Runtime state.
+         */
+        AbstractWatcher(ZkRuntimeState rtState) {
+            super(rtState);
+        }
+
         /** {@inheritDoc} */
-        @Override public void process(WatchedEvent evt) {
-            if (!busyLock.enterBusy())
+        @Override public final void process(WatchedEvent evt) {
+            if (!onProcessStart())
                 return;
 
             try {
-                if (evt.getType() == Event.EventType.NodeDataChanged) {
-                    if (evt.getPath().equals(zkPaths.evtsPath)) {
-                        if (!rtState.crd)
-                            rtState.zkClient.getDataAsync(evt.getPath(), this, dataCallback);
-                    }
-                    else
-                        U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath());
-                }
-                else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
-                    if (evt.getPath().equals(zkPaths.aliveNodesDir))
-                        rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
-                    else if (evt.getPath().equals(zkPaths.customEvtsDir))
-                        rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
-                    else
-                        U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
-                }
+                process0(evt);
 
-                busyLock.leaveBusy();
+                onProcessEnd();
             }
             catch (Throwable e) {
-                onFatalError(busyLock, e);
+                onProcessError(e);
             }
         }
+
+        /**
+         * @param evt Event.
+         * @throws Exception If failed.
+         */
+        protected abstract void process0(WatchedEvent evt) throws Exception;
     }
 
     /**
      *
      */
-    private class ZKChildrenCallback implements AsyncCallback.Children2Callback {
+    abstract class AbstractChildrenCallback extends ZkCallabck implements AsyncCallback.Children2Callback {
+        /**
+         * @param rtState Runtime state.
+         */
+        AbstractChildrenCallback(ZkRuntimeState rtState) {
+            super(rtState);
+        }
+
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-            if (!busyLock.enterBusy())
+            if (!onProcessStart())
+                return;
+
+            try {
+                processResult0(rc, path, ctx, children, stat);
+
+                onProcessEnd();
+            }
+            catch (Throwable e) {
+                onProcessError(e);
+            }
+        }
+
+        abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat)
+            throws Exception;
+    }
+
+    /**
+     *
+     */
+    private class ZkWatcher extends AbstractWatcher implements ZkRuntimeState.ZkWatcher {
+        /**
+         * @param rtState Runtime state.
+         */
+        ZkWatcher(ZkRuntimeState rtState) {
+            super(rtState);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void process0(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDataChanged) {
+                if (evt.getPath().equals(zkPaths.evtsPath)) {
+                    if (!rtState.crd)
+                        rtState.zkClient.getDataAsync(evt.getPath(), this, this);
+                }
+                else
+                    U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath());
+            }
+            else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
+                if (evt.getPath().equals(zkPaths.aliveNodesDir))
+                    rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
+                else if (evt.getPath().equals(zkPaths.customEvtsDir))
+                    rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
+                else
+                    U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            if (!onProcessStart())
                 return;
 
             try {
@@ -2506,21 +2584,16 @@ public class ZookeeperDiscoveryImpl {
                 else
                     U.warn(log, "Children callback for unexpected path: " + path);
 
-                busyLock.leaveBusy();
+                onProcessEnd();
             }
             catch (Throwable e) {
-                onFatalError(busyLock, e);
+                onProcessError(e);
             }
         }
-    }
 
-    /**
-     *
-     */
-    private class ZkDataCallback implements AsyncCallback.DataCallback {
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-            if (!busyLock.enterBusy())
+            if (!onProcessStart())
                 return;
 
             try {
@@ -2533,10 +2606,10 @@ public class ZookeeperDiscoveryImpl {
                 else
                     U.warn(log, "Data callback for unknown path: " + path);
 
-                busyLock.leaveBusy();
+                onProcessEnd();
             }
             catch (Throwable e) {
-                onFatalError(busyLock, e);
+                onProcessError(e);
             }
         }
     }
@@ -2544,26 +2617,23 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class AliveNodeDataWatcher implements Watcher, AsyncCallback.DataCallback {
-        /** {@inheritDoc} */
-        @Override public void process(WatchedEvent evt) {
-            if (!busyLock.enterBusy())
-                return;
-
-            try {
-                if (evt.getType() == Event.EventType.NodeDataChanged)
-                    rtState.zkClient.getDataAsync(evt.getPath(), this, this);
+    private class AliveNodeDataWatcher extends AbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher {
+        /**
+         * @param rtState Runtime state.
+         */
+        AliveNodeDataWatcher(ZkRuntimeState rtState) {
+            super(rtState);
+        }
 
-                busyLock.leaveBusy();
-            }
-            catch (Throwable e) {
-                onFatalError(busyLock, e);
-            }
+        /** {@inheritDoc} */
+        @Override public void process0(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDataChanged)
+                rtState.zkClient.getDataAsync(evt.getPath(), this, this);
         }
 
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-            if (!busyLock.enterBusy())
+            if (!onProcessStart())
                 return;
 
             try {
@@ -2571,10 +2641,10 @@ public class ZookeeperDiscoveryImpl {
 
                 processResult0(rc, path, data);
 
-                busyLock.leaveBusy();
+                onProcessEnd();
             }
             catch (Throwable e) {
-                onFatalError(busyLock, e);
+                onProcessError(e);
             }
         }
 
@@ -2619,30 +2689,27 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback {
-        /** {@inheritDoc} */
-        @Override public void process(WatchedEvent evt) {
-            if (!busyLock.enterBusy())
-                return;
-
-            try {
-                if (evt.getType() == Event.EventType.NodeDeleted)
-                    onPreviousNodeFail();
-                else {
-                    if (evt.getType() != Event.EventType.None)
-                        rtState.zkClient.existsAsync(evt.getPath(), this, this);
-                }
+    private class PreviousNodeWatcher extends AbstractWatcher implements AsyncCallback.StatCallback {
+        /**
+         * @param rtState Runtime state.
+         */
+        PreviousNodeWatcher(ZkRuntimeState rtState) {
+            super(rtState);
+        }
 
-                busyLock.leaveBusy();
-            }
-            catch (Throwable e) {
-                onFatalError(busyLock, e);
+        /** {@inheritDoc} */
+        @Override public void process0(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDeleted)
+                onPreviousNodeFail();
+            else {
+                if (evt.getType() != Event.EventType.None)
+                    rtState.zkClient.existsAsync(evt.getPath(), this, this);
             }
         }
 
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
-            if (!busyLock.enterBusy())
+            if (!onProcessStart())
                 return;
 
             try {
@@ -2651,10 +2718,10 @@ public class ZookeeperDiscoveryImpl {
                 if (rc == KeeperException.Code.NONODE.intValue() || stat == null)
                     onPreviousNodeFail();
 
-                busyLock.leaveBusy();
+                onProcessEnd();
             }
             catch (Throwable e) {
-                onFatalError(busyLock, e);
+                onProcessError(e);
             }
         }
     }
@@ -2662,22 +2729,19 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    class CheckCoordinatorCallback implements  AsyncCallback.Children2Callback {
-        /** {@inheritDoc} */
-        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-            if (!busyLock.enterBusy())
-                return;
-
-            try {
-                assert rc == 0 : KeeperException.Code.get(rc);
+    class CheckCoordinatorCallback extends AbstractChildrenCallback {
+        /**
+         * @param rtState Runtime state.
+         */
+        CheckCoordinatorCallback(ZkRuntimeState rtState) {
+            super(rtState);
+        }
 
-                checkIsCoordinator(rc, children);
+        /** {@inheritDoc} */
+        @Override public void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) throws Exception {
+            assert rc == 0 : KeeperException.Code.get(rc);
 
-                busyLock.leaveBusy();
-            }
-            catch (Throwable e) {
-                onFatalError(busyLock, e);
-            }
+            checkIsCoordinator(rc, children);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index a0be40e..7785a3c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -178,7 +178,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
         log.info("All clients started.");
 
         try {
-            checkNodes(SRVS + CLIENTS);
+            checkNodes0(SRVS + CLIENTS);
         }
         finally {
             for (Ignite client : clients)
@@ -188,6 +188,30 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
 
     /**
      * @param expCnt Expected number of nodes.
+     * @throws Exception If failed.
+     */
+    private void checkNodes0(final int expCnt) throws Exception {
+        // Predicate is satisfied as soon as a single checkNodes() pass completes without an assertion failure.
+        GridAbsPredicate nodesOk = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkNodes(expCnt);
+                }
+                catch (AssertionFailedError e) {
+                    log.info("Check failed, will retry: " + e);
+
+                    return false;
+                }
+
+                return true;
+            }
+        };
+
+        // If the check never passed within the timeout, run it once more so that the assertion failure propagates.
+        if (!GridTestUtils.waitForCondition(nodesOk, 10_000))
+            checkNodes(expCnt);
+    }
+
+    /**
+     * @param expCnt Expected number of nodes.
      */
     private void checkNodes(int expCnt) {
         assertEquals(expCnt, G.allGrids().size());
@@ -297,23 +321,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
             if (err0 != null)
                 throw err0;
 
-            boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    try {
-                        checkNodes(SRVS + THREADS);
-
-                        return true;
-                    }
-                    catch (AssertionFailedError e) {
-                        log.info("Check failed, will retry: " + e);
-                    }
-
-                    return false;
-                }
-            }, 10_000);
-
-            if (!wait)
-                checkNodes(SRVS + THREADS);
+            checkNodes0(SRVS + THREADS);
 
             log.info("Stop clients.");
 


Mime
View raw message