ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/5] ignite git commit: zk
Date Wed, 22 Nov 2017 14:41:25 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42bbed0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42bbed0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42bbed0a

Branch: refs/heads/ignite-zk
Commit: 42bbed0adda149acb098fddfc830bcea768697d7
Parents: 8bd1e07
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 22 17:08:56 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 22 17:30:35 2017 +0300

----------------------------------------------------------------------
 modules/core/pom.xml                            |   15 +
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  313 ++++
 .../discovery/zk/internal/ZkAliveNodeData.java  |   37 +
 .../discovery/zk/internal/ZkClusterNodes.java   |   92 ++
 .../zk/internal/ZkDiscoveryCustomEventData.java |   72 +
 .../zk/internal/ZkDiscoveryEventData.java       |  110 ++
 .../zk/internal/ZkDiscoveryEventsData.java      |   67 +
 .../internal/ZkDiscoveryNodeFailEventData.java  |   51 +
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   53 +
 .../discovery/zk/internal/ZkEventAckFuture.java |  142 ++
 .../discovery/zk/internal/ZkIgnitePaths.java    |  222 +++
 .../zk/internal/ZkJoinEventDataForJoined.java   |   53 +
 .../zk/internal/ZkJoiningNodeData.java          |   61 +
 .../discovery/zk/internal/ZookeeperClient.java  |  816 ++++++++++
 .../ZookeeperClientFailedException.java         |   30 +
 .../zk/internal/ZookeeperClusterNode.java       |  239 +++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 1454 +++++++++++++++++
 .../zk/internal/ZookeeperClientTest.java        |  404 +++++
 .../ZookeeperDiscoverySpiBasicTest.java         |  994 ++++++++++++
 .../testframework/junits/GridAbstractTest.java  |   24 +
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    |  131 ++
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  293 ----
 .../discovery/zk/internal/ZkAliveNodeData.java  |   37 -
 .../discovery/zk/internal/ZkClusterNodes.java   |   92 --
 .../zk/internal/ZkDiscoveryCustomEventData.java |   72 -
 .../zk/internal/ZkDiscoveryEventData.java       |  110 --
 .../zk/internal/ZkDiscoveryEventsData.java      |   67 -
 .../internal/ZkDiscoveryNodeFailEventData.java  |   51 -
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   53 -
 .../discovery/zk/internal/ZkEventAckFuture.java |  142 --
 .../discovery/zk/internal/ZkIgnitePaths.java    |  137 --
 .../zk/internal/ZkJoinEventDataForJoined.java   |   53 -
 .../zk/internal/ZkJoiningNodeData.java          |   61 -
 .../discovery/zk/internal/ZookeeperClient.java  |  816 ----------
 .../ZookeeperClientFailedException.java         |   30 -
 .../zk/internal/ZookeeperClusterNode.java       |  224 ---
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 1455 ------------------
 .../zk/internal/ZookeeperClientTest.java        |  404 -----
 .../ZookeeperDiscoverySpiBasicTest.java         |  994 ------------
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    |  131 --
 40 files changed, 5380 insertions(+), 5222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 622e076..34e6327 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -74,6 +74,21 @@
             <version>${jetbrains.annotations.version}</version>
         </dependency>
 
+        <!-- TODO ZK -->
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>${zookeeper.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- TODO ZK -->
+
         <dependency>
             <groupId>mx4j</groupId>
             <artifactId>mx4j-tools</artifactId>

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
new file mode 100644
index 0000000..ab59dc4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.JoiningNodesAware;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery SPI backed by ZooKeeper; discovery operations are delegated to {@link ZookeeperDiscoveryImpl}.
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, JoiningNodesAware {
+    /** ZooKeeper connection string handed to {@code joinTopology} in {@link #spiStart}. */
+    @GridToStringInclude
+    private String zkConnectionString;
+
+    /** Session timeout handed to {@code joinTopology}; defaults to 5000 (presumably milliseconds - TODO confirm). */
+    @GridToStringInclude
+    private int sesTimeout = 5000;
+
+    /** Base path handed to {@link ZookeeperDiscoveryImpl}; defaults to {@code "/apacheIgnite"}. */
+    @GridToStringInclude
+    private String basePath = "/apacheIgnite";
+
+    /** Cluster name handed to {@link ZookeeperDiscoveryImpl}; defaults to {@code "default"}. */
+    @GridToStringInclude
+    private String clusterName = "default";
+
+    /** Discovery listener set via {@link #setListener}; may be {@code null}. */
+    @GridToStringExclude
+    private DiscoverySpiListener lsnr;
+
+    /** Data exchange set via {@link #setDataExchange}; handed to {@link ZookeeperDiscoveryImpl} on start. */
+    @GridToStringExclude
+    private DiscoverySpiDataExchange exchange;
+
+    /** Node authenticator; stored by the setter but not read anywhere in this class. */
+    @GridToStringExclude
+    private DiscoverySpiNodeAuthenticator auth;
+
+    /** Metrics provider; stored by the setter but not read anywhere in this class. */
+    @GridToStringExclude
+    private DiscoveryMetricsProvider metricsProvider;
+
+    /** Discovery implementation; created in {@link #spiStart}, {@code null} before that. */
+    @GridToStringExclude
+    private ZookeeperDiscoveryImpl impl;
+
+    /** Local node attributes; assigned once in {@link #setNodeAttributes}. */
+    @GridToStringExclude
+    private Map<String, Object> locNodeAttrs;
+
+    /** Local node version; assigned once in {@link #setNodeAttributes}. */
+    @GridToStringExclude
+    private IgniteProductVersion locNodeVer;
+
+    /** Consistent ID; taken from the configuration on start, falls back to the node ID when not configured. */
+    @GridToStringExclude
+    private Serializable consistentId;
+
+    /** Logger. */
+    @LoggerResource
+    @GridToStringExclude
+    private IgniteLogger log;
+
+    public String getBasePath() {
+        return basePath;
+    }
+
+    public ZookeeperDiscoverySpi setBasePath(String basePath) {
+        this.basePath = basePath;
+
+        return this;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public ZookeeperDiscoverySpi setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+
+        return this;
+    }
+
+    public int getSessionTimeout() {
+        return sesTimeout;
+    }
+
+    public ZookeeperDiscoverySpi setSessionTimeout(int sesTimeout) {
+        this.sesTimeout = sesTimeout;
+
+        return this;
+    }
+
+    public String getZkConnectionString() {
+        return zkConnectionString;
+    }
+
+    public ZookeeperDiscoverySpi setZkConnectionString(String zkConnectionString) {
+        this.zkConnectionString = zkConnectionString;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean knownNode(UUID nodeId) {
+        return impl.knownNode(nodeId); // NOTE(review): impl is null until spiStart(); no null guard here, unlike getLocalNode().
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable consistentId() throws IgniteSpiException {
+        return consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return impl.remoteNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getLocalNode() {
+        return impl != null ? impl.localNode() : null; // Returns null when called before spiStart().
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        return impl.node(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        return impl.pingNode(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+        assert locNodeAttrs == null;
+        assert locNodeVer == null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Node attributes to set: " + attrs);
+            log.debug("Node version to set: " + ver);
+        }
+
+        locNodeAttrs = attrs;
+        locNodeVer = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+        this.metricsProvider = metricsProvider;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        // TODO ZK
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+        // TODO ZK
+        this.auth = auth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        return impl.gridStartTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+        impl.sendCustomMessage(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        // TODO ZK
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClientMode() throws IllegalStateException {
+        return impl.localNode().isClient(); // NOTE(review): NPE (not IllegalStateException) if called before spiStart().
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        super.onContextInitialized0(spiCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+        ZookeeperClusterNode locNode = initLocalNode();
+
+        log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString +
+            ", sesTimeout=" + sesTimeout +
+            ", basePath=" + basePath +
+            ", clusterName=" + clusterName + ']');
+
+        impl = new ZookeeperDiscoveryImpl(log,
+            basePath,
+            clusterName,
+            locNode,
+            lsnr,
+            exchange);
+
+        try {
+            impl.joinTopology(igniteInstanceName, zkConnectionString, sesTimeout);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // Restore the interrupt flag before converting to an SPI exception.
+
+            throw new IgniteSpiException("Failed to join cluster, thread was interrupted", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        if (impl != null)
+            impl.stop();
+    }
+
+    /**
+     * @return Local node instance built from the Ignite configuration; the listener, if set, is notified about it.
+     */
+    private ZookeeperClusterNode initLocalNode() {
+        assert ignite != null;
+
+        consistentId = ignite.configuration().getConsistentId();
+
+        UUID nodeId = ignite.configuration().getNodeId();
+
+        // TODO ZK
+        if (consistentId == null)
+            consistentId = nodeId;
+
+        ZookeeperClusterNode locNode = new ZookeeperClusterNode(nodeId,
+            locNodeVer,
+            locNodeAttrs,
+            consistentId,
+            ignite.configuration().isClientMode());
+
+        locNode.local(true);
+
+        DiscoverySpiListener lsnr = this.lsnr; // Read the field once so the null check and the call use the same reference.
+
+        if (lsnr != null)
+            lsnr.onLocalNodeInitialized(locNode);
+
+        if (log.isDebugEnabled())
+            log.debug("Local node initialized: " + locNode);
+
+        return locNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZookeeperDiscoverySpi.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
new file mode 100644
index 0000000..45f453f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable per-node data holder; currently carries only {@code lastProcEvt}.
+ */
+public class ZkAliveNodeData implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Presumably the ID of the last event processed by the node; {@code -1} until set - TODO confirm against usage. */
+    long lastProcEvt = -1;
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkAliveNodeData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
new file mode 100644
index 0000000..e3e5f8b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Registry of cluster nodes indexed three ways: by order, by internal ID and by node ID.
+ */
+public class ZkClusterNodes {
+    /** Nodes keyed by {@code order()}. */
+    final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new ConcurrentSkipListMap<>();
+
+    /** Nodes keyed by {@code internalId()}. */
+    final ConcurrentSkipListMap<Integer, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>();
+
+    /** Nodes keyed by {@code id()}. */
+    final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new ConcurrentHashMap<>();
+
+    /**
+     * @return Newly allocated list of all registered nodes for which {@code isLocal()} is {@code false}.
+     */
+    public Collection<ClusterNode> remoteNodes() {
+        // TODO ZK
+        List<ClusterNode> nodes = new ArrayList<>();
+
+        for (ClusterNode node : nodesById.values()) {
+            if (!node.isLocal())
+                nodes.add(node);
+        }
+
+        return nodes;
+    }
+
+    /**
+     * @param node New node; needs a non-null ID, a positive order and must not be registered yet (assertions only).
+     */
+    void addNode(ZookeeperClusterNode node) {
+        assert node.id() != null : node;
+        assert node.order() > 0 : node;
+
+        ZookeeperClusterNode old = nodesById.put(node.id(), node);
+
+        assert old == null : old;
+
+        old = nodesByOrder.put(node.order(), node);
+
+        assert old == null : old;
+
+        old = nodesByInternalId.put(node.internalId(), node);
+
+        assert old == null : old;
+    }
+
+    ZookeeperClusterNode removeNode(int internalId) { // Removes the node from all three maps and returns it.
+        ZookeeperClusterNode node = nodesByInternalId.remove(internalId);
+
+        assert node != null : internalId; // NOTE(review): with assertions disabled an unknown ID fails with NPE on the next line.
+        assert node.order() > 0 : node;
+
+        Object rvmd = nodesByOrder.remove(node.order());
+
+        assert rvmd != null;
+
+        rvmd = nodesById.remove(node.id());
+
+        assert rvmd != null;
+
+        return node;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
new file mode 100644
index 0000000..2e50831
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/**
+ * Event data for a custom discovery message (event type {@code EVT_DISCOVERY_CUSTOM_EVT}).
+ */
+class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
+    /** Bit in {@code flags} marking this event as a custom message ack. */
+    private static final int CUSTOM_MSG_ACK_FLAG = 1;
+
+    /** Sender node ID; never {@code null} (asserted in the constructor). */
+    final UUID sndNodeId;
+
+    /** Event path; never empty (asserted in the constructor). */
+    final String evtPath;
+
+    /** Custom message; transient, so it is not written by Java serialization. */
+    transient DiscoverySpiCustomMessage msg;
+
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param sndNodeId Sender node ID.
+     * @param evtPath Event path.
+     * @param ack Acknowledge event flag.
+     */
+    ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath, boolean ack) {
+        super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
+
+        assert sndNodeId != null;
+        assert !F.isEmpty(evtPath);
+
+        this.sndNodeId = sndNodeId;
+        this.evtPath = evtPath;
+
+        if (ack)
+            flags |= CUSTOM_MSG_ACK_FLAG;
+    }
+
+    /**
+     * @return {@code True} for custom event ack message.
+     */
+    boolean ackEvent() {
+        return flagSet(CUSTOM_MSG_ACK_FLAG);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkDiscoveryCustomEventData [topVer=" + topologyVersion() + ", evtId=" + eventId() + ", sndNode=" + sndNodeId + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
new file mode 100644
index 0000000..00330e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+
+/**
+ * Base class for discovery event data; also tracks which nodes still have to acknowledge the event.
+ */
+abstract class ZkDiscoveryEventData implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Event ID. */
+    private final long evtId;
+
+    /** Event type: {@code EVT_NODE_JOINED}, {@code EVT_NODE_FAILED} or {@code EVT_DISCOVERY_CUSTOM_EVT}. */
+    private final int evtType;
+
+    /** Topology version. */
+    private final long topVer;
+
+    /** Internal IDs of nodes whose ack is still pending; transient and {@code null} until {@code remainingAcks(...)} runs. */
+    private transient Set<Integer> remainingAcks;
+
+    /** Bit flags set by subclasses; tested with {@link #flagSet(int)}. */
+    int flags;
+
+    /**
+     * @param evtId Event ID.
+     * @param evtType Event type.
+     * @param topVer Topology version. */
+    ZkDiscoveryEventData(long evtId, int evtType, long topVer) {
+        assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_FAILED || evtType == EVT_DISCOVERY_CUSTOM_EVT : evtType;
+
+        this.evtId = evtId;
+        this.evtType = evtType;
+        this.topVer = topVer;
+    }
+
+    void remainingAcks(Collection<ZookeeperClusterNode> nodes) { // Initializes the pending-ack set; may be called only once.
+        assert remainingAcks == null : this;
+
+        remainingAcks = U.newHashSet(nodes.size());
+
+        for (ZookeeperClusterNode node : nodes) {
+            if (!node.isLocal() && node.order() <= topVer) // Only non-local nodes with order not above this event's topology version.
+                remainingAcks.add(node.internalId());
+        }
+    }
+
+    boolean allAcksReceived() { // NOTE(review): no null assert here, unlike onAckReceived()/onNodeFail().
+        return remainingAcks.isEmpty();
+    }
+
+    boolean onAckReceived(Integer nodeInternalId, long ackEvtId) { // Returns true once no acks remain pending.
+        assert remainingAcks != null;
+
+        if (ackEvtId >= evtId) // An ack for this event or any later one clears the node.
+            remainingAcks.remove(nodeInternalId);
+
+        return remainingAcks.isEmpty();
+    }
+
+    boolean onNodeFail(ZookeeperClusterNode node) { // A failed node is no longer waited for; returns true once no acks remain.
+        assert remainingAcks != null : this;
+
+        remainingAcks.remove(node.internalId());
+
+        return remainingAcks.isEmpty();
+    }
+
+    boolean flagSet(int flag) { // True only if every bit of 'flag' is set in 'flags'.
+        return (flags & flag) == flag;
+    }
+
+    long eventId() {
+        return evtId;
+    }
+
+    int eventType() {
+        return evtType;
+    }
+
+    long topologyVersion() {
+        return topVer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
new file mode 100644
index 0000000..ce21a06
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.TreeMap;
+
+/**
+ * Serializable container of discovery events plus related counters (topology version, event ID generator, start time).
+ */
+class ZkDiscoveryEventsData implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Presumably the marker of the last processed custom event; {@code -1} initially - TODO confirm against usage. */
+    int procCustEvt = -1;
+
+    /** Presumably the event ID generator counter - TODO confirm against usage. */
+    long evtIdGen;
+
+    /** Topology version. */
+    long topVer;
+
+    /** Grid start time. */
+    long gridStartTime;
+
+    /** Events keyed by event ID (see {@link #addEvent}). */
+    TreeMap<Long, ZkDiscoveryEventData> evts;
+
+    /**
+     * @param gridStartTime Grid start time.
+     * @param topVer Topology version.
+     * @param evts Events keyed by event ID.
+     */
+    ZkDiscoveryEventsData(long gridStartTime, long topVer, TreeMap<Long, ZkDiscoveryEventData> evts) {
+        this.gridStartTime = gridStartTime;
+        this.topVer = topVer;
+        this.evts = evts;
+    }
+
+    /**
+     * @param nodes Nodes passed to {@code evt.remainingAcks(...)} to initialize the event's pending acks.
+     * @param evt Event; its ID must not be registered yet (asserted). */
+    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) {
+        Object old = evts.put(evt.eventId(), evt);
+
+        assert old == null : old;
+
+        evt.remainingAcks(nodes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
new file mode 100644
index 0000000..227bb94
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.events.EventType;
+
+/**
+ * Event data for {@code EVT_NODE_FAILED}; identifies the failed node by its internal ID.
+ */
+class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
+    /** Internal ID of the failed node. */
+    private int failedNodeInternalId;
+
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param failedNodeInternalId Failed node internal ID.
+     */
+    ZkDiscoveryNodeFailEventData(long evtId, long topVer, int failedNodeInternalId) {
+        super(evtId, EventType.EVT_NODE_FAILED, topVer);
+
+        this.failedNodeInternalId = failedNodeInternalId;
+    }
+
+    /**
+     * @return Failed node internal ID.
+     */
+    int failedNodeInternalId() {
+        return failedNodeInternalId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NodeFailEventData [topVer=" + topologyVersion() + ", nodeId=" + failedNodeInternalId + ']'; // "nodeId" here is the internal ID.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
new file mode 100644
index 0000000..5a828dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.events.EventType;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
+    /** */
+    final int joinedInternalId;
+
+    /** */
+    final UUID nodeId;
+
+    /** */
+    transient ZkJoiningNodeData joiningNodeData;
+
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param nodeId Joined node ID.
+     * @param joinedInternalId Joined node internal ID.
+     */
+    ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) {
+        super(evtId, EventType.EVT_NODE_JOINED, topVer);
+
+        this.nodeId = nodeId;
+        this.joinedInternalId = joinedInternalId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NodeJoinEventData [topVer=" + topologyVersion() + ", node=" + nodeId + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
new file mode 100644
index 0000000..c89b586
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ZkEventAckFuture extends GridFutureAdapter<Void> implements Watcher, AsyncCallback.Children2Callback {
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** */
+    private final Long evtId;
+
+    /** */
+    private final String evtPath;
+
+    /** */
+    private final int expAcks;
+
+    /** */
+    private final Set<Integer> remaininAcks;
+
+    /**
+     * @param impl
+     * @param evtPath
+     * @param evtId
+     */
+    ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) {
+        this.impl = impl;
+        this.log = impl.log();
+        this.evtPath = evtPath;
+        this.evtId = evtId;
+
+        ZkClusterNodes top = impl.nodes();
+
+        remaininAcks = U.newHashSet(top.nodesById.size());
+
+        for (ZookeeperClusterNode node : top.nodesByInternalId.values()) {
+            if (!node.isLocal())
+                remaininAcks.add(node.internalId());
+        }
+
+        expAcks = remaininAcks.size();
+
+        if (expAcks == 0)
+            onDone();
+        else
+            impl.zkClient().getChildrenAsync(evtPath, this, this);
+    }
+
+    /**
+     * @return Event ID.
+     */
+    Long eventId() {
+        return evtId;
+    }
+
+    /**
+     * @param node Failed node.
+     */
+    void onNodeFail(ZookeeperClusterNode node) {
+        assert !remaininAcks.isEmpty();
+
+        if (remaininAcks.remove(node.internalId()) && remaininAcks.isEmpty())
+            onDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void process(WatchedEvent evt) {
+        if (isDone())
+            return;
+
+        if (evt.getType() == Event.EventType.NodeChildrenChanged) {
+            if (evtPath.equals(evt.getPath()))
+                impl.zkClient().getChildrenAsync(evtPath, this, this);
+            else
+                U.warn(log, "Received event for unknown path: " + evt.getPath());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+        assert rc == 0 : rc;
+
+        if (isDone())
+            return;
+
+        if (expAcks == stat.getCversion()) {
+            log.info("Received expected number of acks [expCnt=" + expAcks + ", cVer=" + stat.getCversion() + ']');
+
+            onDone();
+        }
+        else {
+            for (int i = 0; i < children.size(); i++) {
+                Integer nodeInternalId = Integer.parseInt(children.get(i));
+
+                if (remaininAcks.remove(nodeInternalId) && remaininAcks.size() == 0)
+                    onDone();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
new file mode 100644
index 0000000..1f6315c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+
+/**
+ * Builds and parses the ZooKeeper paths used by discovery for a single cluster. All cluster
+ * paths are located under {@code basePath + "/" + clusterName}.
+ */
+class ZkIgnitePaths {
+    /** Length of the string form of a {@link UUID}. */
+    private static final int UUID_LEN = 36;
+
+    /** Name of the directory with joining nodes data. */
+    private static final String JOIN_DATA_DIR = "joinData";
+
+    /** Name of the directory with custom events. */
+    private static final String CUSTOM_EVTS_DIR = "customEvts";
+
+    /** Name of the directory with custom events acks. */
+    private static final String CUSTOM_EVTS_ACKS_DIR = "customEvtsAcks";
+
+    /** Name of the directory with alive nodes. */
+    private static final String ALIVE_NODES_DIR = "alive";
+
+    /** Name of the discovery events node. */
+    private static final String DISCO_EVENTS_PATH = "events";
+
+    /** Base directory. */
+    final String basePath;
+
+    /** Cluster name. */
+    private final String clusterName;
+
+    /** Cluster directory: {@code basePath + "/" + clusterName}. */
+    final String clusterDir;
+
+    /** Full path of the alive nodes directory. */
+    final String aliveNodesDir;
+
+    /** Full path of the joining nodes data directory. */
+    final String joinDataDir;
+
+    /** Full path of the discovery events node. */
+    final String evtsPath;
+
+    /** Full path of the custom events directory. */
+    final String customEvtsDir;
+
+    /** Full path of the custom events acks directory. */
+    final String customEvtsAcksDir;
+
+    /**
+     * @param basePath Base directory.
+     * @param clusterName Cluster name.
+     */
+    ZkIgnitePaths(String basePath, String clusterName) {
+        this.basePath = basePath;
+        this.clusterName = clusterName;
+
+        clusterDir = basePath + "/" + clusterName;
+
+        aliveNodesDir = zkPath(ALIVE_NODES_DIR);
+        joinDataDir = zkPath(JOIN_DATA_DIR);
+        evtsPath = zkPath(DISCO_EVENTS_PATH);
+        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+        customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
+    }
+
+    /**
+     * TODO ZK: copied from curator.
+     *
+     * Validates the provided znode path string.
+     *
+     * @param path Znode path string.
+     * @param isSequential If the path is being created with a sequential flag; in this case
+     *      the path is validated with a {@code "1"} suffix appended.
+     * @throws IllegalArgumentException If the path is invalid.
+     */
+    public static void validatePath(String path, boolean isSequential)
+        throws IllegalArgumentException {
+        validatePath(isSequential ? path + "1" : path);
+    }
+
+    /**
+     * TODO ZK: copied from curator.
+     *
+     * Validates the provided znode path string: it must be non-empty, start with {@code '/'},
+     * must not end with {@code '/'} (unless it is the root), and must not contain empty or
+     * relative ({@code "."}, {@code ".."}) elements or characters from the forbidden ranges
+     * {@code \u0000-\u001f}, {@code \u007f-\u009f}, {@code \ud800-\uf8ff}, {@code \ufff0-\uffff}.
+     *
+     * @param path Znode path string.
+     * @return The given path if it was valid, for fluent chaining.
+     * @throws IllegalArgumentException If the path is invalid.
+     */
+    public static String validatePath(String path) throws IllegalArgumentException {
+        if (path == null) {
+            throw new IllegalArgumentException("Path cannot be null");
+        }
+        if (path.length() == 0) {
+            throw new IllegalArgumentException("Path length must be > 0");
+        }
+        if (path.charAt(0) != '/') {
+            throw new IllegalArgumentException(
+                "Path must start with / character");
+        }
+        if (path.length() == 1) { // done checking - it's the root
+            return path;
+        }
+        if (path.charAt(path.length() - 1) == '/') {
+            throw new IllegalArgumentException(
+                "Path must not end with / character");
+        }
+
+        String reason = null;
+        char lastc = '/';
+        char chars[] = path.toCharArray();
+        char c;
+        for (int i = 1; i < chars.length; lastc = chars[i], i++) {
+            c = chars[i];
+
+            if (c == 0) {
+                reason = "null character not allowed @" + i;
+                break;
+            } else if (c == '/' && lastc == '/') {
+                reason = "empty node name specified @" + i;
+                break;
+            } else if (c == '.' && lastc == '.') {
+                // ".." is rejected only when it forms a whole path element.
+                if (chars[i-2] == '/' &&
+                    ((i + 1 == chars.length)
+                        || chars[i+1] == '/')) {
+                    reason = "relative paths not allowed @" + i;
+                    break;
+                }
+            } else if (c == '.') {
+                // "." is rejected only when it forms a whole path element.
+                if (chars[i-1] == '/' &&
+                    ((i + 1 == chars.length)
+                        || chars[i+1] == '/')) {
+                    reason = "relative paths not allowed @" + i;
+                    break;
+                }
+            } else if (c > '\u0000' && c <= '\u001f'
+                || c >= '\u007f' && c <= '\u009f'
+                || c >= '\ud800' && c <= '\uf8ff'
+                || c >= '\ufff0' && c <= '\uffff') {
+                // Range boundaries are inclusive: the boundary characters are forbidden as well.
+                reason = "invalid character @" + i;
+                break;
+            }
+        }
+
+        if (reason != null) {
+            throw new IllegalArgumentException(
+                "Invalid path string \"" + path + "\" caused by " + reason);
+        }
+
+        return path;
+    }
+
+    /**
+     * @param path Relative path.
+     * @return Full path.
+     */
+    String zkPath(String path) {
+        return clusterDir + "/" + path;
+    }
+
+    /**
+     * @param path Alive node name; presumably of the form {@code <nodeId>|<joinSeq>|<internalId>} -
+     *      TODO confirm against the code creating alive nodes.
+     * @return Integer parsed from the text following the last {@code '|'}.
+     */
+    static int aliveInternalId(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    /**
+     * @param path Alive node name starting with the string form of the node ID.
+     * @return Node ID parsed from the first {@link #UUID_LEN} characters.
+     */
+    static UUID aliveNodeId(String path) {
+        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    /**
+     * @param path Alive node name containing at least two {@code '|'} separators.
+     * @return Integer parsed from the text between the first and the last {@code '|'}.
+     */
+    static int aliveJoinSequence(String path) {
+        int idx1 = path.indexOf('|');
+        int idx2 = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx1 + 1, idx2));
+    }
+
+    /**
+     * @param path Custom event node name.
+     * @return Integer parsed from the text following the last {@code '|'}.
+     */
+    static int customEventSequence(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    /**
+     * @param path Custom event node name starting with the string form of the sender node ID.
+     * @return Node ID parsed from the first {@link #UUID_LEN} characters.
+     */
+    static UUID customEventSendNodeId(String path) {
+        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    /**
+     * @param evtId Event ID.
+     * @return Path {@code evtsPath + "/" + evtId}.
+     */
+    String joinEventDataPath(long evtId) {
+        return evtsPath + "/" + evtId;
+    }
+
+    /**
+     * @param evtId Event ID.
+     * @return Path {@code evtsPath + "/joined-" + evtId}.
+     */
+    String joinEventDataPathForJoined(long evtId) {
+        return evtsPath + "/joined-" + evtId;
+    }
+
+    /**
+     * @param ack If {@code true} the path is built under the custom events acks directory,
+     *      otherwise under the custom events directory.
+     * @param child Child node name.
+     * @return Full path of the child node.
+     */
+    String customEventDataPath(boolean ack, String child) {
+        String baseDir = ack ? customEvtsAcksDir : customEvtsDir;
+
+        return baseDir + "/" + child;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
new file mode 100644
index 0000000..cdbfdc0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable holder of a topology snapshot and discovery data.
+ */
+class ZkJoinEventDataForJoined implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology. */
+    private final List<ZookeeperClusterNode> top;
+
+    /** Discovery data. */
+    private final Map<Integer, Serializable> discoData;
+
+    /**
+     * @param top Topology.
+     * @param discoData Discovery data.
+     */
+    ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Integer, Serializable> discoData) {
+        this.discoData = discoData;
+        this.top = top;
+    }
+
+    /**
+     * @return Topology.
+     */
+    List<ZookeeperClusterNode> topology() {
+        return this.top;
+    }
+
+    /**
+     * @return Discovery data.
+     */
+    Map<Integer, Serializable> discoveryData() {
+        return this.discoData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
new file mode 100644
index 0000000..1947b6b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Serializable holder of a joining node and its discovery data.
+ */
+class ZkJoiningNodeData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Joining node. */
+    private final ZookeeperClusterNode node;
+
+    /** Discovery data of the joining node. */
+    private final Map<Integer, Serializable> discoData;
+
+    /**
+     * @param node Node, must be non-null and have a non-null ID.
+     * @param discoData Discovery data, must be non-null.
+     */
+    ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) {
+        assert node != null : node;
+        assert node.id() != null : node;
+        assert discoData != null;
+
+        this.discoData = discoData;
+        this.node = node;
+    }
+
+    /**
+     * @return Node.
+     */
+    ZookeeperClusterNode node() {
+        return this.node;
+    }
+
+    /**
+     * @return Discovery data.
+     */
+    Map<Integer, Serializable> discoveryData() {
+        return this.discoData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
new file mode 100644
index 0000000..626b235
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ZookeeperClient implements Watcher {
+    /** */
+    private static final long RETRY_TIMEOUT = 1000;
+
+    /** */
+    private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    /** */
+    private static final byte[] EMPTY_BYTES = {};
+
+    /** */
+    private final ZooKeeper zk;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private ConnectionState state = ConnectionState.Disconnected;
+
+    /** */
+    private long connLossTimeout;
+
+    /** */
+    private volatile long connStartTime;
+
+    /** */
+    private final Object stateMux = new Object();
+
+    /** */
+    private final IgniteRunnable connLostC;
+
+    /** */
+    private final Timer connTimer;
+
+    /** */
+    private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>();
+
+    /**
+     * Creates a client without an Ignite instance name: delegates to the full constructor
+     * passing {@code null} as the instance name.
+     *
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param sesTimeout ZK session timeout.
+     * @param connLostC Lost connection callback.
+     * @throws Exception If failed.
+     */
+    ZookeeperClient(IgniteLogger log, String connectString, int sesTimeout, IgniteRunnable connLostC) throws Exception {
+        this(null, log, connectString, sesTimeout, connLostC);
+    }
+
+    /**
+     * Creates the ZooKeeper handle with this object as its watcher, creates the connection timer
+     * and schedules the first connection check.
+     *
+     * @param igniteInstanceName Ignite instance name; used only in thread names, a {@code null}
+     *      value is concatenated as the text {@code "null"}.
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param sesTimeout ZK session timeout; also used as the connection loss timeout.
+     * @param connLostC Lost connection callback.
+     * @throws Exception If failed.
+     */
+    ZookeeperClient(String igniteInstanceName,
+        IgniteLogger log,
+        String connectString,
+        int sesTimeout,
+        IgniteRunnable connLostC)
+        throws Exception
+    {
+        this.log = log.getLogger(getClass());
+        this.connLostC = connLostC;
+
+        connLossTimeout = sesTimeout;
+
+        connStartTime = System.currentTimeMillis();
+
+        // Remember the current thread name: it is changed temporarily below and restored in 'finally'.
+        String threadName = Thread.currentThread().getName();
+
+        // ZK generates internal threads' names using current thread name.
+        Thread.currentThread().setName("zk-" + igniteInstanceName);
+
+        try {
+            // NOTE(review): 'this' is handed to ZooKeeper as a watcher before 'connTimer' is assigned
+            // below - confirm that process(...) cannot reach the timer before the constructor completes.
+            zk = new ZooKeeper(connectString, sesTimeout, this);
+        }
+        finally {
+            Thread.currentThread().setName(threadName);
+        }
+
+        connTimer = new Timer("zk-timer-" + igniteInstanceName);
+
+        scheduleConnectionCheck();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Handles only events of type {@code None}: maps the reported ZooKeeper state to a
+     * {@link ConnectionState}, updates {@code state} under {@code stateMux} and, outside of the lock,
+     * either notifies about the lost connection or executes operations from the retry queue.
+     */
+    @Override public void process(WatchedEvent evt) {
+        if (evt.getType() == Event.EventType.None) {
+            ConnectionState newState;
+
+            synchronized (stateMux) {
+                // 'Lost' is final: later events are only logged.
+                if (state == ConnectionState.Lost) {
+                    U.warn(log, "Received event after connection was lost [evtState=" + evt.getState() + "]");
+
+                    return;
+                }
+
+                // Ignore events if the ZooKeeper handle is not alive anymore.
+                if (!zk.getState().isAlive())
+                    return;
+
+                Event.KeeperState zkState = evt.getState();
+
+                switch (zkState) {
+                    case Disconnected:
+                        newState = ConnectionState.Disconnected;
+
+                        break;
+
+                    case SyncConnected:
+                        newState = ConnectionState.Connected;
+
+                        break;
+
+                    case Expired:
+                        newState = ConnectionState.Lost;
+
+                        break;
+
+                    default:
+                        // Any other state is treated as a lost connection.
+                        U.error(log, "Unexpected state for zookeeper client, close connection: " + zkState);
+
+                        newState = ConnectionState.Lost;
+                }
+
+                if (newState != state) {
+                    log.info("Zookeeper client state changed [prevState=" + state + ", newState=" + newState + ']');
+
+                    state = newState;
+
+                    if (newState == ConnectionState.Disconnected) {
+                        // Start of the disconnected period, used to compute the remaining wait time.
+                        connStartTime = System.currentTimeMillis();
+
+                        scheduleConnectionCheck();
+                    }
+                    else if (newState == ConnectionState.Connected)
+                        // Wake up threads waiting on 'stateMux'.
+                        stateMux.notifyAll();
+                    else {
+                        assert state == ConnectionState.Lost : state;
+
+                        closeClient();
+                    }
+                }
+                else
+                    return;
+            }
+
+            // Callbacks below are invoked without holding 'stateMux'.
+            if (newState == ConnectionState.Lost)
+                notifyConnectionLost();
+            else if (newState == ConnectionState.Connected) {
+                // NOTE(review): 'retryQ' is iterated here without holding 'stateMux' - confirm how
+                // the queue is synchronized and when executed operations are removed from it.
+                for (ZkAsyncOperation op : retryQ)
+                    op.execute();
+            }
+        }
+    }
+
+    /**
+     * Runs the lost connection callback, if one was provided and the state is {@code Lost}.
+     */
+    private void notifyConnectionLost() {
+        if (connLostC == null)
+            return;
+
+        if (state == ConnectionState.Lost)
+            connLostC.run();
+    }
+
+    /**
+     * Checks if a node exists. On any error the error handler is invoked; if it returns
+     * normally the operation is attempted again.
+     *
+     * @param path Node path.
+     * @return {@code True} if the node exists.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    boolean exists(String path) throws ZookeeperClientFailedException, InterruptedException {
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                Stat stat = zk.exists(path, false);
+
+                return stat != null;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Creates all given nodes with empty data using a single multi-operation. On any error the
+     * error handler is invoked; if it returns normally the operation is attempted again.
+     *
+     * @param paths Paths of nodes to create.
+     * @param createMode Create mode used for every node.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    void createAllIfNeeded(List<String> paths, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        // TODO ZK: need check for max size?
+        List<Op> createOps = new ArrayList<>(paths.size());
+
+        for (String nodePath : paths) {
+            Op createOp = Op.create(nodePath, EMPTY_BYTES, ZK_ACL, createMode);
+
+            createOps.add(createOp);
+        }
+
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                zk.multi(createOps);
+
+                return;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Creates a node; an already existing node is not an error. On other errors the error
+     * handler is invoked; if it returns normally the operation is attempted again.
+     *
+     * @param path Node path.
+     * @param data Node data, {@code null} is replaced with an empty array.
+     * @param createMode Create mode.
+     * @return Path returned by ZooKeeper, or the passed path if the node already exists.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    String createIfNeeded(String path, byte[] data, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                return zk.create(path, payload, ZK_ACL, createMode);
+            }
+            catch (KeeperException.NodeExistsException e) {
+                log.info("Node already exists: " + path);
+
+                return path;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Gets children of a node without setting a watch. On any error the error handler is
+     * invoked; if it returns normally the operation is attempted again.
+     *
+     * @param path Node path.
+     * @return Children names.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    List<String> getChildren(String path)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                List<String> children = zk.getChildren(path, false);
+
+                return children;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Deletes a node, silently doing nothing if the node does not exist.
+     *
+     * @param path Node path.
+     * @param ver Expected node version, passed to {@code delete(String, int)}.
+     * @throws ZookeeperClientFailedException If thrown by {@code delete(String, int)}.
+     * @throws InterruptedException If thrown by {@code delete(String, int)}.
+     */
+    void deleteIfExists(String path, int ver)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        try {
+            delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException ignored) {
+            // Node is already absent, which is the desired outcome.
+        }
+    }
+
+
+    /**
+     * Deletes all given nodes using a single multi-operation. A missing node is reported to the
+     * caller as {@link KeeperException.NoNodeException}, like in {@code delete(String, int)}; on
+     * other errors the error handler is invoked and, if it returns normally, the operation is
+     * attempted again.
+     *
+     * @param parent Optional parent path; if not {@code null} every path is resolved as
+     *      {@code parent + "/" + path}.
+     * @param paths Paths of nodes to delete; nothing is done for an empty list.
+     * @param ver Expected version used for every node.
+     * @throws KeeperException.NoNodeException If one of the nodes does not exist.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    void deleteAll(@Nullable String parent, List<String> paths, int ver)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        if (paths.isEmpty())
+            return;
+
+        // TODO ZK: need check for max size?
+        List<Op> ops = new ArrayList<>(paths.size());
+
+        for (String path : paths) {
+            String path0 = parent != null ? parent + "/" + path : path;
+
+            ops.add(Op.delete(path0, ver));
+        }
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.multi(ops);
+
+                return;
+            }
+            catch (KeeperException.NoNodeException e) {
+                // Declared in the signature: let the caller decide how to handle a missing node
+                // instead of treating it as a connection-level failure.
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * Deletes a node. A missing node is reported to the caller; on other errors the error
+     * handler is invoked and, if it returns normally, the operation is attempted again.
+     *
+     * @param path Node path.
+     * @param ver Expected node version.
+     * @throws KeeperException.NoNodeException If the node does not exist.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    void delete(String path, int ver)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                zk.delete(path, ver);
+
+                break;
+            }
+            catch (KeeperException.NoNodeException noNode) {
+                throw noNode;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Sets node data. On any error the error handler is invoked; if it returns normally the
+     * operation is attempted again.
+     *
+     * @param path Node path.
+     * @param data Node data, {@code null} is replaced with an empty array.
+     * @param ver Expected node version.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    void setData(String path, byte[] data, int ver)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                zk.setData(path, payload, ver);
+
+                break;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Reads node data without setting a watch. A missing node is reported to the caller; on
+     * other errors the error handler is invoked and, if it returns normally, the operation is
+     * attempted again.
+     *
+     * @param path Node path.
+     * @return Node data.
+     * @throws KeeperException.NoNodeException If the node does not exist.
+     * @throws ZookeeperClientFailedException If thrown by the error handler.
+     * @throws InterruptedException If thrown by the error handler.
+     */
+    byte[] getData(String path)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        while (true) {
+            long startTime = connStartTime;
+
+            try {
+                byte[] bytes = zk.getData(path, false, null);
+
+                return bytes;
+            }
+            catch (KeeperException.NoNodeException noNode) {
+                throw noNode;
+            }
+            catch (Exception e) {
+                onZookeeperError(startTime, e);
+            }
+        }
+    }
+
+    /**
+     * Asynchronously checks if a node exists; the user callback is wrapped before being
+     * passed to ZooKeeper.
+     *
+     * @param path Node path.
+     * @param watcher Watcher to set.
+     * @param cb Callback.
+     */
+    void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback cb) {
+        StatCallbackWrapper wrapper = new StatCallbackWrapper(new ExistsOperation(path, watcher, cb));
+
+        zk.exists(path, watcher, wrapper, null);
+    }
+
+    /**
+     * Asynchronously gets children of a node; the user callback is wrapped before being
+     * passed to ZooKeeper.
+     *
+     * @param path Node path.
+     * @param watcher Watcher to set.
+     * @param cb Callback.
+     */
+    void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) {
+        ChildrenCallbackWrapper wrapper = new ChildrenCallbackWrapper(new GetChildrenOperation(path, watcher, cb));
+
+        zk.getChildren(path, watcher, wrapper, null);
+    }
+
+    /**
+     * Asynchronously reads node data; the user callback is wrapped before being passed
+     * to ZooKeeper.
+     *
+     * @param path Node path.
+     * @param watcher Watcher to set.
+     * @param cb Callback.
+     */
+    void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) {
+        DataCallbackWrapper wrapper = new DataCallbackWrapper(new GetDataOperation(path, watcher, cb));
+
+        zk.getData(path, watcher, wrapper, null);
+    }
+
+    /**
+     * Asynchronously creates a node; the user callback is wrapped before being passed
+     * to ZooKeeper.
+     *
+     * @param path Node path.
+     * @param data Node data, {@code null} is replaced with an empty array.
+     * @param createMode Create mode.
+     * @param cb Callback.
+     */
+    void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        CreateCallbackWrapper wrapper = new CreateCallbackWrapper(new CreateOperation(path, payload, createMode, cb));
+
+        zk.create(path, payload, ZK_ACL, createMode, wrapper, null);
+    }
+
+    /**
+     * Closes this client by delegating to {@code closeClient()}.
+     */
+    public void close() {
+        closeClient();
+    }
+
+    /**
+     * Decides what to do after a ZooKeeper operation failed with {@code e}: returns normally when the
+     * caller should repeat the operation, otherwise throws.
+     * <p>
+     * Under {@code stateMux}: throws at once if the ZooKeeper handle is already closed or the state is
+     * {@code Lost}. For an error accepted by {@link #needRetry(int)}: if the state is still {@code Connected}
+     * and {@code connStartTime} equals {@code prevConnStartTime}, the state becomes {@code Disconnected} and
+     * {@code connStartTime} is reset to the current time; otherwise the time left until {@code connLossTimeout}
+     * is computed and, when none is left, the state becomes {@code Lost} and the client is closed. While time
+     * remains the thread waits {@code RETRY_TIMEOUT} on {@code stateMux} and then returns. Any other error
+     * closes the client, sets the state to {@code Lost} and throws.
+     * <p>
+     * {@code notifyConnectionLost()} is invoked after leaving the synchronized block, and only on the
+     * timeout path.
+     *
+     * @param prevConnStartTime Value of {@code connStartTime} read by the caller before the failed operation.
+     * @param e Error.
+     * @throws ZookeeperClientFailedException If the operation must not be retried.
+     * @throws InterruptedException If interrupted while waiting before the retry.
+     */
+    private void onZookeeperError(long prevConnStartTime, Exception e)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        ZookeeperClientFailedException err = null;
+
+        synchronized (stateMux) {
+            U.warn(log, "Failed to execute zookeeper operation [err=" + e + ", state=" + state + ']');
+
+            if (zk.getState() == ZooKeeper.States.CLOSED)
+                throw new ZookeeperClientFailedException(e);
+
+            if (state == ConnectionState.Lost) {
+                U.error(log, "Operation failed with unexpected error, connection lost: " + e, e);
+
+                throw new ZookeeperClientFailedException(e);
+            }
+
+            boolean retry = (e instanceof KeeperException) && needRetry(((KeeperException)e).code().intValue());
+
+            if (retry) {
+                long remainingTime;
+
+                if (state == ConnectionState.Connected && connStartTime == prevConnStartTime) {
+                    state = ConnectionState.Disconnected;
+
+                    connStartTime = System.currentTimeMillis();
+
+                    remainingTime = connLossTimeout;
+                }
+                else {
+                    assert connStartTime != 0;
+
+                    assert state == ConnectionState.Disconnected;
+
+                    remainingTime = connLossTimeout - (System.currentTimeMillis() - connStartTime);
+
+                    if (remainingTime <= 0) {
+                        state = ConnectionState.Lost;
+
+                        U.warn(log, "Failed to establish zookeeper connection, close client " +
+                            "[timeout=" + connLossTimeout + ']');
+
+                        closeClient();
+
+                        err = new ZookeeperClientFailedException(e);
+                    }
+                }
+
+                if (err == null) {
+                    U.warn(log, "Zookeeper operation failed, will retry [err=" + e +
+                        ", retryTimeout=" + RETRY_TIMEOUT +
+                        ", connLossTimeout=" + connLossTimeout +
+                        ", remainingWaitTime=" + remainingTime + ']');
+
+                    stateMux.wait(RETRY_TIMEOUT);
+                }
+            }
+            else {
+                U.error(log, "Operation failed with unexpected error, close client: " + e, e);
+
+                closeClient();
+
+                state = ConnectionState.Lost;
+
+                throw new ZookeeperClientFailedException(e);
+            }
+        }
+
+        if (err != null) {
+            notifyConnectionLost();
+
+            throw err;
+        }
+    }
+
+    /**
+     * Tells whether an operation that failed with the given code may be repeated. Only
+     * {@code CONNECTIONLOSS} is currently treated as retryable.
+     *
+     * @param code Zookeeper error code.
+     * @return {@code True} if can retry operation.
+     */
+    private boolean needRetry(int code) {
+        // TODO ZL: other codes.
+        int connLossCode = KeeperException.Code.CONNECTIONLOSS.intValue();
+
+        return connLossCode == code;
+    }
+
+    /**
+     * Closes the ZooKeeper handle and cancels the connection timer. A failure of {@code zk.close()} is
+     * logged and does not prevent the timer from being cancelled. If the close is interrupted, the
+     * interrupt status of the current thread is restored so callers can still observe it.
+     */
+    private void closeClient() {
+        try {
+            zk.close();
+        }
+        catch (InterruptedException closeErr) {
+            // Do not silently consume the interrupt: re-set the flag for the calling thread.
+            Thread.currentThread().interrupt();
+
+            U.warn(log, "Failed to close zookeeper client: " + closeErr, closeErr);
+        }
+        catch (Exception closeErr) {
+            U.warn(log, "Failed to close zookeeper client: " + closeErr, closeErr);
+        }
+
+        connTimer.cancel();
+    }
+
+    /**
+     * Schedules a {@link ConnectionTimeoutTask} for the current {@code connStartTime} to run after
+     * {@code connLossTimeout}. Expects the state to be {@code Disconnected}.
+     */
+    private void scheduleConnectionCheck() {
+        assert state == ConnectionState.Disconnected : state;
+
+        TimerTask timeoutTask = new ConnectionTimeoutTask(connStartTime);
+
+        connTimer.schedule(timeoutTask, connLossTimeout);
+    }
+
+    /**
+     * Asynchronous ZooKeeper request. The implementations in this file keep the request arguments
+     * and submit the request when {@link #execute()} is called.
+     */
+    interface ZkAsyncOperation {
+        /**
+         * Submits the request.
+         */
+        void execute();
+    }
+
+    /**
+     * Keeps the arguments of a {@link ZookeeperClient#getChildrenAsync} call so the same request can be
+     * submitted again through {@link #execute()}.
+     */
+    class GetChildrenOperation implements ZkAsyncOperation {
+        /** Path. */
+        private final String path;
+
+        /** Watcher. */
+        private final Watcher watcher;
+
+        /** Callback. */
+        private final AsyncCallback.Children2Callback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        GetChildrenOperation(String path, Watcher watcher, AsyncCallback.Children2Callback cb) {
+            this.cb = cb;
+            this.watcher = watcher;
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            ZookeeperClient.this.getChildrenAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     * Keeps the arguments of a {@link ZookeeperClient#getDataAsync} call so the same request can be
+     * submitted again through {@link #execute()}.
+     */
+    class GetDataOperation implements ZkAsyncOperation {
+        /** Path. */
+        private final String path;
+
+        /** Watcher. */
+        private final Watcher watcher;
+
+        /** Callback. */
+        private final AsyncCallback.DataCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        GetDataOperation(String path, Watcher watcher, AsyncCallback.DataCallback cb) {
+            this.cb = cb;
+            this.watcher = watcher;
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            ZookeeperClient.this.getDataAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     * Keeps the arguments of a {@link ZookeeperClient#existsAsync} call so the same request can be
+     * submitted again through {@link #execute()}.
+     */
+    class ExistsOperation implements ZkAsyncOperation {
+        /** Path. */
+        private final String path;
+
+        /** Watcher. */
+        private final Watcher watcher;
+
+        /** Callback. */
+        private final AsyncCallback.StatCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        ExistsOperation(String path, Watcher watcher, AsyncCallback.StatCallback cb) {
+            this.cb = cb;
+            this.watcher = watcher;
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            ZookeeperClient.this.existsAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     * Keeps the arguments of a {@link ZookeeperClient#createAsync} call so the same request can be
+     * submitted again through {@link #execute()}.
+     */
+    class CreateOperation implements ZkAsyncOperation {
+        /** Path. */
+        private final String path;
+
+        /** Node data. */
+        private final byte[] data;
+
+        /** Create mode. */
+        private final CreateMode createMode;
+
+        /** Callback. */
+        private final AsyncCallback.StringCallback cb;
+
+        /**
+         * @param path Path.
+         * @param data Node data.
+         * @param createMode Create mode.
+         * @param cb Callback.
+         */
+        CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
+            this.cb = cb;
+            this.createMode = createMode;
+            this.data = data;
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            ZookeeperClient.this.createAsync(path, data, createMode, cb);
+        }
+    }
+
+    /**
+     * Callback installed for asynchronous {@code create} requests. It ignores {@code NODEEXISTS}, queues
+     * the operation in {@code retryQ} for retryable codes, only logs on {@code SESSIONEXPIRED}, and passes
+     * every other result to the user callback when one was supplied.
+     */
+    class CreateCallbackWrapper implements AsyncCallback.StringCallback {
+        /** Operation this callback belongs to. */
+        final CreateOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        CreateCallbackWrapper(CreateOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, String name) {
+            if (rc == KeeperException.Code.NODEEXISTS.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            // The user callback is optional for create requests.
+            if (op.cb != null)
+                op.cb.processResult(rc, path, ctx, name);
+        }
+    }
+
+    /**
+     * Callback installed for asynchronous {@code getChildren} requests. It queues the operation in
+     * {@code retryQ} for retryable codes, only logs on {@code SESSIONEXPIRED}, and passes every other
+     * result to the user callback.
+     */
+    class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback {
+        /** Operation this callback belongs to. */
+        private final GetChildrenOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private ChildrenCallbackWrapper(GetChildrenOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            op.cb.processResult(rc, path, ctx, children, stat);
+        }
+    }
+
+    /**
+     * Callback installed for asynchronous {@code getData} requests. It queues the operation in
+     * {@code retryQ} for retryable codes, only logs on {@code SESSIONEXPIRED}, and passes every other
+     * result to the user callback.
+     */
+    class DataCallbackWrapper implements AsyncCallback.DataCallback {
+        /** Operation this callback belongs to. */
+        private final GetDataOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private DataCallbackWrapper(GetDataOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            op.cb.processResult(rc, path, ctx, data, stat);
+        }
+    }
+
+    /**
+     * Callback installed for asynchronous {@code exists} requests. It queues the operation in
+     * {@code retryQ} for retryable codes, only logs on {@code SESSIONEXPIRED}, and passes every other
+     * result to the user callback.
+     */
+    class StatCallbackWrapper implements AsyncCallback.StatCallback {
+        /** Operation this callback belongs to. */
+        private final ExistsOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private StatCallbackWrapper(ExistsOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            op.cb.processResult(rc, path, ctx, stat);
+        }
+    }
+
+    /**
+     * Timer task that marks the connection as lost and closes the client if, when it runs, the state is
+     * still {@code Disconnected} and {@code connStartTime} equals the value given to the constructor.
+     */
+    private class ConnectionTimeoutTask extends TimerTask {
+        /** Connection start time passed to the constructor. */
+        private final long connectStartTime;
+
+        /**
+         * @param connectStartTime Time was connection started.
+         */
+        ConnectionTimeoutTask(long connectStartTime) {
+            this.connectStartTime = connectStartTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            boolean timedOut;
+
+            synchronized (stateMux) {
+                // Act only if nothing changed since scheduling: still disconnected, same start time.
+                timedOut = state == ConnectionState.Disconnected &&
+                    ZookeeperClient.this.connStartTime == connectStartTime;
+
+                if (timedOut) {
+                    state = ConnectionState.Lost;
+
+                    U.warn(log, "Failed to establish zookeeper connection, close client " +
+                        "[timeout=" + connLossTimeout + ']');
+
+                    closeClient();
+                }
+            }
+
+            // Notification is sent after the state lock has been released.
+            if (timedOut)
+                notifyConnectionLost();
+        }
+    }
+
+    /**
+     * Connection state tracked by this client under {@code stateMux}.
+     */
+    private enum ConnectionState {
+        /** Connected; a retryable error in this state switches the client to {@link #Disconnected}. */
+        Connected,
+        /** Disconnected; operations are retried until {@code connLossTimeout} elapses. */
+        Disconnected,
+        /** Connection lost; set when the timeout elapses or an unexpected error occurs, the client is closed. */
+        Lost
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
new file mode 100644
index 0000000..75a577c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Checked exception thrown by {@code ZookeeperClient} when an operation cannot be retried (client closed or connection lost).
+ */
+public class ZookeeperClientFailedException extends Exception {
+    /**
+     * @param cause Cause.
+     */
+    public ZookeeperClientFailedException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
new file mode 100644
index 0000000..504a1b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
+
+/**
+ * {@link ClusterNode} implementation of the ZooKeeper discovery package; fields marked transient are not serialized.
+ */
+public class ZookeeperClusterNode implements ClusterNode, Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Node ID. */
+    private UUID id;
+
+    /** Consistent ID. */
+    private Serializable consistentId;
+
+    /** Internal ID corresponds to Zookeeper sequential node. */
+    private int internalId;
+
+    /** Node order (asserted to be positive when assigned). */
+    private long order;
+
+    /** Node version. */
+    private IgniteProductVersion ver;
+
+    /** Node attributes. */
+    @GridToStringExclude
+    private Map<String, Object> attrs;
+
+    /** Local node flag. */
+    private transient boolean loc;
+
+    /** Metrics, created on the first {@link #metrics()} call. TODO ZK */
+    private transient ClusterMetrics metrics;
+
+    /** Client node flag. */
+    private boolean client;
+
+    /** Daemon node flag. */
+    @GridToStringExclude
+    private transient boolean daemon;
+
+    /** Daemon node initialization flag. */
+    @GridToStringExclude
+    private transient volatile boolean daemonInit;
+
+    /**
+     * @param id Node ID.
+     * @param ver Node version.
+     * @param attrs Node attributes.
+     * @param consistentId Consistent ID.
+     * @param client Client node flag.
+     */
+    public ZookeeperClusterNode(UUID id,
+        IgniteProductVersion ver,
+        Map<String, Object> attrs,
+        Serializable consistentId,
+        boolean client) {
+        assert id != null;
+        assert consistentId != null;
+
+        this.id = id;
+        this.ver = ver;
+        this.attrs = U.sealMap(attrs);
+        this.consistentId = consistentId;
+        this.client = client;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object consistentId() {
+        return consistentId;
+    }
+
+    /**
+     * Sets consistent globally unique node ID which survives node restarts.
+     *
+     * @param consistentId Consistent globally unique node ID.
+     */
+    public void setConsistentId(Serializable consistentId) {
+        this.consistentId = consistentId;
+
+        final Map<String, Object> map = new HashMap<>(attrs);
+
+        map.put(ATTR_NODE_CONSISTENT_ID, consistentId);
+
+        attrs = Collections.unmodifiableMap(map);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T attribute(String name) {
+        // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+        if (IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name))
+            return null;
+
+        return (T)attrs.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterMetrics metrics() {
+        if (metrics == null)
+            metrics = new ClusterMetricsSnapshot();
+
+        return metrics;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> attributes() {
+        // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+        return F.view(attrs, new IgnitePredicate<String>() {
+            @Override public boolean apply(String s) {
+                return !IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> addresses() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> hostNames() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long order() {
+        return order;
+    }
+
+    /**
+     * @return Internal ID corresponds to Zookeeper sequential node.
+     */
+    public int internalId() {
+        return internalId;
+    }
+
+    /**
+     * @param internalId Internal ID corresponds to Zookeeper sequential node.
+     */
+    void internalId(int internalId) {
+        this.internalId = internalId;
+    }
+
+    void order(long order) {
+        assert order > 0 : order;
+
+        this.order = order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteProductVersion version() {
+        return ver;
+    }
+
+    /**
+     * @param loc Local node flag.
+     */
+    public void local(boolean loc) {
+        this.loc = loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocal() {
+        return loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDaemon() {
+        if (!daemonInit) {
+            daemon = "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON));
+
+            daemonInit = true;
+        }
+
+        return daemon;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClient() {
+        return client;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return id.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return F.eqNodes(this, obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZookeeperClusterNode [id=" + id + ", order=" + order + ", client=" + client + ']';
+    }
+}


Mime
View raw message