ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/13] ignite git commit: zk
Date Mon, 27 Nov 2017 13:28:10 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1ccbac03
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1ccbac03
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1ccbac03

Branch: refs/heads/ignite-zk
Commit: 1ccbac03cfa94123adc9be7270a76311415b7389
Parents: 96aa846
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Nov 27 11:35:13 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Nov 27 16:27:45 2017 +0300

----------------------------------------------------------------------
 .../processors/cluster/ClusterNodeMetrics.java  |   7 +-
 .../processors/cluster/ClusterProcessor.java    |   9 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  23 +-
 .../discovery/zk/internal/ZkClusterNodes.java   |   8 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |  14 +-
 .../discovery/zk/internal/ZkRuntimeState.java   |  60 ++
 .../discovery/zk/internal/ZookeeperClient.java  |  50 +-
 .../ZookeeperClientFailedException.java         |   7 +
 .../zk/internal/ZookeeperClusterNode.java       |  15 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 999 ++++++++++++-------
 .../internal/ClusterNodeMetricsUpdateTest.java  |  98 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 143 ++-
 .../testframework/junits/GridAbstractTest.java  |   8 +
 .../testsuites/IgniteComputeGridTestSuite.java  |   2 +
 14 files changed, 1041 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
index 4a7dd77..75a83a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
@@ -22,13 +22,14 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
 
 /**
  *
  */
 class ClusterNodeMetrics implements Serializable {
     /** */
-    private final ClusterMetrics metrics;
+    private final byte[] metrics; // Serialized with ClusterMetricsSnapshot.serialize(), see constructor.
 
     /** */
     private final Map<Integer, CacheMetrics> cacheMetrics;
@@ -38,14 +39,14 @@ class ClusterNodeMetrics implements Serializable {
      * @param cacheMetrics Cache metrics.
      */
     ClusterNodeMetrics(ClusterMetrics metrics, Map<Integer, CacheMetrics> cacheMetrics) {
-        this.metrics = metrics;
+        this.metrics = ClusterMetricsSnapshot.serialize(metrics);
         this.cacheMetrics = cacheMetrics;
     }
 
     /**
      * @return Metrics.
      */
-    ClusterMetrics metrics() {
+    byte[] metrics() {
         return metrics;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 8812161..36c4c0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDiagnosticInfo;
@@ -387,6 +388,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param sndNodeId Sender node ID.
      * @param msg Message.
      */
     private void processMetricsUpdateMessage(UUID sndNodeId, ClusterMetricsUpdateMessage msg) {
@@ -413,6 +415,11 @@ public class ClusterProcessor extends GridProcessorAdapter {
         }
     }
 
+    /**
+     * @param discoCache Discovery data cache used to look up the node by ID.
+     * @param nodeId ID of the node whose metrics are updated.
+     * @param metricsBytes Marshalled metrics.
+     */
     private void updateNodeMetrics(DiscoCache discoCache, UUID nodeId, byte[] metricsBytes) {
         ClusterNode node = discoCache.node(nodeId);
 
@@ -426,7 +433,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
 
             IgniteClusterNode node0 = (IgniteClusterNode)node;
 
-            node0.setMetrics(metrics.metrics());
+            node0.setMetrics(ClusterMetricsSnapshot.deserialize(metrics.metrics(), 0));
             node0.setCacheMetrics(metrics.cacheMetrics());
 
             ctx.discovery().metricsUpdateEvent(discoCache, node0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 3c3ffa7..e4450d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -283,15 +283,20 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             ", basePath=" + basePath +
             ", clusterName=" + clusterName + ']');
 
-        impl = new ZookeeperDiscoveryImpl(log,
+        impl = new ZookeeperDiscoveryImpl(
+            igniteInstanceName,
+            zkConnectionString,
+            sesTimeout,
+            log,
             basePath,
             clusterName,
             locNode,
             lsnr,
-            exchange);
+            exchange,
+            locNode.isClient() && !clientReconnectDisabled); // Reconnect applies to client nodes only, unless disabled.
 
         try {
-            impl.joinTopology(igniteInstanceName, zkConnectionString, sesTimeout);
+            impl.joinTopology();
         }
         catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -302,8 +307,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
-        if (impl != null)
-            impl.stop();
+        if (impl != null) {
+            try {
+                impl.stop();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteSpiException(e);
+            }
+        }
     }
 
     /**
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
index 7ca1360..4c114a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
@@ -53,6 +53,14 @@ public class ZkClusterNodes {
     }
 
     /**
+     * @return New list with a copy of current topology nodes, in {@code nodesByOrder} iteration order.
+     */
+    @SuppressWarnings("unchecked")
+    List<ClusterNode> topologySnapshot() {
+        return new ArrayList<>((Collection)nodesByOrder.values());
+    }
+
+    /**
      * @param node New node.
      */
     void addNode(ZookeeperClusterNode node) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index a98ea8d..cd2ff0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -175,6 +175,16 @@ class ZkIgnitePaths {
         return basePath + "/" + clusterName + "/" + path;
     }
 
+    String joiningNodeDataPath(UUID nodeId, String aliveNodePath) {
+        int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath);
+        // Rebuild join data node name; Locale.ROOT keeps the %010d ZK sequence suffix in ASCII digits.
+        return joinDataDir + '/' +
+            ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" +
+            nodeId.toString() +
+            "|" +
+            String.format(java.util.Locale.ROOT, "%010d", joinSeq);
+    }
+
     /**
      * @param path Alive node zk path.
      * @return Node internal ID.
@@ -210,9 +220,9 @@ class ZkIgnitePaths {
      * @param path Alive node zk path.
      * @return Joined node sequence.
      */
-    static int aliveJoinDataSequence(String path) {
-        int idx1 = path.indexOf('|');
+    private static int aliveJoinDataSequence(String path) {
         int idx2 = path.lastIndexOf('|');
+        int idx1 = path.lastIndexOf('|', idx2 - 1);
 
         return Integer.parseInt(path.substring(idx1 + 1, idx2));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
new file mode 100644
index 0000000..d2d0372
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Runtime state of {@link ZookeeperDiscoveryImpl}: Zookeeper client, topology and join progress.
+ */
+class ZkRuntimeState {
+    /** {@code True} if joined topology before reconnect attempt. */
+    final boolean prevJoined;
+
+    /** Zookeeper client. */
+    ZookeeperClient zkClient;
+
+    /** Cluster start time. */
+    long gridStartTime;
+
+    /** Presumably {@code true} once the local node has joined topology (TODO confirm). */
+    boolean joined;
+
+    /** Discovery events data (TODO confirm: last data read from the events ZK node). */
+    ZkDiscoveryEventsData evtsData;
+
+    /** Presumably {@code true} if the local node is the coordinator (TODO confirm). */
+    boolean crd;
+
+    /** ZK path of the local node entry created in the alive nodes directory. */
+    String locNodeZkPath;
+
+    /** Alive node data of the local node. */
+    ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+
+    /** Processed events counter (TODO confirm exact semantics). */
+    int procEvtCnt;
+
+    /** Current topology nodes. */
+    final ZkClusterNodes top = new ZkClusterNodes();
+
+    /**
+     * @param prevJoined {@code True} if joined topology before reconnect attempt.
+     */
+    ZkRuntimeState(boolean prevJoined) {
+        this.prevJoined = prevJoined;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index fb6a697..0f39da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -77,6 +77,9 @@ public class ZookeeperClient implements Watcher {
     /** */
     private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>();
 
+    /** Set by {@link #onCloseStart()}: watcher events are then ignored, failed operations throw without waiting. */
+    private volatile boolean closing;
+
     /**
      * @param log Logger.
      * @param connectString ZK connection string.
@@ -122,13 +125,16 @@ public class ZookeeperClient implements Watcher {
             Thread.currentThread().setName(threadName);
         }
 
-        connTimer = new Timer("zk-timer-" + igniteInstanceName);
+        connTimer = new Timer("zk-client-timer-" + igniteInstanceName);
 
         scheduleConnectionCheck();
     }
 
     /** {@inheritDoc} */
     @Override public void process(WatchedEvent evt) {
+        if (closing)
+            return;
+
         if (evt.getType() == Event.EventType.None) {
             ConnectionState newState;
 
@@ -179,18 +185,18 @@ public class ZookeeperClient implements Watcher {
                     }
                     else if (newState == ConnectionState.Connected)
                         stateMux.notifyAll();
-                    else {
+                    else
                         assert state == ConnectionState.Lost : state;
-
-                        closeClient();
-                    }
                 }
                 else
                     return;
             }
 
-            if (newState == ConnectionState.Lost)
+            if (newState == ConnectionState.Lost) {
+                closeClient();
+
                 notifyConnectionLost();
+            }
             else if (newState == ConnectionState.Connected) {
                 for (ZkAsyncOperation op : retryQ)
                     op.execute();
@@ -557,6 +563,17 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    void onCloseStart() {
+        closing = true;
+        // Wake up threads waiting on stateMux so they observe 'closing' and fail fast.
+        synchronized (stateMux) {
+            stateMux.notifyAll();
+        }
+    }
+
+    /**
+     * Closes the client.
+     */
     public void close() {
         closeClient();
     }
@@ -573,6 +590,9 @@ public class ZookeeperClient implements Watcher {
         ZookeeperClientFailedException err = null;
 
         synchronized (stateMux) {
+            if (closing)
+                throw new ZookeeperClientFailedException("Zookeeper client is closed.");
+
             U.warn(log, "Failed to execute zookeeper operation [err=" + e + ", state=" + state + ']');
 
             if (zk.getState() == ZooKeeper.States.CLOSED)
@@ -609,8 +629,6 @@ public class ZookeeperClient implements Watcher {
                         U.warn(log, "Failed to establish zookeeper connection, close client " +
                             "[timeout=" + connLossTimeout + ']');
 
-                        closeClient();
-
                         err = new ZookeeperClientFailedException(e);
                     }
                 }
@@ -623,20 +641,23 @@ public class ZookeeperClient implements Watcher {
                         ", remainingWaitTime=" + remainingTime + ']');
 
                     stateMux.wait(RETRY_TIMEOUT);
+
+                    if (closing)
+                        throw new ZookeeperClientFailedException("Zookeeper client is closed.");
                 }
             }
             else {
                 U.error(log, "Operation failed with unexpected error, close client: " + e, e);
 
-                closeClient();
-
                 state = ConnectionState.Lost;
 
-                throw new ZookeeperClientFailedException(e);
+                err = new ZookeeperClientFailedException(e);
             }
         }
 
         if (err != null) {
+            closeClient();
+
             notifyConnectionLost();
 
             throw err;
@@ -955,13 +976,14 @@ public class ZookeeperClient implements Watcher {
                         "[timeout=" + connLossTimeout + ']');
 
                     connLoss = true;
-
-                    closeClient();
                 }
             }
 
-            if (connLoss)
+            if (connLoss) {
+                closeClient();
+
                 notifyConnectionLost();
+            }
         }
     }
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
index a61f478..01d011b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
@@ -25,6 +25,13 @@ class ZookeeperClientFailedException extends Exception {
     private static final long serialVersionUID = 0L;
 
     /**
+     * @param msg Error message.
+     */
+    ZookeeperClientFailedException(String msg) {
+        super(msg);
+    }
+
+    /**
      * @param cause Cause.
      */
     ZookeeperClientFailedException(Throwable cause) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
index 855d7cc..aa90c67 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -168,9 +168,6 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable {
             return metrics0;
         }
 
-        if (metrics == null)
-            System.out.println();
-
         return metrics;
     }
 
@@ -227,7 +224,7 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable {
     /**
      * @return Internal ID corresponds to Zookeeper sequential node.
      */
-    public int internalId() {
+    int internalId() {
         return internalId;
     }
 
@@ -238,12 +235,22 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable {
         this.internalId = internalId;
     }
 
+    /**
+     * @param order Node order, must be positive.
+     */
     void order(long order) {
         assert order > 0 : order;
 
         this.order = order;
     }
 
+    /**
+     * @param newId New node ID; replaces the current {@code id} of this node.
+     */
+    public void onClientDisconnected(UUID newId) {
+        id = newId;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteProductVersion version() {
         return ver;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c7b9224..a08e879 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -27,8 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
@@ -39,12 +42,14 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
@@ -57,6 +62,8 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
 import static org.apache.zookeeper.CreateMode.PERSISTENT;
 
@@ -68,6 +75,15 @@ public class ZookeeperDiscoveryImpl {
     static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
 
     /** */
+    private final String igniteInstanceName;
+
+    /** */
+    private final String connectString;
+
+    /** */
+    private final int sesTimeout;
+
+    /** */
     private final JdkMarshaller marsh = new JdkMarshaller();
 
     /** */
@@ -77,6 +93,9 @@ public class ZookeeperDiscoveryImpl {
     private final IgniteLogger log;
 
     /** */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** */
     private final ZookeeperClusterNode locNode;
 
     /** */
@@ -86,7 +105,7 @@ public class ZookeeperDiscoveryImpl {
     private final DiscoverySpiDataExchange exchange;
 
     /** */
-    private ZookeeperClient zkClient;
+    private final boolean clientReconnectEnabled;
 
     /** */
     private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>();
@@ -104,31 +123,16 @@ public class ZookeeperDiscoveryImpl {
     private final ZkDataCallback dataCallback;
 
     /** */
-    private final ZkClusterNodes top = new ZkClusterNodes();
-
-    /** */
-    private long gridStartTime;
-
-    /** */
-    private boolean joined;
-
-    /** */
-    private ZkDiscoveryEventsData evtsData;
-
-    /** */
-    private boolean crd;
+    private final int evtsAckThreshold;
 
     /** */
-    private String locNodeZkPath;
+    private ZkRuntimeState state = new ZkRuntimeState(false);
 
     /** */
-    private ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+    private volatile ConnectionState connState = ConnectionState.STARTED;
 
     /** */
-    private final int evtsAckThreshold;
-
-    /** */
-    private int procEvtCnt;
+    private ZkEventWorker evtWorker;
 
     /**
      * @param log Logger.
@@ -138,12 +142,17 @@ public class ZookeeperDiscoveryImpl {
      * @param lsnr Discovery events listener.
      * @param exchange Discovery data exchange.
      */
-    public ZookeeperDiscoveryImpl(IgniteLogger log,
+    public ZookeeperDiscoveryImpl(
+        String igniteInstanceName,
+        String connectString,
+        int sesTimeout,
+        IgniteLogger log,
         String basePath,
         String clusterName,
         ZookeeperClusterNode locNode,
         DiscoverySpiListener lsnr,
-        DiscoverySpiDataExchange exchange) {
+        DiscoverySpiDataExchange exchange,
+        boolean clientReconnectEnabled) {
         assert locNode.id() != null && locNode.isLocal() : locNode;
 
         if (F.isEmpty(clusterName))
@@ -153,10 +162,14 @@ public class ZookeeperDiscoveryImpl {
 
         zkPaths = new ZkIgnitePaths(basePath, clusterName);
 
+        this.igniteInstanceName = igniteInstanceName;
+        this.connectString = connectString;
+        this.sesTimeout = sesTimeout;
         this.log = log.getLogger(getClass());
         this.locNode = locNode;
         this.lsnr = lsnr;
         this.exchange = exchange;
+        this.clientReconnectEnabled = clientReconnectEnabled;
 
         watcher = new ZkWatcher();
         childrenCallback = new ZKChildrenCallback();
@@ -191,7 +204,7 @@ public class ZookeeperDiscoveryImpl {
     @Nullable public ClusterNode node(UUID nodeId) {
         assert nodeId != null;
 
-        return top.nodesById.get(nodeId);
+        return state.top.nodesById.get(nodeId);
     }
 
     /**
@@ -207,7 +220,25 @@ public class ZookeeperDiscoveryImpl {
      * @return Remote nodes.
      */
     public Collection<ClusterNode> remoteNodes() {
-        return top.remoteNodes();
+        checkState();
+
+        return state.top.remoteNodes();
+    }
+
+    /**
+     *
+     */
+    private void checkState() {
+        switch (connState) {
+            case STARTED:
+                break;
+
+            case STOPPED:
+                throw new IgniteSpiException("Zookeeper client closed.");
+
+            case DISCONNECTED:
+                throw new IgniteClientDisconnectedException(null, "Client is disconnected.");
+        }
     }
 
     /**
@@ -215,8 +246,16 @@ public class ZookeeperDiscoveryImpl {
      * @return {@code True} if node joined or joining topology.
      */
     public boolean knownNode(UUID nodeId) {
+        checkState();
+
+        if (!busyLock.enterBusy()) {
+            checkState();
+
+            throw new IgniteSpiException("Zookeeper client closed.");
+        }
+
         try {
-            List<String> children = zkClient.getChildren(zkPaths.aliveNodesDir);
+            List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir);
 
             for (int i = 0; i < children.size(); i++) {
                 UUID id = ZkIgnitePaths.aliveNodeId(children.get(i));
@@ -235,12 +274,17 @@ public class ZookeeperDiscoveryImpl {
 
             throw new IgniteInterruptedException(e);
         }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @param msg Message.
      */
     public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
+        checkState();
+
         assert msg != null;
 
         byte[] msgBytes;
@@ -252,10 +296,16 @@ public class ZookeeperDiscoveryImpl {
             throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
         }
 
+        if (!busyLock.enterBusy()) {
+            checkState();
+
+            throw new IgniteSpiException("Zookeeper client closed.");
+        }
+
         try {
             String prefix = UUID.randomUUID().toString();
 
-            zkClient.createSequential(prefix,
+            state.zkClient.createSequential(prefix,
                 zkPaths.customEvtsDir,
                 prefix + ":" + locNode.id() + '|',
                 msgBytes,
@@ -269,24 +319,43 @@ public class ZookeeperDiscoveryImpl {
 
             throw new IgniteInterruptedException(e);
         }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @return Cluster start time.
      */
     public long gridStartTime() {
-        return gridStartTime;
+        return state.gridStartTime;
     }
 
     /**
-     * @param igniteInstanceName Ignite instance name.
-     * @param connectString Zookeeper connect string.
-     * @param sesTimeout Zookeeper session timeout.
      * @throws InterruptedException If interrupted.
      */
-    public void joinTopology(String igniteInstanceName, String connectString, int sesTimeout)
-        throws InterruptedException
-    {
+    public void joinTopology() throws InterruptedException {
+        joinTopology0(false);
+
+        for (;;) {
+            try {
+                joinFut.get(10_000);
+
+                break;
+            }
+            catch (IgniteFutureTimeoutCheckedException e) {
+                U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']');
+            }
+            catch (Exception e) {
+                throw new IgniteSpiException("Failed to join cluster", e);
+            }
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
+     */
+    private void joinTopology0(boolean reconnect) throws InterruptedException {
         DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
 
         exchange.collect(discoDataBag);
@@ -303,7 +372,7 @@ public class ZookeeperDiscoveryImpl {
         }
 
         try {
-            zkClient = new ZookeeperClient(igniteInstanceName,
+            state.zkClient = new ZookeeperClient(igniteInstanceName,
                 log,
                 connectString,
                 sesTimeout,
@@ -313,23 +382,15 @@ public class ZookeeperDiscoveryImpl {
             throw new IgniteSpiException("Failed to create Zookeeper client", e);
         }
 
-        initZkNodes();
-
-        startJoin(joinDataBytes);
-
-        for (;;) {
-            try {
-                joinFut.get(10_000);
+        if (!reconnect) {
+            evtWorker = new ZkEventWorker(igniteInstanceName, "zookeeper-disco-evt-worker", log);
 
-                break;
-            }
-            catch (IgniteFutureTimeoutCheckedException e) {
-                U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']');
-            }
-            catch (Exception e) {
-                throw new IgniteSpiException("Failed to join cluster", e);
-            }
+            evtWorker.start();
         }
+        else
+            assert evtWorker != null;
+
+        startJoin(joinDataBytes);
     }
 
     /**
@@ -337,7 +398,7 @@ public class ZookeeperDiscoveryImpl {
      */
     private void initZkNodes() throws InterruptedException {
         try {
-            if (zkClient.exists(zkPaths.aliveNodesDir))
+            if (state.zkClient.exists(zkPaths.aliveNodesDir))
                 return; // This path is created last, assume all others dirs are created.
 
             List<String> dirs = new ArrayList<>();
@@ -351,14 +412,14 @@ public class ZookeeperDiscoveryImpl {
             dirs.add(zkPaths.aliveNodesDir);
 
             try {
-                zkClient.createAll(dirs, PERSISTENT);
+                state.zkClient.createAll(dirs, PERSISTENT);
             }
             catch (KeeperException.NodeExistsException e) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to create nodes using bulk operation: " + e);
 
                 for (String dir : dirs)
-                    zkClient.createIfNeeded(dir, null, PERSISTENT);
+                    state.zkClient.createIfNeeded(dir, null, PERSISTENT);
             }
         }
         catch (ZookeeperClientFailedException e) {
@@ -371,12 +432,17 @@ public class ZookeeperDiscoveryImpl {
      * @throws InterruptedException If interrupted.
      */
     private void startJoin(byte[] joinDataBytes) throws InterruptedException {
+        if (!busyLock.enterBusy())
+            return;
+
         try {
+            initZkNodes();
+
             String prefix = UUID.randomUUID().toString();
 
             // TODO ZK: handle max size.
 
-            String path = zkClient.createSequential(prefix,
+            String path = state.zkClient.createSequential(prefix,
                 zkPaths.joinDataDir,
                 prefix + ":" + locNode.id() + "|",
                 joinDataBytes,
@@ -384,7 +450,7 @@ public class ZookeeperDiscoveryImpl {
 
             int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') + 1));
 
-            locNodeZkPath = zkClient.createSequential(
+            state.locNodeZkPath = state.zkClient.createSequential(
                 prefix,
                 zkPaths.aliveNodesDir,
                 prefix + ":" + locNode.id() + "|" + seqNum + "|",
@@ -393,21 +459,20 @@ public class ZookeeperDiscoveryImpl {
 
             log.info("Node started join [nodeId=" + locNode.id() +
                 ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
-                ", nodePath=" + locNodeZkPath + ']');
+                ", nodePath=" + state.locNodeZkPath + ']');
 
-            zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() {
-                @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-                    onConnected(rc, children);
-                }
-            });
+            state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
 
-            zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
+            state.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
 
             connStartLatch.countDown();
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
         }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /** TODO ZK */
@@ -423,103 +488,51 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param rc Async callback result.
-     * @param aliveNodes Alive nodes.
-     */
-    private void onConnected(int rc, List<String> aliveNodes) {
-        assert !joined;
-
-        checkIsCoordinator(rc, aliveNodes);
-    }
-
-    /**
      * @param rc Callback result code.
      * @param aliveNodes Alive nodes.
+     * @throws Exception If failed.
      */
-    private void checkIsCoordinator(int rc, final List<String> aliveNodes) {
-        try {
-            assert rc == 0 : KeeperException.Code.get(rc);
+    private void checkIsCoordinator(int rc, final List<String> aliveNodes) throws Exception {
+        assert rc == 0 : KeeperException.Code.get(rc);
 
-            TreeMap<Integer, String> alives = new TreeMap<>();
+        TreeMap<Integer, String> alives = new TreeMap<>();
 
-            Integer locInternalId = null;
+        Integer locInternalId = null;
 
-            for (String aliveNodePath : aliveNodes) {
-                Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
+        for (String aliveNodePath : aliveNodes) {
+            Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
-                alives.put(internalId, aliveNodePath);
+            alives.put(internalId, aliveNodePath);
 
-                if (locInternalId == null) {
-                    UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+            if (locInternalId == null) {
+                UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
 
-                    if (locNode.id().equals(nodeId))
-                        locInternalId = internalId;
-                }
+                if (locNode.id().equals(nodeId))
+                    locInternalId = internalId;
             }
+        }
 
-            assert !alives.isEmpty();
-            assert locInternalId != null;
-
-            Map.Entry<Integer, String> crdE = alives.firstEntry();
-
-            if (locInternalId.equals(crdE.getKey()))
-                onBecomeCoordinator(aliveNodes, locInternalId);
-            else {
-                assert alives.size() > 1;
-
-                Map.Entry<Integer, String> prevE = alives.floorEntry(locInternalId - 1);
-
-                assert prevE != null;
-
-                log.info("Discovery coordinator already exists, watch for previous node [" +
-                    "locId=" + locNode.id() +
-                    ", prevPath=" + prevE.getValue() + ']');
+        assert !alives.isEmpty();
+        assert locInternalId != null;
 
-                PreviousNodeWatcher watcher = new PreviousNodeWatcher();
+        Map.Entry<Integer, String> crdE = alives.firstEntry();
 
-                zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher);
-            }
-        }
-        catch (Throwable e) {
-            onFatalError(e);
-        }
-    }
+        if (locInternalId.equals(crdE.getKey()))
+            onBecomeCoordinator(aliveNodes, locInternalId);
+        else {
+            assert alives.size() > 1;
 
-    /**
-     *
-     */
-    private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback {
-        @Override public void process(WatchedEvent evt) {
-            if (evt.getType() == Event.EventType.NodeDeleted) {
-                try {
-                    onPreviousNodeFail();
-                }
-                catch (Throwable e) {
-                    onFatalError(e);
-                }
-            }
-            else {
-                if (log.isInfoEnabled())
-                    log.info("Previous node watch event: " + evt);
+            Map.Entry<Integer, String> prevE = alives.floorEntry(locInternalId - 1);
 
-                if (evt.getType() != Event.EventType.None)
-                    zkClient.existsAsync(evt.getPath(), this, this);
-            }
-        }
+            assert prevE != null;
 
-        @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
-            log.info("Previous node stat callback [rc=" + rc + ", path=" + path + ", stat=" + stat + ']');
+            log.info("Discovery coordinator already exists, watch for previous node [" +
+                "locId=" + locNode.id() +
+                ", prevPath=" + prevE.getValue() + ']');
 
-            assert rc == 0 || rc == KeeperException.Code.NONODE.intValue() : KeeperException.Code.get(rc);
+            PreviousNodeWatcher watcher = new PreviousNodeWatcher();
 
-            if (rc == KeeperException.Code.NONODE.intValue() || stat == null) {
-                try {
-                    onPreviousNodeFail();
-                }
-                catch (Throwable e) {
-                    onFatalError(e);
-                }
-            }
+            state.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher);
         }
     }
 
@@ -534,11 +547,7 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']');
 
-        zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() {
-            @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-                checkIsCoordinator(rc, children);
-            }
-        });
+        state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback());
     }
 
     /**
@@ -547,22 +556,22 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception {
-        byte[] evtsDataBytes = zkClient.getData(zkPaths.evtsPath);
+        byte[] evtsDataBytes = state.zkClient.getData(zkPaths.evtsPath);
 
         if (evtsDataBytes.length > 0)
             processNewEvents(evtsDataBytes);
 
-        crd = true;
+        state.crd = true;
 
-        if (joined) {
+        if (state.joined) {
             if (log.isInfoEnabled())
                 log.info("Node is new discovery coordinator [locId=" + locNode.id() + ']');
 
             assert locNode.order() > 0 : locNode;
-            assert this.evtsData != null;
+            assert state.evtsData != null;
 
-            for (ZkDiscoveryEventData evtData : evtsData.evts.values())
-                evtData.initRemainingAcks(top.nodesByOrder.values());
+            for (ZkDiscoveryEventData evtData : state.evtsData.evts.values())
+                evtData.initRemainingAcks(state.top.nodesByOrder.values());
 
             handleProcessedEvents();
         }
@@ -573,8 +582,8 @@ public class ZookeeperDiscoveryImpl {
             newClusterStarted(locInternalId);
         }
 
-        zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback);
-        zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback);
+        state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback);
+        state.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback);
 
         for (String alivePath : aliveNodes)
             watchAliveNodeData(alivePath);
@@ -584,12 +593,12 @@ public class ZookeeperDiscoveryImpl {
      * @param alivePath
      */
     private void watchAliveNodeData(String alivePath) {
-        assert locNodeZkPath != null;
+        assert state.locNodeZkPath != null;
 
         String path = zkPaths.aliveNodesDir + "/" + alivePath;
 
-        if (!path.equals(locNodeZkPath))
-            zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher);
+        if (!path.equals(state.locNodeZkPath))
+            state.zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher);
     }
 
     /**
@@ -597,14 +606,14 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void generateTopologyEvents(List<String> aliveNodes) throws Exception {
-        assert crd;
+        assert state.crd;
 
         if (log.isInfoEnabled())
             log.info("Process alive nodes change: " + aliveNodes);
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 
-        TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(top.nodesByOrder);
+        TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(state.top.nodesByOrder);
 
         boolean newEvts = false;
 
@@ -615,7 +624,7 @@ public class ZookeeperDiscoveryImpl {
 
             assert old == null;
 
-            if (!top.nodesByInternalId.containsKey(internalId)) {
+            if (!state.top.nodesByInternalId.containsKey(internalId)) {
                 generateNodeJoin(curTop, internalId, child);
 
                 watchAliveNodeData(child);
@@ -624,7 +633,7 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
-        for (Map.Entry<Integer, ZookeeperClusterNode> e : top.nodesByInternalId.entrySet()) {
+        for (Map.Entry<Integer, ZookeeperClusterNode> e : state.top.nodesByInternalId.entrySet()) {
             if (!alives.containsKey(e.getKey())) {
                 ZookeeperClusterNode failedNode = e.getValue();
 
@@ -646,14 +655,14 @@ public class ZookeeperDiscoveryImpl {
     private void saveAndProcessNewEvents() throws Exception {
         long start = System.currentTimeMillis();
 
-        zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
+        state.zkClient.setData(zkPaths.evtsPath, marsh.marshal(state.evtsData), -1);
 
         long time = System.currentTimeMillis() - start;
 
         if (log.isInfoEnabled())
-            log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
+            log.info("Discovery coordinator saved new topology events [topVer=" + state.evtsData.topVer + ", saveTime=" + time + ']');
 
-        processNewEvents(evtsData);
+        processNewEvents(state.evtsData);
     }
 
     /**
@@ -665,15 +674,15 @@ public class ZookeeperDiscoveryImpl {
 
         assert rmvd != null;
 
-        evtsData.topVer++;
-        evtsData.evtIdGen++;
+        state.evtsData.topVer++;
+        state.evtsData.evtIdGen++;
 
         ZkDiscoveryNodeFailEventData evtData = new ZkDiscoveryNodeFailEventData(
-            evtsData.evtIdGen,
-            evtsData.topVer,
+            state.evtsData.evtIdGen,
+            state.evtsData.topVer,
             failedNode.internalId());
 
-        evtsData.addEvent(curTop.values(), evtData);
+        state.evtsData.addEvent(curTop.values(), evtData);
 
         if (log.isInfoEnabled())
             log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
@@ -691,18 +700,12 @@ public class ZookeeperDiscoveryImpl {
         throws Exception
     {
         UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
-        int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath);
-
-        String joinDataPath = zkPaths.joinDataDir + '/' +
-            ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" +
-            nodeId.toString() +
-            "|" +
-            String.format("%010d", joinSeq);
 
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath);
         byte[] joinData;
 
         try {
-            joinData = zkClient.getData(joinDataPath);
+            joinData = state.zkClient.getData(joinDataPath);
         }
         catch (KeeperException.NoNodeException e) {
             U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId);
@@ -717,10 +720,10 @@ public class ZookeeperDiscoveryImpl {
 
         assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
-        evtsData.topVer++;
-        evtsData.evtIdGen++;
+        state.evtsData.topVer++;
+        state.evtsData.evtIdGen++;
 
-        joinedNode.order(evtsData.topVer);
+        joinedNode.order(state.evtsData.topVer);
         joinedNode.internalId(internalId);
 
         DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
@@ -744,21 +747,21 @@ public class ZookeeperDiscoveryImpl {
         assert old == null;
 
         ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData(
-            evtsData.evtIdGen,
-            evtsData.topVer,
+            state.evtsData.evtIdGen,
+            state.evtsData.topVer,
             joinedNode.id(),
             joinedNode.internalId());
 
         evtData.joiningNodeData = joiningNodeData;
 
-        evtsData.addEvent(dataForJoined.topology(), evtData);
+        state.evtsData.addEvent(dataForJoined.topology(), evtData);
 
         evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node.
 
         long start = System.currentTimeMillis();
 
-        zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT);
-        zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), marshal(dataForJoined), PERSISTENT);
+        state.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT);
+        state.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), marshal(dataForJoined), PERSISTENT);
 
         long time = System.currentTimeMillis() - start;
 
@@ -774,27 +777,38 @@ public class ZookeeperDiscoveryImpl {
     private void newClusterStarted(int locInternalId) throws Exception {
         cleanupPreviousClusterData();
 
-        joined = true;
+        state.joined = true;
 
-        gridStartTime = U.currentTimeMillis();
+        state.gridStartTime = U.currentTimeMillis();
 
-        evtsData = new ZkDiscoveryEventsData(gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>());
+        state.evtsData = new ZkDiscoveryEventsData(state.gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>());
 
         locNode.internalId(locInternalId);
         locNode.order(1);
 
-        top.addNode(locNode);
+        state.top.addNode(locNode);
 
-        lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
-            1L,
-            locNode,
-            (Collection)top.nodesByOrder.values(),
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        String path = state.locNodeZkPath.substring(state.locNodeZkPath.lastIndexOf('/') + 1);
 
-        joinFut.onDone();
+        String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path);
 
-        // TODO ZK: remove join zk nodes
+        // TODO ZK async
+        state.zkClient.deleteIfExists(joinDataPath, -1);
+
+        final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode);
+
+        evtWorker.evtsQ.add(new Runnable() {
+            @Override public void run() {
+                lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
+                    1L,
+                    locNode,
+                    topSnapshot,
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    null);
+
+                joinFut.onDone();
+            }
+        });
     }
 
     /**
@@ -802,9 +816,9 @@ public class ZookeeperDiscoveryImpl {
      */
     private void cleanupPreviousClusterData() throws Exception {
         // TODO ZK: use multi, better batching.
-        zkClient.setData(zkPaths.evtsPath, null, -1);
+        state.zkClient.setData(zkPaths.evtsPath, null, -1);
 
-        List<String> evtChildren = zkClient.getChildren(zkPaths.evtsPath);
+        List<String> evtChildren = state.zkClient.getChildren(zkPaths.evtsPath);
 
         for (String evtPath : evtChildren) {
             String evtDir = zkPaths.evtsPath + "/" + evtPath;
@@ -812,14 +826,14 @@ public class ZookeeperDiscoveryImpl {
             removeChildren(evtDir);
         }
 
-        zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1);
+        state.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1);
 
-        zkClient.deleteAll(zkPaths.customEvtsDir,
-            zkClient.getChildren(zkPaths.customEvtsDir),
+        state.zkClient.deleteAll(zkPaths.customEvtsDir,
+            state.zkClient.getChildren(zkPaths.customEvtsDir),
             -1);
 
-        zkClient.deleteAll(zkPaths.customEvtsAcksDir,
-            zkClient.getChildren(zkPaths.customEvtsAcksDir),
+        state.zkClient.deleteAll(zkPaths.customEvtsAcksDir,
+            state.zkClient.getChildren(zkPaths.customEvtsAcksDir),
             -1);
     }
 
@@ -828,15 +842,15 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void removeChildren(String path) throws Exception {
-        zkClient.deleteAll(path, zkClient.getChildren(path), -1);
+        state.zkClient.deleteAll(path, state.zkClient.getChildren(path), -1);
     }
 
     ZkClusterNodes nodes() {
-        return top;
+        return state.top;
     }
 
     ZookeeperClient zkClient() {
-        return zkClient;
+        return state.zkClient;
     }
 
     /**
@@ -844,7 +858,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void generateCustomEvents(List<String> customEvtNodes) throws Exception {
-        assert crd;
+        assert state.crd;
 
         TreeMap<Integer, String> newEvts = null;
 
@@ -853,7 +867,7 @@ public class ZookeeperDiscoveryImpl {
 
             int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
 
-            if (evtSeq > evtsData.procCustEvt) {
+            if (evtSeq > state.evtsData.procCustEvt) {
                 if (newEvts == null)
                     newEvts = new TreeMap<>();
 
@@ -865,30 +879,30 @@ public class ZookeeperDiscoveryImpl {
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
                 UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtE.getValue());
 
-                ZookeeperClusterNode sndNode = top.nodesById.get(sndNodeId);
+                ZookeeperClusterNode sndNode = state.top.nodesById.get(sndNodeId);
 
                 String evtDataPath = zkPaths.customEvtsDir + "/" + evtE.getValue();
 
                 if (sndNode != null) {
-                    byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue());
+                    byte[] evtBytes = state.zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue());
 
                     DiscoverySpiCustomMessage msg;
 
                     try {
                         msg = unmarshal(evtBytes);
 
-                        evtsData.evtIdGen++;
+                        state.evtsData.evtIdGen++;
 
                         ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
-                            evtsData.evtIdGen,
-                            evtsData.topVer,
+                            state.evtsData.evtIdGen,
+                            state.evtsData.topVer,
                             sndNodeId,
                             evtE.getValue(),
                             false);
 
                         evtData.msg = msg;
 
-                        evtsData.addEvent(top.nodesByOrder.values(), evtData);
+                        state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData);
 
                         if (log.isInfoEnabled())
                             log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
@@ -900,10 +914,10 @@ public class ZookeeperDiscoveryImpl {
                 else {
                     U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
 
-                    zkClient.deleteIfExists(evtDataPath, -1);
+                    state.zkClient.deleteIfExists(evtDataPath, -1);
                 }
 
-                evtsData.procCustEvt = evtE.getKey();
+                state.evtsData.procCustEvt = evtE.getKey();
             }
 
             saveAndProcessNewEvents();
@@ -918,17 +932,17 @@ public class ZookeeperDiscoveryImpl {
         if (data.length == 0)
             return;
 
-        assert !crd;
+        assert !state.crd;
 
         ZkDiscoveryEventsData newEvtsData = unmarshal(data);
 
         // Need keep processed custom events since they contains message object.
-        if (evtsData != null)
-            newEvtsData.evts.putAll(evtsData.evts);
+        if (state.evtsData != null)
+            newEvtsData.evts.putAll(state.evtsData.evts);
 
         processNewEvents(newEvtsData);
 
-        this.evtsData = newEvtsData;
+        state.evtsData = newEvtsData;
     }
 
     /**
@@ -941,8 +955,8 @@ public class ZookeeperDiscoveryImpl {
 
         boolean updateNodeInfo = false;
 
-        for (ZkDiscoveryEventData evtData : evts.tailMap(locNodeInfo.lastProcEvt, false).values()) {
-            if (!joined) {
+        for (ZkDiscoveryEventData evtData : evts.tailMap(state.locNodeInfo.lastProcEvt, false).values()) {
+            if (!state.joined) {
                 if (evtData.eventType() != EventType.EVT_NODE_JOINED)
                     continue;
 
@@ -966,7 +980,7 @@ public class ZookeeperDiscoveryImpl {
 
                         ZkJoiningNodeData joiningData;
 
-                        if (crd) {
+                        if (state.crd) {
                             assert evtData0.joiningNodeData != null;
 
                             joiningData = evtData0.joiningNodeData;
@@ -974,7 +988,7 @@ public class ZookeeperDiscoveryImpl {
                         else {
                             String path = zkPaths.joinEventDataPath(evtData.eventId());
 
-                            joiningData = unmarshal(zkClient.getData(path));
+                            joiningData = unmarshal(state.zkClient.getData(path));
 
                             DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId);
 
@@ -1002,7 +1016,7 @@ public class ZookeeperDiscoveryImpl {
 
                         DiscoverySpiCustomMessage msg;
 
-                        if (crd) {
+                        if (state.crd) {
                             assert evtData0.msg != null : evtData0;
 
                             msg = evtData0.msg;
@@ -1011,7 +1025,7 @@ public class ZookeeperDiscoveryImpl {
                             String path = zkPaths.customEventDataPath(evtData0.ackEvent(),
                                 evtData0.evtPath);
 
-                            msg = unmarshal(zkClient.getData(path));
+                            msg = unmarshal(state.zkClient.getData(path));
 
                             evtData0.msg = msg;
                         }
@@ -1029,23 +1043,23 @@ public class ZookeeperDiscoveryImpl {
                 }
             }
 
-            if (joined) {
-                locNodeInfo.lastProcEvt = evtData.eventId();
+            if (state.joined) {
+                state.locNodeInfo.lastProcEvt = evtData.eventId();
 
-                procEvtCnt++;
+                state.procEvtCnt++;
 
-                if (procEvtCnt % evtsAckThreshold == 0)
+                if (state.procEvtCnt % evtsAckThreshold == 0)
                     updateNodeInfo = true;
             }
         }
 
-        if (crd) {
+        if (state.crd) {
             handleProcessedEvents();
         }
         else if (updateNodeInfo) {
-            assert locNodeZkPath != null;
+            assert state.locNodeZkPath != null;
 
-            zkClient.setData(locNodeZkPath, marshal(locNodeInfo), -1);
+            state.zkClient.setData(state.locNodeZkPath, marshal(state.locNodeInfo), -1);
         }
     }
 
@@ -1055,7 +1069,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void processLocalJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData)
+    private void processLocalJoin(ZkDiscoveryEventsData evtsData, final ZkDiscoveryNodeJoinEventData evtData)
         throws Exception
     {
         if (log.isInfoEnabled())
@@ -1063,9 +1077,9 @@ public class ZookeeperDiscoveryImpl {
 
         String path = zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
-        ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path));
+        ZkJoinEventDataForJoined dataForJoined = unmarshal(state.zkClient.getData(path));
 
-        gridStartTime = evtsData.gridStartTime;
+        state.gridStartTime = evtsData.gridStartTime;
 
         locNode.internalId(evtData.joinedInternalId);
         locNode.order(evtData.topologyVersion());
@@ -1083,26 +1097,44 @@ public class ZookeeperDiscoveryImpl {
 
             node.setMetrics(new ClusterMetricsSnapshot());
 
-            top.addNode(node);
+            state.top.addNode(node);
         }
 
-        top.addNode(locNode);
+        state.top.addNode(locNode);
 
-        List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values());
+        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        lsnr.onDiscovery(evtData.eventType(),
-            evtData.topologyVersion(),
-            locNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        evtWorker.evtsQ.add(new Runnable() {
+            @Override public void run() {
+                if (connState == ConnectionState.DISCONNECTED)
+                    connState = ConnectionState.STARTED;
 
-        joinFut.onDone();
+                lsnr.onDiscovery(evtData.eventType(),
+                    evtData.topologyVersion(),
+                    locNode,
+                    topSnapshot,
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    null);
+
+                if (state.prevJoined) {
+                    lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
+                        evtData.topologyVersion(),
+                        locNode,
+                        topSnapshot,
+                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                        null);
+
+                    U.quietAndWarn(log, "Client node was reconnected after it was already considered failed.");
+                }
 
-        joined = true;
+                joinFut.onDone();
+            }
+        });
+
+        state.joined = true;
 
         // TODO ZK: async
-        zkClient.deleteIfExists(path, -1);
+        state.zkClient.deleteIfExists(path, -1);
     }
 
     /**
@@ -1110,22 +1142,26 @@ public class ZookeeperDiscoveryImpl {
      * @param msg Custom message.
      */
     @SuppressWarnings("unchecked")
-    private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, DiscoverySpiCustomMessage msg) {
+    private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
         if (log.isInfoEnabled())
             log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
 
-        ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId);
+        final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId);
 
         assert sndNode != null : evtData;
 
-        List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values());
+        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        lsnr.onDiscovery(evtData.eventType(),
-            evtData.topologyVersion(),
-            sndNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            msg);
+        evtWorker.evtsQ.add(new Runnable() {
+            @Override public void run() {
+                lsnr.onDiscovery(evtData.eventType(),
+                    evtData.topologyVersion(),
+                    sndNode,
+                    topSnapshot,
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    msg);
+            }
+        });
     }
 
     /**
@@ -1133,50 +1169,58 @@ public class ZookeeperDiscoveryImpl {
      * @param joiningData Joining node data.
      */
     @SuppressWarnings("unchecked")
-    private void notifyNodeJoin(ZkDiscoveryNodeJoinEventData evtData, ZkJoiningNodeData joiningData) {
-        ZookeeperClusterNode joinedNode = joiningData.node();
+    private void notifyNodeJoin(final ZkDiscoveryNodeJoinEventData evtData, ZkJoiningNodeData joiningData) {
+        final ZookeeperClusterNode joinedNode = joiningData.node();
 
         joinedNode.order(evtData.topologyVersion());
         joinedNode.internalId(evtData.joinedInternalId);
 
         joinedNode.setMetrics(new ClusterMetricsSnapshot());
 
-        top.addNode(joinedNode);
+        state.top.addNode(joinedNode);
 
-        List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values());
+        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        lsnr.onDiscovery(evtData.eventType(),
-            evtData.topologyVersion(),
-            joinedNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        evtWorker.evtsQ.add(new Runnable() {
+            @Override public void run() {
+                lsnr.onDiscovery(evtData.eventType(),
+                    evtData.topologyVersion(),
+                    joinedNode,
+                    topSnapshot,
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    null);
+            }
+        });
     }
 
     /**
      * @param evtData Event data.
      */
     @SuppressWarnings("unchecked")
-    private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) {
-        ZookeeperClusterNode failedNode = top.removeNode(evtData.failedNodeInternalId());
+    private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) {
+        final ZookeeperClusterNode failedNode = state.top.removeNode(evtData.failedNodeInternalId());
 
         assert failedNode != null;
 
-        List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values());
+        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
 
-        lsnr.onDiscovery(evtData.eventType(),
-            evtData.topologyVersion(),
-            failedNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        evtWorker.evtsQ.add(new Runnable() {
+            @Override public void run() {
+                lsnr.onDiscovery(evtData.eventType(),
+                    evtData.topologyVersion(),
+                    failedNode,
+                    topSnapshot,
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    null);
+            }
+        });
     }
 
     /**
      * @throws Exception If failed.
      */
     private void handleProcessedEvents() throws Exception {
-        Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator();
+        Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator();
 
         List<ZkDiscoveryCustomEventData> newEvts = null;
 
@@ -1195,15 +1239,15 @@ public class ZookeeperDiscoveryImpl {
                         DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData);
 
                         if (ack != null) {
-                            evtsData.evtIdGen++;
+                            state.evtsData.evtIdGen++;
 
-                            long evtId = evtsData.evtIdGen;
+                            long evtId = state.evtsData.evtIdGen;
 
                             byte[] ackBytes = marshal(ack);
 
                             String evtChildPath = String.valueOf(evtId);
 
-                            zkClient.createIfNeeded(
+                            state.zkClient.createIfNeeded(
                                 zkPaths.customEventDataPath(true, evtChildPath),
                                 ackBytes,
                                 CreateMode.PERSISTENT);
@@ -1243,7 +1287,7 @@ public class ZookeeperDiscoveryImpl {
 
         if (newEvts != null) {
             for (int i = 0; i < newEvts.size(); i++)
-                evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i));
+                state.evtsData.addEvent(state.top.nodesByOrder.values(), newEvts.get(i));
 
             saveAndProcessNewEvents();
         }
@@ -1256,7 +1300,7 @@ public class ZookeeperDiscoveryImpl {
     private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode failedNode) throws Exception {
         boolean processed = false;
 
-        for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = evtsData.evts.entrySet().iterator(); it.hasNext();) {
+        for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = state.evtsData.evts.entrySet().iterator(); it.hasNext();) {
             Map.Entry<Long, ZkDiscoveryEventData> e = it.next();
 
             ZkDiscoveryEventData evtData = e.getValue();
@@ -1274,10 +1318,11 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception {
-        log.info("All nodes processed node join [evtData=" + evtData + ']');
+        if (log.isInfoEnabled())
+            log.info("All nodes processed node join [evtData=" + evtData + ']');
 
-        zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1);
-        zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1);
+        // All nodes processed the join, so both join-data nodes stored for this event id
+        // can be removed; -1 is passed as the version (presumably "any version" - confirm).
+        state.zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1);
+        state.zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1);
     }
 
     /**
@@ -1288,10 +1333,11 @@ public class ZookeeperDiscoveryImpl {
     @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData)
         throws Exception
     {
-        log.info("All nodes processed custom event [evtData=" + evtData + ']');
+        if (log.isInfoEnabled())
+            log.info("All nodes processed custom event [evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
-            zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1);
+            state.zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1);
 
             assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
 
@@ -1299,32 +1345,72 @@ public class ZookeeperDiscoveryImpl {
                 return evtData.msg.ackMessage();
         }
         else
-            zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1);
+            state.zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1);
 
         return null;
     }
 
     /**
-     *
+     * Stops this discovery instance; a pending join, if any, is completed with a "Node stopped" error.
+     *
+     * @throws InterruptedException If interrupted.
+     */
+    public void stop() throws InterruptedException {
+        stop0(new IgniteSpiException("Node stopped"));
+    }
+
+    /**
+     * Stops discovery with the given error. Marks the connection as disconnected, blocks the busy lock,
+     * completes the join future with {@code e}, closes the ZooKeeper client and stops the event worker.
+     *
+     * @param e Error.
+     * @throws InterruptedException If interrupted.
      */
-    public void stop() {
+    private void stop0(Throwable e) throws InterruptedException {
+        if (log.isInfoEnabled())
+            log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']');
+
+        // NOTE(review): onFatalError() returns early only for STOPPED; consider assigning STOPPED here.
+        connState = ConnectionState.DISCONNECTED;
+
+        ZookeeperClient zkClient = state.zkClient;
+
+        if (zkClient != null)
+            zkClient.onCloseStart();
+
+        busyLock.block();
+
+        joinFut.onDone(e);
+
         if (zkClient != null)
             zkClient.close();
 
-        joinFut.onDone(new IgniteSpiException("Node stopped"));
+        ZkEventWorker evtWorker = this.evtWorker;
+
+        if (evtWorker != null) {
+            evtWorker.interrupt();
+
+            // If stop0() ever runs on the worker thread itself, Thread.join() would never return.
+            if (evtWorker != Thread.currentThread())
+                evtWorker.join();
+        }
     }
 
     /**
+     * Handles an unexpected error raised while processing a ZooKeeper notification.
+     * Leaves the busy section entered by the caller, logs the error and stops discovery;
+     * an {@link Error} is rethrown after the stop.
+     *
-     * @param e Error.
+     * @param busyLock Busy lock.
+     * @param err Error.
      */
-    private void onFatalError(Throwable e) {
+    private void onFatalError(GridSpinBusyLock busyLock, Throwable err) {
+        // Every caller entered the busy lock before failing; leave it first (presumably so that
+        // busyLock.block() in stop0() does not wait on this thread).
+        busyLock.leaveBusy();
+
+        // NOTE(review): stop0() assigns DISCONNECTED, not STOPPED; unless STOPPED is set elsewhere,
+        // this early return never triggers - confirm the intended state.
+        if (connState == ConnectionState.STOPPED)
+            return;
+
         // TODO ZK
-        U.error(log, "Failed to process discovery data. Stopping the node in order to prevent cluster wide instability.", e);
+        U.error(log, "Fatal error in ZookeeperDiscovery.", err);
 
-        joinFut.onDone(e);
+        try {
+            stop0(err);
+        }
+        catch (InterruptedException e) {
+            U.warn(log, "Failed to finish stop procedure, thread was interrupted.");
+
+            // Restore the interrupt flag for the caller.
+            Thread.currentThread().interrupt();
+        }
 
-        if (e instanceof Error)
-            throw (Error)e;
+        if (err instanceof Error)
+            throw (Error)err;
     }
 
     /**
@@ -1352,27 +1438,130 @@ public class ZookeeperDiscoveryImpl {
     /**
+     * Thread that runs queued discovery notifications one at a time and handles
+     * ZooKeeper connection loss.
      *
      */
-    private class ConnectionLossListener implements IgniteRunnable {
+    private class ZkEventWorker extends IgniteSpiThread {
         /** */
-        private static final long serialVersionUID = 0L;
+        // Sentinel task: body() compares by identity and dispatches to processConnectionLost().
+        private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}};
+
+        /** Pending notifications; filled from ZooKeeper callback threads. */
+        private final BlockingQueue<Runnable> evtsQ;
+
+        /**
+         * @param igniteInstanceName Ignite instance name.
+         * @param name Thread name.
+         * @param log Logger.
+         */
+        ZkEventWorker(String igniteInstanceName, String name, IgniteLogger log) {
+            super(igniteInstanceName, name, log);
+
+            evtsQ = new LinkedBlockingQueue<>();
+        }
 
         /** {@inheritDoc} */
-        @Override public void run() {
-            // TODO ZK, can be called from any thread.
-            U.warn(log, "Zookeeper connection loss, local node is SEGMENTED");
+        @Override protected void body() throws InterruptedException {
+            while (!isInterrupted()) {
+                Runnable r = evtsQ.take();
+
+                if (r == CONNECTION_LOST)
+                    processConnectionLost();
+                else {
+                    // Busy lock is blocked by stop0(): exit without running further events.
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        // NOTE(review): an unchecked exception here ends this thread.
+                        r.run();
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Queues connection-loss handling for this thread; may be called from any thread.
+         */
+        void onConnectionLoss() {
+            evtsQ.add(CONNECTION_LOST);
+        }
+
+        /**
+         * Handles a lost ZooKeeper connection. If client reconnect is enabled, the node gets a new id
+         * and rejoins; otherwise, or if rejoin fails, the local node is reported as segmented.
+         */
+        void processConnectionLost() {
+            if (clientReconnectEnabled) {
+                connState = ConnectionState.DISCONNECTED;
 
-            if (joined) {
-                assert evtsData != null;
+                // Block immediately followed by unblock: presumably a barrier that waits for in-flight
+                // busy sections (ZooKeeper callbacks) to leave before the state is replaced below.
+                busyLock.block();
+
+                busyLock.unblock();
+
+                UUID newId = UUID.randomUUID();
+
+                U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" +
+                    "newId=" + newId +
+                    ", prevId=" + locNode.id() +
+                    ", locNode=" + locNode + ']');
+
+                locNode.onClientDisconnected(newId);
+
+                if (state.joined) {
+                    assert state.evtsData != null;
+
+                    lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+                        state.evtsData.topVer,
+                        locNode,
+                        state.top.topologySnapshot(),
+                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                        null);
+                }
+
+                // Fresh runtime state for the rejoin; only the 'joined' flag is carried over.
+                state = new ZkRuntimeState(state.joined);
+
+                try {
+                    joinTopology0(true);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to reconnect: " + e, e);
+
+                    onSegmented(e);
+                }
+            }
+            else {
+                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
+
+                onSegmented(new IgniteSpiException("Zookeeper connection loss."));
+            }
+        }
+
+        /**
+         * Reports segmentation: fires EVT_NODE_SEGMENTED if the node has joined,
+         * otherwise fails the join future with {@code e}.
+         *
+         * @param e Error.
+         */
+        private void onSegmented(Exception e) {
+            if (state.joined) {
+                assert state.evtsData != null;
 
                 lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-                    evtsData.topVer,
+                    state.evtsData.topVer,
                     locNode,
-                    Collections.<ClusterNode>emptyList(),
+                    state.top.topologySnapshot(),
                     Collections.<Long, Collection<ClusterNode>>emptyMap(),
                     null);
             }
             else
-                joinFut.onDone(new IgniteSpiException("Local node SEGMENTED"));
+                joinFut.onDone(e);
+        }
+    }
+
+    /**
+     * ZooKeeper connection-loss hook; hands the event to the event worker thread.
+     */
+    private class ConnectionLossListener implements IgniteRunnable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            evtWorker.onConnectionLoss();
         }
     }
 
@@ -1382,21 +1571,31 @@ public class ZookeeperDiscoveryImpl {
     private class ZkWatcher implements Watcher {
         /** {@inheritDoc} */
         @Override public void process(WatchedEvent evt) {
-            if (evt.getType() == Event.EventType.NodeDataChanged) {
-                if (evt.getPath().equals(zkPaths.evtsPath)) {
-                    if (!crd)
-                        zkClient.getDataAsync(evt.getPath(), this, dataCallback);
+            // Skip the notification while the busy lock is blocked (see stop0()).
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (evt.getType() == Event.EventType.NodeDataChanged) {
+                    if (evt.getPath().equals(zkPaths.evtsPath)) {
+                        // Non-coordinators re-read the events data with this watcher attached
+                        // (presumably re-arming the one-shot ZooKeeper watch).
+                        if (!state.crd)
+                            state.zkClient.getDataAsync(evt.getPath(), this, dataCallback);
+                    }
+                    else
+                        U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath());
                 }
-                else
-                    U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath());
+                else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
+                    // Both watched directories are re-listed through the same children callback.
+                    if (evt.getPath().equals(zkPaths.aliveNodesDir))
+                        state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
+                    else if (evt.getPath().equals(zkPaths.customEvtsDir))
+                        state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
+                    else
+                        U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
+                }
+
+                // Released here only on success; on failure onFatalError() releases it.
+                busyLock.leaveBusy();
             }
-            else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
-                if (evt.getPath().equals(zkPaths.aliveNodesDir))
-                    zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
-                else if (evt.getPath().equals(zkPaths.customEvtsDir))
-                    zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
-                else
-                    U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
             }
         }
     }
@@ -1407,6 +1606,9 @@ public class ZookeeperDiscoveryImpl {
     private class ZKChildrenCallback implements AsyncCallback.Children2Callback {
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            if (!busyLock.enterBusy())
+                return;
+
             try {
                 assert rc == 0 : KeeperException.Code.get(rc);
 
@@ -1416,9 +1618,11 @@ public class ZookeeperDiscoveryImpl {
                     generateCustomEvents(children);
                 else
                     U.warn(log, "Children callback for unexpected path: " + path);
+
+                busyLock.leaveBusy();
             }
             catch (Throwable e) {
-                onFatalError(e);
+                onFatalError(busyLock, e);
             }
         }
     }
@@ -1429,18 +1633,23 @@ public class ZookeeperDiscoveryImpl {
     private class ZkDataCallback implements AsyncCallback.DataCallback {
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+            // Skip the callback while the busy lock is blocked (see stop0()).
+            if (!busyLock.enterBusy())
+                return;
+
             try {
                 assert rc == 0 : KeeperException.Code.get(rc);
 
                 if (path.equals(zkPaths.evtsPath)) {
-                    if (!crd)
+                    // Only non-coordinator nodes process events data read here.
+                    if (!state.crd)
                         processNewEvents(data);
                 }
                 else
                     U.warn(log, "Data callback for unknown path: " + path);
+
+                // Released here only on success; on failure onFatalError() releases it.
+                busyLock.leaveBusy();
             }
             catch (Throwable e) {
-                onFatalError(e);
+                onFatalError(busyLock, e);
             }
         }
     }
@@ -1449,14 +1658,46 @@ public class ZookeeperDiscoveryImpl {
      *
      */
     private class AliveNodeDataWatcher implements Watcher, AsyncCallback.DataCallback {
+        /** {@inheritDoc} */
         @Override public void process(WatchedEvent evt) {
-            if (evt.getType() == Event.EventType.NodeDataChanged)
-                zkClient.getDataAsync(evt.getPath(), this, this);
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (evt.getType() == Event.EventType.NodeDataChanged)
+                    state.zkClient.getDataAsync(evt.getPath(), this, this);
+
+                busyLock.leaveBusy();
+            }
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
+            }
         }
 
+        /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-            assert crd;
+            assert state.crd;
+
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                processResult0(rc, path, data);
+
+                busyLock.leaveBusy();
+            }
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
+            }
+        }
 
+        /**
+         * @param rc Result code.
+         * @param path Path.
+         * @param data Data.
+         * @throws Exception If failed.
+         */
+        private void processResult0(int rc, String path, byte[] data) throws Exception {
             if (rc == KeeperException.Code.NONODE.intValue()) {
                 if (log.isDebugEnabled())
                     log.debug("Alive node callaback, no node: " + path);
@@ -1466,30 +1707,108 @@ public class ZookeeperDiscoveryImpl {
 
             assert rc == 0 : KeeperException.Code.get(rc);
 
-            try {
-                if (data.length > 0) {
-                    ZkAliveNodeData nodeData = unmarshal(data);
+            if (data.length > 0) {
+                ZkAliveNodeData nodeData = unmarshal(data);
 
-                    Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path);
+                Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path);
 
-                    Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator();
+                Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator();
 
-                    boolean processed = false;
+                boolean processed = false;
 
-                    while (it.hasNext()) {
-                        ZkDiscoveryEventData evtData = it.next();
+                while (it.hasNext()) {
+                    ZkDiscoveryEventData evtData = it.next();
 
-                        if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt))
-                            processed = true;
-                    }
+                    if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt))
+                        processed = true;
+                }
 
-                    if (processed)
-                        handleProcessedEvents();
+                if (processed)
+                    handleProcessedEvents();
+            }
+        }
+    }
+
+    /**
+     * Watches the previous node's ZooKeeper path and calls {@code onPreviousNodeFail()}
+     * once that path is deleted or found missing.
+     */
+    private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback {
+        /** {@inheritDoc} */
+        @Override public void process(WatchedEvent evt) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (evt.getType() == Event.EventType.NodeDeleted) {
+                    onPreviousNodeFail();
                 }
+                else {
+                    if (log.isInfoEnabled())
+                        log.info("Previous node watch event: " + evt);
+
+                    // Any other real event: re-check existence with this watcher attached.
+                    if (evt.getType() != Event.EventType.None)
+                        state.zkClient.existsAsync(evt.getPath(), this, this);
+                }
+
+                // Released here only on success; on failure onFatalError() releases it.
+                busyLock.leaveBusy();
             }
             catch (Throwable e) {
-                onFatalError(e);
+                onFatalError(busyLock, e);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (log.isInfoEnabled())
+                    log.info("Previous node stat callback [rc=" + rc + ", path=" + path + ", stat=" + stat + ']');
+
+                assert rc == 0 || rc == KeeperException.Code.NONODE.intValue() : KeeperException.Code.get(rc);
+
+                // Node already gone: treat it the same as a NodeDeleted notification.
+                if (rc == KeeperException.Code.NONODE.intValue() || stat == null)
+                    onPreviousNodeFail();
+
+                busyLock.leaveBusy();
+            }
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
+            }
+        }
+    }
+
+    /**
+     * Children callback that passes the listed children to {@code checkIsCoordinator}.
+     */
+    class CheckCoordinatorCallback implements AsyncCallback.Children2Callback {
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            // Skip the callback while the busy lock is blocked (see stop0()).
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                assert rc == 0 : KeeperException.Code.get(rc);
+
+                checkIsCoordinator(rc, children);
+
+                // Released here only on success; on failure onFatalError() releases it.
+                busyLock.leaveBusy();
+            }
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
+            }
+        }
+    }
+
+    /**
+     * Connection state tracked by this discovery instance in {@code connState}.
+     */
+    enum ConnectionState {
+        /** Presumably set once discovery is started (not assigned in this change) - confirm. */
+        STARTED,
+        /** Assigned on connection loss before a reconnect attempt, and by stop0(). */
+        DISCONNECTED,
+        /** Checked by onFatalError() to skip a repeated stop. */
+        STOPPED
     }
 }


Mime
View raw message